目录

注意力

2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2

我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)

Iterable 风格的 DataPipes

可迭代样式的数据集是实现协议 ,表示数据样本的可迭代对象。这种类型的数据集特别适用于随机 读取是昂贵的,甚至是不可能的,并且批处理大小取决于获取的数据。__iter__()

例如,这样的数据集在调用时可以返回从数据库读取的数据流 远程服务器,甚至是实时生成的日志。iter(iterdatapipe)

这是 in 的更新版本。IterableDatasettorch

torchdata.datapipes.iter 中。IterDataPipe

Iterable 样式的 DataPipe 中。

所有表示数据样本可迭代对象的 DataPipes 都应该子类化 this。 当数据来自流时,这种风格的 DataPipes 特别有用,或者 当样本数量太大而无法将它们全部放入内存中时。 是延迟初始化的,并且其 仅当在 的迭代器上调用 时,才会计算元素。IterDataPipenext()IterDataPipe

所有子类都应覆盖 ,这将返回一个 此 DataPipe 中的样本的迭代器。调用 an 会自动调用其 method ,默认情况下不执行任何操作。编写自定义时,用户应 如有必要,请覆盖。常见的用法包括重置缓冲区、指针、 以及自定义 .__iter__()__iter__IterDataPipereset()IterDataPipereset()IterDataPipe

注意

一次只能有一个迭代器对每个迭代器有效。 创建第二个迭代器将使第一个迭代器无效。此约束是必需的,因为 有些具有内部缓冲区,如果有多个迭代器,则其状态可能会变得无效。 下面的代码示例详细介绍了此约束在实践中的外观。 如果您有任何与此约束相关的反馈,请参阅 GitHub IterDataPipe 单个迭代器问题IterDataPipeIterDataPipe

可以通过两种方式调用这些 DataPipe,使用类构造函数或应用它们的 函数形式添加到现有 (推荐,适用于大多数但并非所有 DataPipe) 上。 您可以将多个 IterDataPipe 链接在一起,以形成一个将执行多个 连续操作。IterDataPipe

注意

当子类与 一起使用时,每个 item 将从迭代器中生成。当 ,每个工作进程都将具有 DataPipe 对象的不同副本,因此通常需要配置 每个副本,以避免从 工人。,在 worker 中调用 process 返回有关工作程序的信息。它可以用于 dataset 的方法或 的选项来修改每个副本的行为。DataLoaderDataLoadernum_workers > 0get_worker_info()__iter__()DataLoaderworker_init_fn

例子

一般用法:
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]
单个迭代器约束示例:
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
>>> next(it2)
0
>>> next(it1)  # Further usage of `it1` will raise a `RunTimeError`

我们有不同类型的可迭代 DataPipes:

  1. 存档 - 打开和解压缩不同格式的存档文件。

  2. Augmenting - 增强您的样本(例如,添加索引或无限循环)。

  3. Combinatorial - 执行组合运算(例如采样、洗牌)。

  4. 组合/拆分 - 通过组合或一对多来与多个 DataPipe 进行交互。

  5. Grouping - 在 DataPipe 中对样本进行分组

  6. IO - 与文件系统或远程服务器交互(例如下载、打开、 保存文件,并在目录中列出文件)。

  7. 映射 - 将给定函数应用于 DataPipe 中的每个元素。

  8. 其他 - 执行其他操作集。

  9. 选择 - 在 DataPipe 中选择特定样本。

  10. 文本 - 解析、读取和转换文本文件和数据

存档 DataPipes

这些 DataPipes 有助于打开和解压缩不同格式的存档文件。

bz2FileLoader 文件加载器

从包含路径名和 bz2 二进制流元组的可迭代 DataPipe 中解压缩 bz2 二进制流,并生成路径名和提取的二进制流的元组(函数名称:)。load_from_bz2

解压缩器

采用 path 元组和压缩的数据流,并返回 path 的元组和解压缩的数据流(函数名称:)。decompress

RarArchiveLoader 文件

从包含路径名和 rar 二进制流元组的输入 Iterable Datapipes 中解压缩 rar 二进制流,并产生路径名和提取的二进制流(函数名称:)的元组。load_from_rar

