可迭代样式数据管道¶
一个可迭代式数据集是一个实现了__iter__()协议的子类IterableDataset的实例,
并表示一个数据样本的可迭代对象。这种类型的数据集特别适合于随机读取成本高或甚至不可能的情况,
以及批量大小取决于获取的数据。
例如,当调用iter(iterdatapipe)时,可以返回从数据库、远程服务器或实时生成的日志中读取的数据流。
这是IterableDataset的更新版本,在torch中。
- class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)¶
迭代式数据管道。
所有表示数据样本迭代的DataPipes都应该继承自这个类。 这种类型的DataPipes在数据来自流或 当样本数量太大而无法全部存储在内存中时特别有用。
IterDataPipe是惰性初始化的,其元素仅在调用next()方法时计算出来,该方法是在IterDataPipe迭代器上调用的。所有子类应该重写
__iter__(),这将返回一个 在这个DataPipe中的样本迭代器。调用__iter__的IterDataPipe自动调用它的 方法reset(),默认情况下不执行任何操作。当编写自定义IterDataPipe时, 用户应根据需要重写reset()。常见的使用包括重置缓冲区、指针和 自定义IterDataPipe中的各种状态变量。注意
只有one个迭代器在同一时间可以对每个
IterDataPipe有效, 并且创建第二个迭代器将使第一个迭代器失效。这一限制是必要的,因为 某些IterDataPipe具有内部缓冲区,如果存在多个迭代器,其状态可能会变得无效。 下面的代码示例详细说明了这一限制在实际中的表现。 如果您有关于此限制的任何反馈,请参阅GitHub IterDataPipe 单一迭代器问题。这些DataPipes可以通过两种方式调用,使用类构造函数或将其功能形式应用于现有的
IterDataPipe(推荐,适用于大多数但并非所有DataPipes)。 您可以将多个IterDataPipe串联在一起,形成一个管道,该管道将在一系列操作中执行多个操作。注意
当使用子类时,每个 DataPipe中的项目将从
DataLoader迭代器中被传递。当num_workers > 0时,每个工作进程将拥有不同的DataPipe对象的副本,因此通常希望独立配置每个副本以避免从工作进程返回重复的数据。3,在工作进程调用时返回有关工作的信息。它可以在数据集的__iter__()方法或6选项的worker_init_fn位置用于修改每个副本的行为。示例
- General Usage:
>>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10]
- Single Iterator Constraint Example:
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> source_dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates `it1` >>> next(it2) 0 >>> next(it1) # Further usage of `it1` will raise a `RunTimeError`
我们有不同的可迭代数据管道类型:
归档 - 打开并解压缩不同格式的归档文件。
增强 - 增强您的样本(例如添加索引,或无限循环)。
组合 - 执行组合操作(例如采样、洗牌)。
组合/拆分 - 通过组合多个数据管道或拆分一个数据管道为多个来与多个数据管道进行交互。
分组 - 在 DataPipe 内部对样本进行分组
IO - 与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。
映射 - 对 DataPipe 中的每个元素应用给定的函数。
其他 - 执行一组杂项操作。
选择 - 在 DataPipe 中选择特定的样本。
解析、读取和转换文本文件和数据。
归档数据管道¶
这些数据管道有助于打开和解压缩不同格式的归档文件。
从包含路径名和 bz2 二进制流的可迭代 DataPipe 中解压缩 bz2 二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
接受路径和数据压缩流的元组,并返回路径和解压流的元组(功能名称: |
|
从包含路径名和RAR二进制流元组的输入可迭代数据管道中解压缩RAR二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和 tar 二进制流的可迭代 DataPipe 中打开/解压 tar 二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和tfrecord二进制流元组的可迭代DataPipe中打开/解压缩tfrecord二进制流,并生成存储的记录(功能名称: |
|
Iterable DataPipe 接受 (path, data) 元组流,通常表示 tar 归档文件的路径名和文件(功能名称: |
|
从包含路径名和 xz (lzma) 二进制流的可迭代 DataPipe 中解压缩二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
|
从包含路径名和zip二进制流的Iterable DataPipe中打开/解压缩zip二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称: |
增强数据管道¶
这些数据管道有助于增强您的样本。
默认情况下,无限循环指定的输入,或者按照指定的次数循环(功能名称: |
|
通过枚举为现有 DataPipe 添加索引,默认情况下索引从 0 开始(功能名称: |
|
为现有的 Iterable DataPipe 添加索引(功能名称: |
|
重复生成源DataPipe的每个元素指定次数,然后再移动到下一个元素(功能名称: |
组合式数据管道¶
这些数据管道有助于执行组合操作。
从之前的 DataPipe 中打乱每个 mini-batch(功能名称: |
|
使用提供的 |
|
使用缓冲区随机打乱输入 DataPipe(功能名称: |
组合/拆分数据管道¶
这些通常涉及多个DataPipes,将它们组合起来或将一个拆分为多个。
将多个可迭代数据管道(功能名称: |
|
将输入 DataPipe 使用给定的分类函数拆分为多个子 DataPipe(功能名称: |
|
创建多个相同的 Iterable DataPipe 实例(功能名称: |
|
根据匹配的键将两个 IterDataPipe 压缩在一起(功能名称: |
|
将源 IterDataPipe 的项与 MapDataPipe 的项连接起来(功能名称: |
|
逐个从每个输入的可迭代数据管道(函数名称: |
|
逐个从每个输入的可迭代数据管道(函数名称: |
|
将输入 DataPipe 按照轮询顺序拆分为多个子 DataPipe(函数名称: |
|
接受一个 Dict(IterDataPipe, Weight),并根据权重从这些 DataPipes 中采样生成项。 |
|
接收一个序列数据管道,解包每个序列,并根据其在序列中的位置返回元素所在的独立数据管道(功能名称: |
|
聚合每个输入DataPipes中的元素,形成一个元组(函数名: |
|
聚合每个输入DataPipes中的元素,形成一个元组(函数名: |
数据管道分组¶
这些数据管道可以将数据管道中的样本进行分组。
创建数据的小批量(功能名称: |
|
从排序的桶中创建数据的小批量(功能名称: |
|
通过自定义的合并函数(功能名称: |
|
按从 |
|
从一个有限大小的最小堆中创建数据的小批量,并且每个批次中由 |
|
取消数据的批量处理(功能名称: |
IO 数据管道¶
这些数据管道有助于与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。
可迭代的数据管道,用于列出来自AIStore后端的文件,并带有给定的URL前缀(功能名称: |
|
Iterable DataPipe,用于从AIStore加载给定URL的文件(功能名称: |
|
列出所提供 |
|
从包含 fsspec 个路径的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到目标路径(由filepath_fn和元数据生成),并返回结果路径(功能名称: |
|
给定根目录的路径,生成根目录内文件的文件路径名(路径 + 文件名)。 |
|
给定路径名,打开文件并以元组形式(功能名称: |
|
接受指向Google Drive文件的URL,并返回文件名和IO流的元组(功能名称: |
|
接受文件URL(指向文件的HTTP URL),并生成文件URL和IO流的元组(功能名称: |
|
接受数据集名称并返回一个可迭代的 HuggingFace 数据集。 |
|
列出所提供 |
|
从包含路径名或URL的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到由 |
|
接受文件URL(可以是指向文件的HTTP URL,也可以是GDrive文件的URL),并生成文件URL和IO流的元组(功能名称: |
|
接受Parquet文件的路径并返回每个Parquet文件中行组内的一个TorchArrow DataFrame(功能名称: |
|
Iterable DataPipe,用于列出给定前缀的 Amazon S3 文件 URL(功能名称: |
|
Iterable DataPipe 从给定的 S3 URL 加载 Amazon S3 文件(功能名称: |
|
接收一个包含元数据和数据的DataPipe,将数据保存到由 |
映射数据管道¶
这些数据管道对数据管道中的每个元素应用给定的函数。
将源 DataPipe 中的元素组合成批次,并对每个批次内的每个元素并发地应用一个协程函数,然后将输出展平为一个单一、非嵌套的 IterDataPipe(功能名称: |
|
将源 DataPipe 中的元素组合成批次,并对每个批次应用一个函数,然后将输出展平为单个、非嵌套的 IterDataPipe(功能名称: |
|
对源 DataPipe 中的每个项应用一个函数,然后将输出展平为一个单一的、非嵌套的 IterDataPipe(功能名称: |
|
对源 DataPipe 中的每个项应用一个函数(功能名称: |
|
对源 DataPipe 中的每个项应用一个函数,然后将返回的可迭代对象收集到缓冲区中,然后在每次迭代时,从缓冲区中的可迭代对象中随机选择一个,并从该可迭代对象中生成一个项(功能名称: |
|
对源 DataPipe 中的每个项并行应用一个函数,使用 |
其他数据管道¶
一组具有不同功能的数据管道。
取数据的行,将其中的若干行批量组合并创建 TorchArrow 个 DataFrame(功能名称: |
|
指示先前 DataPipe 的结果何时将保存到由 |
|
在分布式进程中同步数据,以防止训练过程中因数据分片不均匀而导致的挂起(功能名称: |
|
计算每个文件的哈希值,并从输入 DataPipe 的文件名和数据/流元组中进行检查(功能名称: |
|
将源 DataPipe 中的元素存储在内存中,如果指定了大小限制,则最多存储到该限制(功能名称: |
|
将一个可迭代对象包装以创建一个 IterDataPipe。 |
|
设置 DataPipe 的 length 属性,该属性由 |
|
将一个 |
|
缓存多个DataPipe操作的输出到本地文件,这些通常是性能瓶颈,例如下载、解压等(功能名称: |
|
从源DataPipe预取一个元素并将其移动到固定内存中(功能名称: |
|
从源 DataPipe 预取元素并将其放入缓冲区(功能名称: |
|
从源 DataPipe 中随机拆分样本以形成组(功能名称: |
|
将传入的分片字符串扩展为分片。 |
|
允许 DataPipe 进行分片的包装器(功能名称: |
|
指示先前 |
选择DataPipes¶
这些数据管道可以帮助您在数据管道中选择特定的样本。
根据输入 |
|
从起始位置开始,从源 DataPipe 中生成元素,直到指定的限制(功能名称: |
|
通过其索引从输入 DataPipe 中删除列/元素(功能名称: |
|
通过 start/stop/step 或索引(功能名称: |
|
返回一个基于所提供索引在每个样本/元素级别上的输入 DataPipe 的扁平化副本(功能名称: |
文本数据管道¶
这些数据管道帮助您解析、读取和转换文本文件和数据。
接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称: |
|
接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称: |
|
从JSON数据流中读取并返回一个文件名和JSON数据的元组(功能名称: |
|
接受一个由文件名和字符串数据流组成的 DataPipe,对于流中的每一行,生成一个包含文件名和该行的元组(功能名称: |
|
将来自同一文件的文本行聚合为一个段落(功能名称: |
|
从输入 DataPipe 解码二进制流,返回一个元组(功能名称: |
|
接受一个包含数据批次的DataPipe输入,并逐个批次进行处理,为每个批次生成一个字典,其中 |
|
给定IO流及其标签名称,生成一个元组(函数名称: |