可迭代样式数据管道¶
一个可迭代式数据集是一个实现了__iter__()协议的子类IterableDataset的实例,
并表示一个数据样本的可迭代对象。这种类型的数据集特别适合于随机读取成本高或甚至不可能的情况,
以及批量大小取决于获取的数据。
例如,当调用iter(iterdatapipe)时,可以返回从数据库、远程服务器或实时生成的日志中读取的数据流。
这是IterableDataset的更新版本,在torch中。
- class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)¶
迭代式数据管道。
所有表示数据样本可迭代性的 DataPipes 都应继承此类。这种风格的 DataPipes 在数据来自流或样本数量过大无法全部放入内存时特别有用。
所有子类应该重写
__iter__(),这将返回一个 在这个 DataPipe 中的样本迭代器。IterDataPipe 是懒加载的,并且其元素仅在调用其迭代器上的
next()时计算。这些DataPipes可以通过两种方式调用,使用类构造函数或将其功能形式应用于现有的IterDataPipe(推荐,适用于大多数但并非所有DataPipes)。 您可以将多个IterDataPipe串联在一起,形成一个管道,该管道将在一系列操作中执行多个操作。
注意
当使用子类时,每个 DataPipe中的项目将从
DataLoader迭代器中被传递。当num_workers > 0时,每个工作进程将拥有不同的DataPipe对象的副本,因此通常希望独立配置每个副本以避免从工作进程返回重复的数据。3,在工作进程调用时返回有关工作的信息。它可以在数据集的__iter__()方法或6选项的worker_init_fn位置用于修改每个副本的行为。示例
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10]
我们有不同的可迭代数据管道类型:
归档 - 打开并解压缩不同格式的归档文件。
增强 - 增强您的样本(例如添加索引,或无限循环)。
组合 - 执行组合操作(例如采样、洗牌)。
组合/拆分 - 通过组合多个数据管道或拆分一个数据管道为多个来与多个数据管道进行交互。
分组 - 在 DataPipe 内部对样本进行分组
IO - 与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。
映射 - 对 DataPipe 中的每个元素应用给定的函数。
其他 - 执行一组杂项操作。
选择 - 在 DataPipe 中选择特定的样本。
解析、读取和转换文本文件和数据。
归档数据管道¶
这些数据管道有助于打开和解压缩不同格式的归档文件。
接受路径和数据压缩流的元组,并返回路径和解压流的元组(功能名称: |
|
从包含路径名和RAR二进制流元组的输入可迭代数据管道中解压缩RAR二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和 tar 二进制流的可迭代 DataPipe 中打开/解压 tar 二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和 xz (lzma) 二进制流的可迭代 DataPipe 中解压缩二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和zip二进制流的Iterable DataPipe中打开/解压缩zip二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
增强数据管道¶
这些数据管道有助于增强您的样本。
默认情况下,无限循环指定的输入,或者按照指定的次数循环(功能名称: |
|
通过枚举为现有 DataPipe 添加索引,默认情况下索引从 0 开始(功能名称: |
|
为现有的 Iterable DataPipe 添加索引(功能名称: |
组合式数据管道¶
这些数据管道有助于执行组合操作。
使用提供的 |
|
使用缓冲区随机打乱输入 DataPipe(功能名称: |
组合/拆分数据管道¶
这些通常涉及多个DataPipes,将它们组合起来或将一个拆分为多个。
将多个可迭代数据管道(功能名称: |
|
将输入 DataPipe 使用给定的分类函数拆分为多个子 DataPipe(功能名称: |
|
创建多个相同的 Iterable DataPipe 实例(功能名称: |
|
根据匹配的键将两个 IterDataPipe 压缩在一起(功能名称: |
|
将源 IterDataPipe 的项与 MapDataPipe 的项连接起来(功能名称: |
|
逐个从每个输入的可迭代数据管道(函数名称: |
|
接受一个 Dict(IterDataPipe, Weight),并根据权重从这些 DataPipes 中采样生成项。 |
|
接收一个序列数据管道,解包每个序列,并根据元素在序列中的位置将其分别返回到不同的数据管道中。 |
|
聚合每个输入DataPipes中的元素,形成一个元组(函数名: |
数据管道分组¶
这些数据管道可以将数据管道中的样本进行分组。
创建数据的小批量(功能名称: |
|
从排序的桶中创建数据的小批量(功能名称: |
|
通过自定义的合并函数(功能名称: |
|
按从 |
|
取消数据的批量处理(功能名称: |
IO 数据管道¶
这些数据管道有助于与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。
列出所提供 |
|
从包含 fsspec 个路径的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到目标路径(由filepath_fn和元数据生成),并返回结果路径(功能名称: |
|
给定根目录的路径,生成根目录内文件的文件路径名(路径 + 文件名)。 |
|
给定路径名,打开文件并以元组形式返回路径名和文件流。 |
|
接收指向 Google Drive 文件的 URL,并返回文件名和 IO 流的元组。 |
|
接受文件URL(指向文件的HTTP URL),并生成文件URL和IO流的元组。 |
|
列出所提供 |
|
从包含路径名或URL的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到由 |
|
接受文件URL(可以是指向文件的HTTP URL,也可以是Google Drive文件的URL),并生成文件URL和IO流的元组。 |
|
接受Parquet文件的路径并返回每个Parquet文件中行组内的一个TorchArrow DataFrame(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到由 |
映射数据管道¶
这些数据管道对数据管道中的每个元素应用给定的函数。
对源 DataPipe 中的每个项应用一个函数,然后将输出展平为一个单一的、非嵌套的 IterDataPipe(功能名称: |
|
对源 DataPipe 中的每个项应用一个函数(功能名称: |
其他数据管道¶
一组具有不同功能的数据管道。
取数据的行,将其中的若干行批量组合并创建 TorchArrow 个 DataFrame(功能名称: |
|
指示先前 DataPipe 的结果何时将保存到由 |
|
计算每个文件的哈希值,并从输入 DataPipe 的文件名和数据/流元组中进行检查(功能名称: |
|
将源 DataPipe 中的元素存储在内存中,如果指定了大小限制,则最多存储到该限制(功能名称: |
|
将一个可迭代对象包装以创建一个 IterDataPipe。 |
|
缓存多个DataPipe操作的输出到本地文件,这些通常是性能瓶颈,例如下载、解压等(功能名称: |
|
允许 DataPipe 进行分片的包装器(功能名称: |
选择DataPipes¶
这些数据管道可以帮助您在数据管道中选择特定的样本。
根据输入 |
|
从起始位置开始,从源 DataPipe 中生成元素,直到指定的限制(功能名称: |
文本数据管道¶
这些数据管道帮助您解析、读取和转换文本文件和数据。
接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称: |
|
接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称: |
|
从JSON数据流中读取并返回一个文件名和JSON数据的元组(功能名称: |
|
接受一个由文件名和字符串数据流组成的 DataPipe,对于流中的每一行,生成一个包含文件名和该行的元组(功能名称: |
|
将来自同一文件的文本行聚合为一个段落(功能名称: |
|
从输入 DataPipe 解码二进制流,返回一个元组(功能名称: |
|
接受一个包含数据批次的DataPipe输入,并逐个批次进行处理,为每个批次生成一个字典,其中 |
|
给定输入输出流及其标签名称,生成包含标签名称的字节元组。 |