教程¶
使用 DataPipes¶
假设我们想通过以下步骤从 CSV 文件加载数据:
列出目录中的所有 CSV 文件
加载 CSV 文件
解析 CSV 文件并生成行
有一些内置的 DataPipes 可以帮助我们完成上述操作。
FileLister
- 列出目录中的文件Filter
- 根据给定的 功能FileOpener
- 使用文件路径并返回打开的文件 流CSVParser
- 使用文件流,解析 CSV 内容,并在 时间
例如,的源代码如下所示:CSVParser
@functional_datapipe("parse_csv")
class CSVParserIterDataPipe(IterDataPipe):
def __init__(self, dp, **fmtparams) -> None:
self.dp = dp
self.fmtparams = fmtparams
def __iter__(self) -> Iterator[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]:
for path, file in self.source_datapipe:
stream = self._helper.skip_lines(file)
stream = self._helper.strip_newline(stream)
stream = self._helper.decode(stream)
yield from self._helper.return_path(stream, path=path) # Returns 1 line at a time as List[str or bytes]
如另一节所述,DataPipes 可以使用其函数形式(推荐)或 类构造函数。管道可以按以下方式组装:
import torchdata.datapipes as dp
FOLDER = 'path/2/csv/folder'
datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=',')
for d in datapipe: # Iterating through the data
pass
您可以在此处找到内置 IterDataPipes 的完整列表,在此处找到 MapDataPipes 的完整列表。
使用 DataLoader¶
在本节中,我们将演示如何将 DataPipe 与 .
在大多数情况下,您应该能够通过作为 input arugment 传递来使用它
到 .有关的详细文档 ,
请访问此页面。DataLoader
dataset=datapipe
DataLoader
DataLoader
在此示例中,我们首先使用一个辅助函数,该函数生成一些具有随机标签和数据的 CSV 文件。
import csv
import random
def generate_csv(file_label, num_rows: int = 5000, num_features: int = 20) -> None:
fieldnames = ['label'] + [f'c{i}' for i in range(num_features)]
writer = csv.DictWriter(open(f"sample_data{file_label}.csv", "w"), fieldnames=fieldnames)
writer.writerow({col: col for col in fieldnames}) # writing the header row
for i in range(num_rows):
row_data = {col: random.random() for col in fieldnames}
row_data['label'] = random.randint(0, 9)
writer.writerow(row_data)
接下来,我们将构建 DataPipes 来读取和解析生成的 CSV 文件:
import numpy as np
import torchdata.datapipes as dp
def build_datapipes(root_dir="."):
datapipe = dp.iter.FileLister(root_dir)
datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
"data": np.array(row[1:], dtype=np.float64)})
return datapipe
最后,我们将所有内容放在一起,并将 DataPipe 传递到 DataLoader 中。'__main__'
from torch.utils.data import DataLoader
if __name__ == '__main__':
num_files_to_generate = 3
for i in range(num_files_to_generate):
generate_csv(file_label=i)
datapipe = build_datapipes()
dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
first = next(iter(dl))
labels, features = first['label'], first['data']
print(f"Labels batch shape: {labels.size()}")
print(f"Feature batch shape: {features.size()}")
将打印以下语句以显示单个标签和特征的形状。
Labels batch shape: 50
Feature batch shape: torch.Size([50, 20])
您可以在此页面上找到各种研究领域的更多 DataPipe 实现示例。
实现自定义 DataPipe¶
目前,我们已经有大量的内置 DataPipe,我们希望它们能够覆盖最必要的 数据处理操作。如果它们都不支持您的需求,您可以创建自己的自定义 DataPipe。
作为一个指导性示例,让我们实现一个将可调用对象应用于输入迭代器的 an。对于 ,请查看 map 文件夹的示例,然后对方法而不是方法执行以下步骤。IterDataPipe
MapDataPipe
__getitem__
__iter__
命名¶
的命名约定是 “Operation”-er,后跟 或 ,因为每个
DataPipe 本质上是一个容器,用于将操作应用于从 source 生成的数据。为了简洁,
我们在 init 文件中仅别名为 “Operation-er”。对于我们的示例,我们将模块命名并为其别名,如下所示。DataPipe
IterDataPipe
MapDataPipe
DataPipe
IterDataPipe
MapperIterDataPipe
iter.Mapper
torchdata.datapipes
构造 函数¶
DataSet 现在通常构造为 ,因此每个 DataSet 通常都采用
source 作为其第一个参数。下面是一个简化版本的 Mapper 作为示例:DataPipes
DataPipe
DataPipe
from torchdata.datapipes.iter import IterDataPipe
class MapperIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe, fn) -> None:
super().__init__()
self.source_dp = source_dp
self.fn = fn
注意:
避免在函数中从源 DataPipe 加载数据,以支持延迟数据加载和保存 记忆。
__init__
如果 instance 将数据保存在内存中,请注意数据的就地修改。当第二个 iterator 是从实例创建的,则数据可能已经更改。请将 class 作为每个迭代器的 data 的引用。
IterDataPipe
IterableWrapper
deepcopy
避免使用现有 DataPipes 的函数名称获取的变量名称。例如,是 是可用于调用 的函数名称。在 另一个可能会导致混乱。
.filter
FilterIterDataPipe
filter
IterDataPipe
迭 代¶
对于 ,需要一个函数来使用来自源的数据,然后
对 之前的数据应用操作。IterDataPipes
__iter__
IterDataPipe
yield
class MapperIterDataPipe(IterDataPipe):
# ... See __init__() defined above
def __iter__(self):
for d in self.dp:
yield self.fn(d)
长度¶
在许多情况下,如我们的示例所示,DataPipe 的方法返回
source DataPipe 的 API API 中。MapperIterDataPipe
__len__
class MapperIterDataPipe(IterDataPipe):
# ... See __iter__() defined above
def __len__(self):
return len(self.dp)
但是,请注意,这是可选的,并且通常不可取。For 在下面的 using DataPipes 部分中,未实现,因为每个文件中的行数
在加载之前是未知的。在某些特殊情况下,可以返回一个整数或引发
错误,具体取决于输入。在这些情况下,Error 必须是 a 才能支持 Python 的
内置函数,如 .__len__
IterDataPipe
CSVParserIterDataPipe
__len__
__len__
TypeError
list(dp)
使用函数式 API 注册 DataPipes¶
每个 DataPipe 都可以注册以支持使用 decorator 进行功能调用 。functional_datapipe
@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
# ...
然后可以使用其函数形式(推荐)或类构造函数构造 DataPipes 堆栈:
import torchdata.datapipes as dp
# Using functional form (recommended)
datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
# Using class constructors
datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
datapipes2 = dp.iter.Shuffler(datapipes2)
datapipes2 = dp.iter.Batcher(datapipes2, 2)
在上面的例子中,和 表示完全相同的 s 堆栈。我们
建议使用 DataPipes 的函数形式。datapipes1
datapipes2
IterDataPipe