TarArchiveLoader 格式

从包含路径名和 tar 二进制流元组的 Iterable DataPipe 打开/解压缩 tar 二进制流,并生成路径名和提取的二进制流(函数名称:)的元组。load_from_tar

TFRecordLoader

从包含路径名和 tfrecord 二进制流元组的 Iterable DataPipe 打开/解压缩 tfrecord 二进制流,并生成存储的记录(函数名称:)。load_from_tfrecord

Web数据集

Iterable DataPipe 接受 (path, data) 元组流,通常表示 tar 存档的路径名和文件(函数名称:)。webdataset

XzFileLoader 文件加载器

从包含路径名和 xy 二进制流元组的可迭代 DataPipe 中解压缩 xz (lzma) 二进制流,并产生路径名和提取的二进制流(函数名称:)的元组。load_from_xz

ZipArchiveLoader 文件

从 Iterable DataPipe 打开/解压缩 zip 二进制流,该 DataPipe 包含路径名和 zip 二进制流的元组,并生成路径名和提取的二进制流的元组(函数名称:)。load_from_zip

扩充 DataPipes

这些 DataPipes 有助于增强您的样本。

循环仪

默认情况下,永久循环指定的输入,或循环指定的次数(函数名称:)。cycle

枚举 数

通过枚举向已有的 DataPipe 添加索引,索引默认从 0 开始(函数名称: )。enumerate

IndexAdder

使用 (function name: ) 向现有的 Iterable DataPipe 添加索引。add_index

中继 器

在移动到下一个元素(函数名称:)之前,将源 DataPipe 的每个元素重复生成指定的次数。repeat

组合 DataPipes

这些 DataPipes 有助于执行组合运算。

InBatchShuffler

从前面的 DataPipe (function name: ) 中随机排列每个小批量。in_batch_shuffle

采样

使用提供的 (默认为 )生成示例元素。SamplerSequentialSampler

洗牌机

使用缓冲区(功能名称:)对输入 DataPipe 进行随机排序。shuffle

组合/拆分 DataPipe

这些操作往往涉及多个 DataPipe,将它们组合起来或将一对多拆分。

连接

连接多个 Iterable DataPipe(函数名称:)。concat

解复用器

使用给定的分类函数(函数名称:)将输入 DataPipe 拆分为多个子 DataPipe。demux

分叉器

创建同一 Iterable DataPipe 的多个实例(函数名称:)。fork

IterKeyZipper

根据匹配的键(函数名称:)将两个 IterDataPipes 压缩在一起。zip_with_iter

MapKeyZipper

将源 IterDataPipe 中的项与 MapDataPipe(函数名称:)中的项联接起来。zip_with_map

复 用

从每个输入的 Iterable DataPipes (function name: ) 中一次产生一个元素。mux

Multiplexer最长

从每个输入的 Iterable DataPipes (function name: ) 中一次产生一个元素。mux_longest

RoundRobinDemultiplexer

将输入 DataPipe 按循环顺序拆分为多个子 DataPipe(函数名称:)。round_robin_demux

SampleMultiplexer (样品多路复用器)

采用 (IterDataPipe, Weight) 的 Dict,并通过从这些 DataPipes 中采样其权重来生成项目。

解开拉链

接收序列的 DataPipe,解包每个序列,并根据元素在 Sequence 中的位置(函数名称:)在单独的 DataPipes 中返回元素。unzip

拉链

将每个输入 DataPipes 的元素聚合到一个元组中(函数名称:)。zip

拉链最长

将每个输入 DataPipes 的元素聚合到一个元组中(函数名称:)。zip_longest

对 DataPipes 进行分组

这些 DataPipes 允许您在 DataPipe 中对样本进行分组。

Batcher (批处理器)

创建小批量数据 (function name: )。batch

BucketBatcher

从排序的存储桶(功能名称:)创建小批量数据。bucketbatch

对照者

通过自定义 collate 函数(函数名称:)将 DataPipe 中的样本整理到 Tensor(s)。collate

石斑鱼

按 中的 键对来自 IterDataPipe 的数据进行分组,从而生成一个批大小最大为 .group_key_fnDataChunkgroup_size

