注意力
2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2
我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)
Iterable 风格的 DataPipes¶
可迭代样式的数据集是实现协议
,表示数据样本的可迭代对象。这种类型的数据集特别适用于随机
读取是昂贵的,甚至是不可能的,并且批处理大小取决于获取的数据。__iter__()
例如,这样的数据集在调用时可以返回从数据库读取的数据流
远程服务器,甚至是实时生成的日志。iter(iterdatapipe)
这是 in 的更新版本。IterableDataset
torch
- 类 torchdata.datapipes.iter 中。IterDataPipe¶
Iterable 样式的 DataPipe 中。
所有表示数据样本可迭代对象的 DataPipes 都应该子类化 this。 当数据来自流时,这种风格的 DataPipes 特别有用,或者 当样本数量太大而无法将它们全部放入内存中时。 是延迟初始化的,并且其 仅当在 的迭代器上调用 时,才会计算元素。
IterDataPipe
next()
IterDataPipe
所有子类都应覆盖 ,这将返回一个 此 DataPipe 中的样本的迭代器。调用 an 会自动调用其 method ,默认情况下不执行任何操作。编写自定义时,用户应 如有必要,请覆盖。常见的用法包括重置缓冲区、指针、 以及自定义 .
__iter__()
__iter__
IterDataPipe
reset()
IterDataPipe
reset()
IterDataPipe
注意
一次只能有一个迭代器对每个迭代器有效。 创建第二个迭代器将使第一个迭代器无效。此约束是必需的,因为 有些具有内部缓冲区,如果有多个迭代器,则其状态可能会变得无效。 下面的代码示例详细介绍了此约束在实践中的外观。 如果您有任何与此约束相关的反馈,请参阅 GitHub IterDataPipe 单个迭代器问题。
IterDataPipe
IterDataPipe
可以通过两种方式调用这些 DataPipe,使用类构造函数或应用它们的 函数形式添加到现有 (推荐,适用于大多数但并非所有 DataPipe) 上。 您可以将多个 IterDataPipe 链接在一起,以形成一个将执行多个 连续操作。
IterDataPipe
注意
当子类与 一起使用时,每个 item 将从迭代器中生成。当 ,每个工作进程都将具有 DataPipe 对象的不同副本,因此通常需要配置 每个副本,以避免从 工人。,在 worker 中调用 process 返回有关工作程序的信息。它可以用于 dataset 的方法或 的选项来修改每个副本的行为。
DataLoader
DataLoader
num_workers > 0
get_worker_info()
__iter__()
DataLoader
worker_init_fn
例子
- 一般用法:
>>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10]
- 单个迭代器约束示例:
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> source_dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates `it1` >>> next(it2) 0 >>> next(it1) # Further usage of `it1` will raise a `RunTimeError`
我们有不同类型的可迭代 DataPipes:
存档 - 打开和解压缩不同格式的存档文件。
Augmenting - 增强您的样本(例如,添加索引或无限循环)。
Combinatorial - 执行组合运算(例如采样、洗牌)。
组合/拆分 - 通过组合或一对多来与多个 DataPipe 进行交互。
Grouping - 在 DataPipe 中对样本进行分组
IO - 与文件系统或远程服务器交互(例如下载、打开、 保存文件,并在目录中列出文件)。
映射 - 将给定函数应用于 DataPipe 中的每个元素。
其他 - 执行其他操作集。
选择 - 在 DataPipe 中选择特定样本。
文本 - 解析、读取和转换文本文件和数据
存档 DataPipes¶
这些 DataPipes 有助于打开和解压缩不同格式的存档文件。
从包含路径名和 bz2 二进制流元组的可迭代 DataPipe 中解压缩 bz2 二进制流,并生成路径名和提取的二进制流的元组(函数名称:)。 |
|
采用 path 元组和压缩的数据流,并返回 path 的元组和解压缩的数据流(函数名称:)。 |
|
从包含路径名和 rar 二进制流元组的输入 Iterable Datapipes 中解压缩 rar 二进制流,并产生路径名和提取的二进制流(函数名称:)的元组。 |
|
从包含路径名和 tar 二进制流元组的 Iterable DataPipe 打开/解压缩 tar 二进制流,并生成路径名和提取的二进制流(函数名称:)的元组。 |
|
从包含路径名和 tfrecord 二进制流元组的 Iterable DataPipe 打开/解压缩 tfrecord 二进制流,并生成存储的记录(函数名称:)。 |
|
Iterable DataPipe 接受 (path, data) 元组流,通常表示 tar 存档的路径名和文件(函数名称:)。 |
|
从包含路径名和 xy 二进制流元组的可迭代 DataPipe 中解压缩 xz (lzma) 二进制流,并产生路径名和提取的二进制流(函数名称:)的元组。 |
|
从 Iterable DataPipe 打开/解压缩 zip 二进制流,该 DataPipe 包含路径名和 zip 二进制流的元组,并生成路径名和提取的二进制流的元组(函数名称:)。 |
扩充 DataPipes¶
这些 DataPipes 有助于增强您的样本。
默认情况下,永久循环指定的输入,或循环指定的次数(函数名称:)。 |
|
通过枚举向已有的 DataPipe 添加索引,索引默认从 0 开始(函数名称: )。 |
|
使用 (function name: ) 向现有的 Iterable DataPipe 添加索引。 |
|
在移动到下一个元素(函数名称:)之前,将源 DataPipe 的每个元素重复生成指定的次数。 |
组合 DataPipes¶
这些 DataPipes 有助于执行组合运算。
从前面的 DataPipe (function name: ) 中随机排列每个小批量。 |
|
使用提供的 (默认为 )生成示例元素。 |
|
使用缓冲区(功能名称:)对输入 DataPipe 进行随机排序。 |
组合/拆分 DataPipe¶
这些操作往往涉及多个 DataPipe,将它们组合起来或将一对多拆分。
连接多个 Iterable DataPipe(函数名称:)。 |
|
使用给定的分类函数(函数名称:)将输入 DataPipe 拆分为多个子 DataPipe。 |
|
创建同一 Iterable DataPipe 的多个实例(函数名称:)。 |
|
根据匹配的键(函数名称:)将两个 IterDataPipes 压缩在一起。 |
|
将源 IterDataPipe 中的项与 MapDataPipe(函数名称:)中的项联接起来。 |
|
从每个输入的 Iterable DataPipes (function name: ) 中一次产生一个元素。 |
|
从每个输入的 Iterable DataPipes (function name: ) 中一次产生一个元素。 |
|
将输入 DataPipe 按循环顺序拆分为多个子 DataPipe(函数名称:)。 |
|
采用 (IterDataPipe, Weight) 的 Dict,并通过从这些 DataPipes 中采样其权重来生成项目。 |
|
接收序列的 DataPipe,解包每个序列,并根据元素在 Sequence 中的位置(函数名称:)在单独的 DataPipes 中返回元素。 |
|
将每个输入 DataPipes 的元素聚合到一个元组中(函数名称:)。 |
|
将每个输入 DataPipes 的元素聚合到一个元组中(函数名称:)。 |
对 DataPipes 进行分组¶
这些 DataPipes 允许您在 DataPipe 中对样本进行分组。
创建小批量数据 (function name: )。 |
|
从排序的存储桶(功能名称:)创建小批量数据。 |
|
通过自定义 collate 函数(函数名称:)将 DataPipe 中的样本整理到 Tensor(s)。 |
|
按 中的 键对来自 IterDataPipe 的数据进行分组,从而生成一个批大小最大为 . |
|
从大小有限的最小堆创建小批量数据,每个批次中返回的样本总长度将受到 (function name: ) 的限制。 |
|
撤消数据批处理(功能名称:)。 |
IO 数据管道¶
这些 DataPipes 有助于与文件系统或远程服务器交互(例如,下载、打开、 保存文件,并在目录中列出文件)。
Iterable Datapipe 列出来自具有给定 URL 前缀(功能名称:)的 AIStore 后端的文件。 |
|
Iterable DataPipe 使用给定的 URL(功能名称:)从 AIStore 加载文件。 |
|
在提供的路径名或 URL 处列出目录的内容,并生成目录中每个文件的完整路径名或 URL(功能名称: )。 |
|
从包含 fsspec 路径的 input datapipe 打开文件,并生成路径名和打开的文件流(功能名称:)的元组。 |
|
接收元数据和数据元组的 DataPipe,将数据保存到目标路径(由 filepath_fn 和 metadata 生成),并生成生成的 fsspec 路径(函数名称:)。 |
|
给定根目录的路径,生成根目录中的文件路径名(路径 + 文件名)。 |
|
给定路径名,打开文件并在元组(函数名称:)中生成路径名和文件流。 |
|
采用指向 GDrive 文件的 URL,并生成文件名和 IO 流的元组(功能名称:)。 |
|
获取文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名称:)。 |
|
接收数据集名称并返回 Iterable HuggingFace 数据集。 |
|
在提供的路径名或 URL 处列出目录的内容,并生成目录中每个文件的完整路径名或 URL(功能名称: )。 |
|
从包含路径名或 URL 的 Importing datapipe 打开文件,并生成路径名和打开的文件流(function name: )的元组。 |
|
接收元数据和数据元组的 DataPipe,将数据保存到由 and 元数据生成的目标路径,并以 iopath 格式(功能名称:)生成结果路径。 |
|
获取文件 URL(可以是指向文件的 HTTP URL 或指向 GDrive 文件的 URL),并生成文件 URL 和 IO 流的元组(函数名称:)。 |
|
获取 Parquet 文件的路径,并返回 Parquet 文件(函数名称:)中每个行组的 TorchArrow DataFrame。 |
|
[已弃用]请改用 https://github.com/awslabs/s3-connector-for-pytorch。 |
|
[已弃用]请改用 https://github.com/awslabs/s3-connector-for-pytorch。 |
|
接收元数据和数据元组的 DataPipe,将数据保存到 and 元数据生成的目标路径,并在本地文件系统上生成文件路径(功能名称:)。 |
映射 DataPipes¶
这些 DataPipe 将给定函数应用于 DataPipe 中的每个元素。
将源 DataPipe 中的元素合并到 batches,并发地将协程函数应用于 batch 中的每个元素,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称: )。 |
|
将源 DataPipe 中的元素组合成 batch,并在每个 batch 上应用一个函数,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称:)。 |
|
将函数应用于源 DataPipe 中的每个项目,然后将输出展平为单个未嵌套的 IterDataPipe(函数名称:)。 |
|
将函数应用于源 DataPipe 中的每个项目(函数名称:)。 |
|
对源 DataPipe 中的每个项目应用一个函数,然后收集缓冲区中返回的可迭代对象,然后,在每次迭代中,随机选择缓冲区中的一个可迭代对象,并从该可迭代对象中生成一个项目(函数名称:)。 |
|
使用 (function name: ) 同时将函数应用于源 DataPipe 中的每个项目。 |
其他 DataPipe¶
具有不同功能的杂项 DataPipe。
获取数据行,将大量数据批处理在一起,并创建 TorchArrow DataFrame (function name: )。 |
|
指示何时将先前 DataPipe 的结果保存为 (function name: ) 指定的本地文件。 |
|
跨分布式进程同步数据,防止训练时因分片数据不均匀导致卡顿(函数名称:)。 |
|
从文件名和 data/stream (函数名称: ) 元组的输入 DataPipe 计算并检查每个文件的哈希值。 |
|
将源 DataPipe 中的元素存储在内存中,如果指定,则最多为大小限制 (function name: )。 |
|
包装一个可迭代对象以创建 IterDataPipe。 |
|
设置 DataPipe 的 length 属性,该属性由 (function name: ) 返回。 |
|
将 a 转换为 an (函数名称: )。 |
|
将多个 DataPipe 操作的输出缓存到本地文件,这些文件通常是性能瓶颈,如 download、decompress 等(功能名称:)。 |
|
从源 DataPipe 中预取一个元素并将其移动到固定内存(函数名称: )。 |
|
从源 DataPipe 中预取元素并将它们放入缓冲区(函数名称:)。 |
|
将源 DataPipe 中的样本随机拆分为组(功能名称:)。 |
|
将传入的分片字符串扩展为分片。 |
|
允许对 DataPipe 进行分片的包装器(函数名称: )。 |
|
Wrapper 表示图形的前一部分是不可复制的,并将在单独的单个调度进程中迭代,以便在使用多进程时以循环方式将数据分发到工作进程。 |
选择 DataPipes¶
这些 DataPipes 可帮助您在 DataPipe 中选择特定示例。
根据输入(功能名称:)从源数据管道中筛选出元素。 |
|
从源 DataPipe 开始生成元素,直到指定的限制(函数名称: )。 |
|
通过其索引(函数名称:)将列/元素拖放到输入 DataPipe 中。 |
|
通过 start/stop/step 或 indices (函数名称: ) 返回输入 DataPipe 中的元素切片。 |
|
根据提供的索引(函数名称:)在每个样本/元素级别返回输入 DataPipe 的扁平化副本。 |
文本 DataPipes¶
这些 DataPipes 可帮助您解析、读取和转换文本文件和数据。
接受 DataPipe 由文件名和 CSV 数据流的元组组成,一次读取并返回 CSV 文件中的一行内容(函数名称:)。 |
|
接受 DataPipe 由文件名和 CSV 数据流的元组组成,一次读取并返回 CSV 文件中的一行内容(函数名称:)。 |
|
从 JSON 数据流中读取并生成文件名和 JSON 数据的元组(函数名称:)。 |
|
接受由文件名和字符串数据流的元组组成的 DataPipe,对于流中的每一行,生成文件名和行(函数名称:)的元组。 |
|
将同一文件中的文本行聚合到一个段落中(功能名称:)。 |
|
对来自输入 DataPipe 的二进制流进行解码,在元组中生成路径名和解码数据。 |
|
接受包含批量数据的输入 DataPipe,一次处理一个批次,并为每个批次生成一个 Dict,其中键和每行中相应值的列表作为值(函数名称:)。 |
|
给定 IO 流及其标签名称,生成标签名称为元组的字节。 |