教程¶
使用DataPipes¶
假设我们要从以下步骤加载CSV文件中的数据:
列出目录中的所有CSV文件
加载CSV文件
解析CSV文件并返回行
有一些 内置的DataPipes 可以帮助我们完成上述操作。
FileLister- 列出目录中的文件Filter- 根据给定的函数过滤DataPipe中的元素FileOpener- 消耗文件路径并返回打开的文件流CSVParser- 消耗文件流,解析CSV内容,并一次返回一行已解析的数据
作为一个例子,CSVParser的源代码看起来像这样:
@functional_datapipe("parse_csv")
class CSVParserIterDataPipe(IterDataPipe):
def __init__(self, dp, **fmtparams) -> None:
self.dp = dp
self.fmtparams = fmtparams
def __iter__(self) -> Iterator[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]:
for path, file in self.source_datapipe:
stream = self._helper.skip_lines(file)
stream = self._helper.strip_newline(stream)
stream = self._helper.decode(stream)
yield from self._helper.return_path(stream, path=path) # Returns 1 line at a time as List[str or bytes]
如前所述,在不同的部分中,DataPipes 可以通过它们的功能形式(推荐)或类构造函数来调用。管道可以按照以下方式组装:
import torchdata.datapipes as dp
FOLDER = 'path/2/csv/folder'
datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=',')
for d in datapipe: # Iterating through the data
pass
您可以在此处找到内置的 IterDataPipes 列表 和 MapDataPipes 列表。
使用DataLoader进行工作¶
在本节中,我们将演示如何使用DataPipe与DataLoader。
大多数情况下,您只需将dataset=datapipe作为输入参数传递
到DataLoader中即可。有关DataLoader的详细文档,
请访问此页面。
对于这个例子,我们将首先有一个辅助函数,该函数生成一些包含随机标签和数据的CSV文件。
import csv
import random
def generate_csv(file_label, num_rows: int = 5000, num_features: int = 20) -> None:
fieldnames = ['label'] + [f'c{i}' for i in range(num_features)]
writer = csv.DictWriter(open(f"sample_data{file_label}.csv", "w"), fieldnames=fieldnames)
writer.writerow({col: col for col in fieldnames}) # writing the header row
for i in range(num_rows):
row_data = {col: random.random() for col in fieldnames}
row_data['label'] = random.randint(0, 9)
writer.writerow(row_data)
接下来,我们将构建我们的DataPipes来读取和解析生成的CSV文件。
import numpy as np
import torchdata.datapipes as dp
def build_datapipes(root_dir="."):
datapipe = dp.iter.FileLister(root_dir)
datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
"data": np.array(row[1:], dtype=np.float64)})
return datapipe
Lastly, we will put everything together in '__main__' and pass the DataPipe into the DataLoader.
from torch.utils.data import DataLoader
if __name__ == '__main__':
num_files_to_generate = 3
for i in range(num_files_to_generate):
generate_csv(file_label=i)
datapipe = build_datapipes()
dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
first = next(iter(dl))
labels, features = first['label'], first['data']
print(f"Labels batch shape: {labels.size()}")
print(f"Feature batch shape: {features.size()}")
以下语句将被打印出来,以显示单个批次标签和特征的形状。
Labels batch shape: 50
Feature batch shape: torch.Size([50, 20])
您可以在此页面上找到更多适用于各种研究领域的DataPipe实现示例。
实现自定义数据管道¶
目前,我们已经拥有大量的内置DataPipes,并期望它们能够覆盖大多数必要的数据处理操作。如果它们中的任何一个都不支持您的需求,您可以创建自己的自定义DataPipe。
作为一个引导示例,让我们实现一个 IterDataPipe 该应用可调用到输入迭代器。对于
MapDataPipe,请参阅
map
文件夹中的示例,并按照以下步骤执行 __getitem__ 方法而不是 __iter__ 方法。
命名¶
命名约定为 DataPipe 的是“操作”-er,后跟 IterDataPipe 或 MapDataPipe,因为每个
DataPipe 实际上是一个容器,用于对从源 DataPipe 生成的数据应用操作。为了简洁起见,
我们在 init 文件中将其别名为“操作-er”。对于我们的 IterDataPipe 示例,我们将模块命名为
MapperIterDataPipe 并在 torchdata.datapipes 下将其别名为 iter.Mapper。
构造函数¶
数据集现在通常被构造为堆栈的DataPipes,因此每个DataPipe通常将一个
源DataPipe作为其第一个参数。以下是一个简化版本的Mapper作为示例:
from torchdata.datapipes.iter import IterDataPipe
class MapperIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe, fn) -> None:
super().__init__()
self.source_dp = source_dp
self.fn = fn
Note:
避免在
__init__函数中从源DataPipe加载数据,以支持懒加载数据并节省内存。如果
IterDataPipe实例在内存中持有数据,请注意数据的就地修改。当从该实例创建第二个迭代器时,数据可能已经改变。请参考deepcopy中的类为每个迭代器处理数据。避免使用与现有DataPipes功能名称相同的变量名。例如,
.filter是可以用来调用FilterIterDataPipe的功能名称。在另一个IterDataPipe中使用名为filter的变量可能会导致混淆。
迭代器¶
For IterDataPipes, an __iter__ function is needed to consume data from the source IterDataPipe then
apply the operation over the data before yield.
class MapperIterDataPipe(IterDataPipe):
# ... See __init__() defined above
def __iter__(self):
for d in self.dp:
yield self.fn(d)
长度¶
在许多情况下,就像我们的 MapperIterDataPipe 示例中一样,DataPipe 的 __len__ 方法返回源 DataPipe 的长度。
class MapperIterDataPipe(IterDataPipe):
# ... See __iter__() defined above
def __len__(self):
return len(self.dp)
但是请注意,__len__ 对于 IterDataPipe 是可选的,并且通常不建议使用。对于下面“使用 DataPipes”部分中的 CSVParserIterDataPipe,没有实现 __len__,因为在加载文件之前,每个文件中的行数是未知的。在某些特殊情况下,__len__ 可以被设计为根据输入返回一个整数或抛出一个错误。在这些情况下,错误必须是一个 TypeError,以支持 Python 的内置函数,如 list(dp)。
使用功能API注册DataPipes¶
每个DataPipe都可以通过装饰器functional_datapipe注册以支持功能调用。
@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
# ...
数据管道堆栈可以使用它们的功能形式(推荐)或类构造函数来构建。
import torchdata.datapipes as dp
# Using functional form (recommended)
datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
# Using class constructors
datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
datapipes2 = dp.iter.Shuffler(datapipes2)
datapipes2 = dp.iter.Batcher(datapipes2, 2)
在上面的例子中,datapipes1和datapipes2代表相同的堆栈IterDataPipes。我们
推荐使用DataPipes的功能形式。