目录

教程

使用DataPipes

假设我们要从以下步骤加载CSV文件中的数据:

  • 列出目录中的所有CSV文件

  • 加载CSV文件

  • 解析CSV文件并返回行

  • 将我们的数据集分割成训练集和验证集

有一些 内置的DataPipes 可以帮助我们完成上述操作。

作为一个例子,CSVParser的源代码看起来像这样:

@functional_datapipe("parse_csv")
class CSVParserIterDataPipe(IterDataPipe):
    def __init__(self, dp, **fmtparams) -> None:
        self.dp = dp
        self.fmtparams = fmtparams

    def __iter__(self) -> Iterator[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]:
        for path, file in self.source_datapipe:
            stream = self._helper.skip_lines(file)
            stream = self._helper.strip_newline(stream)
            stream = self._helper.decode(stream)
            yield from self._helper.return_path(stream, path=path)  # Returns 1 line at a time as List[str or bytes]

如前所述,在不同的部分中,DataPipes 可以通过它们的功能形式(推荐)或类构造函数来调用。管道可以按照以下方式组装:

import torchdata.datapipes as dp

FOLDER = 'path/2/csv/folder'
datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=',')
N_ROWS = 10000  # total number of rows of data
train, valid = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.5, "valid": 0.5}, seed=0)

for x in train:  # Iterating through the training dataset
    pass

for y in valid:  # Iterating through the validation dataset
    pass

您可以在此处找到内置的 IterDataPipes 列表MapDataPipes 列表

使用DataLoader进行工作

在本节中,我们将演示如何使用DataPipe与DataLoader。 大多数情况下,您只需将dataset=datapipe作为输入参数传递 到DataLoader中即可。有关DataLoader的详细文档, 请访问此页面

对于这个例子,我们将首先有一个辅助函数,该函数生成一些包含随机标签和数据的CSV文件。

import csv
import random

def generate_csv(file_label, num_rows: int = 5000, num_features: int = 20) -> None:
    fieldnames = ['label'] + [f'c{i}' for i in range(num_features)]
    writer = csv.DictWriter(open(f"sample_data{file_label}.csv", "w", newline=''), fieldnames=fieldnames)
    writer.writeheader()
    for i in range(num_rows):
        row_data = {col: random.random() for col in fieldnames}
        row_data['label'] = random.randint(0, 9)
        writer.writerow(row_data)

接下来,我们将构建我们的DataPipes来读取和解析生成的CSV文件。请注意,我们更喜欢使用定义好的函数而不是lambda函数作为DataPipes,因为前者可以通过pickle进行序列化。

import numpy as np
import torchdata.datapipes as dp

def filter_for_data(filename):
    return "sample_data" in filename and filename.endswith(".csv")

def row_processer(row):
    return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}

def build_datapipes(root_dir="."):
    datapipe = dp.iter.FileLister(root_dir)
    datapipe = datapipe.filter(filter_fn=filter_for_data)
    datapipe = datapipe.open_files(mode='rt')
    datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
    # Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
    datapipe = datapipe.shuffle()
    datapipe = datapipe.map(row_processer)
    return datapipe

Lastly, we will put everything together in '__main__' and pass the DataPipe into the DataLoader. Note that if you choose to use Batcher while setting batch_size > 1 for DataLoader, your samples will be batched more than once. You should choose one or the other.

from torch.utils.data import DataLoader

if __name__ == '__main__':
    num_files_to_generate = 3
    for i in range(num_files_to_generate):
        generate_csv(file_label=i, num_rows=10, num_features=3)
    datapipe = build_datapipes()
    dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
    first = next(iter(dl))
    labels, features = first['label'], first['data']
    print(f"Labels batch shape: {labels.size()}")
    print(f"Feature batch shape: {features.size()}")
    print(f"{labels = }\n{features = }")
    n_sample = 0
    for row in iter(dl):
        n_sample += 1
    print(f"{n_sample = }")

以下语句将被打印出来,以显示单个批次标签和特征的形状。