MaxTokenBucketizer

从大小有限的最小堆创建小批量数据,每个批次中返回的样本总长度将受到 (function name: ) 的限制。len_fnmax_token_countmax_token_bucketize

取消批处理

撤消数据批处理(功能名称:)。unbatch

IO 数据管道

这些 DataPipes 有助于与文件系统或远程服务器交互(例如,下载、打开、 保存文件,并在目录中列出文件)。

AISFileLister的

Iterable Datapipe 列出来自具有给定 URL 前缀(功能名称:)的 AIStore 后端的文件。list_files_by_ais

AISFileLoader

Iterable DataPipe 使用给定的 URL(功能名称:)从 AIStore 加载文件。load_files_by_ais

FSSpecFileLister

在提供的路径名或 URL 处列出目录的内容,并生成目录中每个文件的完整路径名或 URL(功能名称: )。rootlist_files_by_fsspec

FSSpecFileOpener

从包含 fsspec 路径的 input datapipe 打开文件,并生成路径名和打开的文件流(功能名称:)的元组。open_files_by_fsspec

FSSpecSaver

接收元数据和数据元组的 DataPipe,将数据保存到目标路径(由 filepath_fn 和 metadata 生成),并生成生成的 fsspec 路径(函数名称:)。save_by_fsspec

FileLister 文件列表

给定根目录的路径,生成根目录中的文件路径名(路径 + 文件名)。

FileOpener 文件打开

给定路径名,打开文件并在元组(函数名称:)中生成路径名和文件流。open_files

GDriveReader

采用指向 GDrive 文件的 URL,并生成文件名和 IO 流的元组(功能名称:)。read_from_gdrive

HttpReader

获取文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名称:)。read_from_http

HuggingFaceHubReader

接收数据集名称并返回 Iterable HuggingFace 数据集。

IoPathFileLister (英语)

在提供的路径名或 URL 处列出目录的内容,并生成目录中每个文件的完整路径名或 URL(功能名称: )。rootlist_files_by_iopath

IoPathFileOpener

从包含路径名或 URL 的 Importing datapipe 打开文件,并生成路径名和打开的文件流(function name: )的元组。open_files_by_iopath

IoPathSaver (英语)

接收元数据和数据元组的 DataPipe,将数据保存到由 and 元数据生成的目标路径,并以 iopath 格式(功能名称:)生成结果路径。filepath_fnsave_by_iopath

在线阅读器

获取文件 URL(可以是指向文件的 HTTP URL 或指向 GDrive 文件的 URL),并生成文件 URL 和 IO 流的元组(函数名称:)。read_from_remote

ParquetDataFrameLoader

获取 Parquet 文件的路径,并返回 Parquet 文件(函数名称:)中每个行组的 TorchArrow DataFrame。load_parquet_as_df

S3FileLister

[已弃用]请改用 https://github.com/awslabs/s3-connector-for-pytorch

S3FileLoader 文件加载器

[已弃用]请改用 https://github.com/awslabs/s3-connector-for-pytorch

保护

接收元数据和数据元组的 DataPipe,将数据保存到 and 元数据生成的目标路径,并在本地文件系统上生成文件路径(功能名称:)。filepath_fnsave_to_disk

映射 DataPipes

这些 DataPipe 将给定函数应用于 DataPipe 中的每个元素。

BatchAsyncMapper

将源 DataPipe 中的元素合并到 batches,并发地将协程函数应用于 batch 中的每个元素,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称: )。async_map_batches

BatchMapper

将源 DataPipe 中的元素组合成 batch,并在每个 batch 上应用一个函数,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称:)。map_batches

FlatMapper

将函数应用于源 DataPipe 中的每个项目,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称:)。flatmap

映射

将函数应用于源 DataPipe 中的每个项目(函数名称:)。map

ShuffledFlatMapper (ShuffledFlatMapper)

对源 DataPipe 中的每个项目应用一个函数,然后收集缓冲区中返回的可迭代对象,然后,在每次迭代中,随机选择缓冲区中的一个可迭代对象,并从该可迭代对象中生成一个项目(函数名称:)。shuffled_flatmap