Labels batch shape: torch.Size([5])
Feature batch shape: torch.Size([5, 3])
labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
features = tensor([[0.2867, 0.5973, 0.0730],
        [0.7890, 0.9279, 0.7392],
        [0.8930, 0.7434, 0.0780],
        [0.8225, 0.4047, 0.0800],
        [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
n_sample = 12

The reason why n_sample = 12 is because ShardingFilter (datapipe.sharding_filter()) was not used, such that each worker will independently return all samples. In this case, there are 10 rows per file and 3 files, with a batch size of 5, that gives us 6 batches per worker. With 2 workers, we get 12 total batches from the DataLoader.

为了使DataPipe分片与DataLoader工作,我们需要添加以下内容。

def build_datapipes(root_dir="."):
    datapipe = ...
    # Add the following line to `build_datapipes`
    # Note that it is somewhere after `Shuffler` in the DataPipe line, but before expensive operations
    datapipe = datapipe.sharding_filter()
    return datapipe

当我们重新运行时,我们会得到:

...
n_sample = 6

Note:

  • ShardingFilter (datapipe.sharding_filter) 作为流程中的早期步骤,特别是在解码等昂贵操作之前,以避免在工作/分布式进程中重复这些昂贵的操作。

  • 对于需要分片的数据源,添加ShufflerShardingFilter之前至关重要,以确保数据在被分割成分片之前在全球范围内打乱。否则,每个工作进程将始终处理所有epoch中的同一分片数据。这意味着每个批次仅包含来自同一分片的数据,导致训练期间的低准确性。然而,这不适用于已经为每个多/分布式进程分片的数据源,因为ShardingFilter不再需要在管道中呈现。

  • 在某些情况下,将Shuffler更早地放置在管道中可能导致性能下降,因为一些操作(例如解压缩)在顺序读取时更快。在这种情况下,我们建议在打乱之前(可能在任何数据加载之前)先解压缩文件。

您可以在此页面上找到更多适用于各种研究领域的DataPipe实现示例。

实现自定义数据管道

目前,我们已经拥有大量的内置DataPipes,并期望它们能够覆盖大多数必要的数据处理操作。如果它们中的任何一个都不支持您的需求,您可以创建自己的自定义DataPipe。

作为一个引导示例,让我们实现一个 IterDataPipe 该应用可调用到输入迭代器。对于 MapDataPipe,请参阅 map 文件夹中的示例,并按照以下步骤执行 __getitem__ 方法而不是 __iter__ 方法。

命名

命名约定为 DataPipe 的是“操作”-er,后跟 IterDataPipeMapDataPipe,因为每个 DataPipe 实际上是一个容器,用于对从源 DataPipe 生成的数据应用操作。为了简洁起见, 我们在 init 文件中将其别名为“操作-er”。对于我们的 IterDataPipe 示例,我们将模块命名为 MapperIterDataPipe 并在 torchdata.datapipes 下将其别名为 iter.Mapper

对于功能方法名,命名约定是 datapipe.<operation>。例如, 功能方法名 Mappermap,因此可以通过 datapipe.map(...) 调用。

构造函数

数据集现在通常被构造为堆栈的DataPipes,因此每个DataPipe通常将一个 源DataPipe作为其第一个参数。以下是一个简化版本的Mapper作为示例:

from torchdata.datapipes.iter import IterDataPipe

class MapperIterDataPipe(IterDataPipe):
    def __init__(self, source_dp: IterDataPipe, fn) -> None:
        super().__init__()
        self.source_dp = source_dp
        self.fn = fn

Note:

  • 避免在__init__函数中从源DataPipe加载数据,以支持懒加载数据并节省内存。

  • 如果IterDataPipe实例在内存中持有数据,请注意数据的就地修改。当从该实例创建第二个迭代器时,数据可能已经改变。请参考deepcopy中的为每个迭代器处理数据。

  • 避免使用已被现有DataPipes功能名称占用的变量名。例如,.filter 是可以用来调用 FilterIterDataPipe 的功能名称。在另一个 IterDataPipe 中使用名为 filter 的变量可能会引起混淆。

迭代器

For IterDataPipes, an __iter__ function is needed to consume data from the source IterDataPipe then apply the operation over the data before yield.

class MapperIterDataPipe(IterDataPipe):
    # ... See __init__() defined above

    def __iter__(self):
        for d in self.dp:
            yield self.fn(d)

长度

在许多情况下,就像我们的 MapperIterDataPipe 示例中一样,DataPipe 的 __len__ 方法返回源 DataPipe 的长度。

class MapperIterDataPipe(IterDataPipe):
    # ... See __iter__() defined above

    def __len__(self):
        return len(self.dp)

但是请注意,__len__ 对于 IterDataPipe 是可选的,并且通常不建议使用。对于下面“使用 DataPipes”部分中的 CSVParserIterDataPipe,没有实现 __len__,因为在加载文件之前,每个文件中的行数是未知的。在某些特殊情况下,__len__ 可以被设计为根据输入返回一个整数或抛出一个错误。在这些情况下,错误必须是一个 TypeError,以支持 Python 的内置函数,如 list(dp)

使用功能API注册DataPipes

每个DataPipe都可以通过装饰器functional_datapipe注册以支持功能调用。

@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
   # ...

数据管道堆栈可以使用它们的功能形式(推荐)或类构造函数来构建。

import torchdata.datapipes as dp

# Using functional form (recommended)
datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
# Using class constructors
datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
datapipes2 = dp.iter.Shuffler(datapipes2)
datapipes2 = dp.iter.Batcher(datapipes2, 2)

在上面的例子中,datapipes1datapipes2代表相同的堆栈IterDataPipes。我们 推荐使用DataPipes的功能形式。

与云存储提供商合作

在本节中,我们展示了使用内置的fsspec DataPipes访问AWS S3、Google云存储和Azure云存储的示例。 尽管这里只讨论了这两个提供商,但通过添加其他库,fsspec DataPipes 也应该允许您连接到其他存储系统(已知实现列表)。

请在 GitHub 上告知我们,如果您需要其他云存储提供商的支持请求,或者您有代码示例可以与社区分享。

访问AWS S3使用fsspec个DataPipes

这需要安装库 fsspec文档)和 s3fss3fs GitHub 仓库)。

你可以通过传递一个以 "s3://BUCKET_NAME" 开头的路径来列出 S3 存储桶目录中的文件,然后调用 FSSpecFileLister (.list_files_by_fsspec(...))。

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()

您可以使用 FSSpecFileOpener (.open_files_by_fsspec(...)) 打开文件,并流式传输它们 (如果支持该文件格式)。

请注意,您也可以通过参数 kwargs_for_open 提供其他参数。这对于访问特定存储桶版本等目的非常有用,您可以通过传递 {version_id: 'SOMEVERSIONID'} 来实现(更多关于 S3 存储桶版本控制的信息,请参阅 s3fs详细信息)。支持的参数因您访问的(云)文件系统而异。

在下面的例子中,我们正在使用 TarArchiveLoader (.load_from_tar(mode="r|")), 与通常的mode="r:". 这允许我们在不首先将整个归档下载到内存的情况下开始处理归档内的数据。

from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|") # Streaming version
# The rest of data processing logic goes here

Finally, FSSpecFileSaver也是可用的,用于将数据写入云。

访问Google Cloud Storage (GCS) 通过fsspec DataPipes

这需要安装库 fsspec文档)和 gcsfsgcsfs GitHub 仓库)。

你可以通过指定一个以 "gcs://BUCKET_NAME" 开头的路径来列出 GCS 存储桶目录中的文件。例如,下面示例中的存储桶名称为 uspto-pair

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
print(list(dp))
# ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']

这是一个加载名为05900035.zip的zip文件uspto-pairapplications目录内的bucket中的示例。

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
        .open_files_by_fsspec(mode="rb") \
        .load_from_zip()
# Logic to process those archive files comes after
for path, filestream in dp:
    print(path, filestream)
# gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>

访问Azure Blob存储使用fsspec个DataPipes

这需要安装库 fsspec文档)和 adlfsadlfs GitHub 仓库)。 您可以通过提供以 abfs:// 开头的 URI 来访问 Azure Data Lake Storage Gen2 中的数据。 例如, FSSpecFileLister (.list_files_by_fsspec(...)) 可以用于列出容器中的目录文件:

from torchdata.datapipes.iter import IterableWrapper

storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
dp = IterableWrapper(['abfs://CONTAINER/DIRECTORY']).list_files_by_fsspec(**storage_options)
print(list(dp))
# ['abfs://container/directory/file1.txt', 'abfs://container/directory/file2.txt', ...]

您可以使用 FSSpecFileOpener (.open_files_by_fsspec(...)) 打开文件,并流式传输它们 (如果支持该文件格式)。

这是一个从公共容器中加载CSV文件 ecdc_cases.csv 的示例,该容器位于目录 curated/covid-19/ecdc_cases/latest 中,属于账户 pandemicdatalake

from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(['abfs://public/curated/covid-19/ecdc_cases/latest/ecdc_cases.csv']) \
        .open_files_by_fsspec(account_name='pandemicdatalake') \
        .parse_csv()
print(list(dp)[:3])
# [['date_rep', 'day', ..., 'iso_country', 'daterep'],
# ['2020-12-14', '14', ..., 'AF', '2020-12-14'],
# ['2020-12-13', '13', ..., 'AF', '2020-12-13']]

如果需要,您也可以通过使用以 adl://abfs:// 开头的 URI 来访问 Azure Data Lake Storage Gen1 中的数据,具体描述请参阅 adlfs 仓库的 README

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源