线程池映射器

使用 (function name: ) 同时将函数应用于源 DataPipe 中的每个项目。ThreadPoolExecutorthreadpool_map

其他 DataPipe

具有不同功能的杂项 DataPipe。

DataFrameMaker 公司

获取数据行,将大量数据批处理在一起,并创建 TorchArrow DataFrame (function name: )。dataframe

EndOnDiskCacheHolder

指示何时将先前 DataPipe 的结果保存为 (function name: ) 指定的本地文件。filepath_fnend_caching

FullSync (全面同步)

跨分布式进程同步数据,防止训练时因分片数据不均匀导致卡顿(函数名称:)。fullsync

哈希检查器

从文件名和 data/stream (函数名称: ) 元组的输入 DataPipe 计算并检查每个文件的哈希值。check_hash

InMemoryCacheHolder 中

将源 DataPipe 中的元素存储在内存中,如果指定,则最多为大小限制 (function name: )。in_memory_cache

IterableWrapper 包装器

包装一个可迭代对象以创建 IterDataPipe。

长度设置器

设置 DataPipe 的 length 属性,该属性由 (function name: ) 返回。__len__set_length

MapToIter转换器

将 a 转换为 an (函数名称: )。MapDataPipeIterDataPipeto_iter_datapipe

OnDiskCacheHolder

将多个 DataPipe 操作的输出缓存到本地文件,这些文件通常是性能瓶颈,如 download、decompress 等(功能名称:)。on_disk_cache

PinMemory (像素内存)

从源 DataPipe 中预取一个元素并将其移动到固定内存(函数名称: )。pin_memory

预取程序

从源 DataPipe 中预取元素并将它们放入缓冲区(函数名称:)。prefetch

随机拆分器

将源 DataPipe 中的样本随机拆分为组(功能名称:)。random_split

分片扩展器

将传入的分片字符串扩展为分片。

ShardingFilter

允许对 DataPipe 进行分片的包装器(函数名称: )。sharding_filter

ShardingRoundRobin调度器

Wrapper 表示图形的前一部分是不可复制的,并将在单独的单个调度进程中迭代,以便在使用多进程时以循环方式将数据分发到工作进程。DataPipe

选择 DataPipes

这些 DataPipes 可帮助您在 DataPipe 中选择特定示例。

滤波器

根据输入(功能名称:)从源数据管道中筛选出元素。filter_fnfilter

页眉

从源 DataPipe 开始生成元素,直到指定的限制(函数名称: )。header

滴管

通过其索引(函数名称:)将列/元素拖放到输入 DataPipe 中。drop

切片机

通过 start/stop/step 或 indices (函数名称: ) 返回输入 DataPipe 中的元素切片。slice

压平机

根据提供的索引(函数名称:)在每个样本/元素级别返回输入 DataPipe 的扁平化副本。flatten

文本 DataPipes

这些 DataPipes 可帮助您解析、读取和转换文本文件和数据。

CSVDict解析器

接受 DataPipe 由文件名和 CSV 数据流的元组组成,一次读取并返回 CSV 文件中的一行内容(函数名称:)。parse_csv_as_dict

CSVParser

接受 DataPipe 由文件名和 CSV 数据流的元组组成,一次读取并返回 CSV 文件中的一行内容(函数名称:)。parse_csv

Json解析器

从 JSON 数据流中读取并生成文件名和 JSON 数据的元组(函数名称:)。parse_json_files

LineReader 系列

接受由文件名和字符串数据流的元组组成的 DataPipe,对于流中的每一行,生成文件名和行(函数名称:)的元组。readlines

段落聚合器

将同一文件中的文本行聚合到一个段落中(功能名称:)。lines_to_paragraphs

路由解码器

对来自输入 DataPipe 的二进制流进行解码,在元组中生成路径名和解码数据。

Rows2Columnar

接受包含批量数据的输入 DataPipe,一次处理一个批次,并为每个批次生成一个 Dict,其中键和每行中相应值的列表作为值(函数名称:)。column_namesrows2columnar

StreamReader 系列

给定 IO 流及其标签名称,生成标签名称为元组的字节。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源