目录

可迭代样式数据管道

一个可迭代式数据集是一个实现了__iter__()协议的子类IterableDataset的实例, 并表示一个数据样本的可迭代对象。这种类型的数据集特别适合于随机读取成本高或甚至不可能的情况, 以及批量大小取决于获取的数据。

例如,当调用iter(iterdatapipe)时,可以返回从数据库、远程服务器或实时生成的日志中读取的数据流。

这是IterableDataset的更新版本,在torch中。

class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)

迭代式数据管道。

所有表示数据样本可迭代性的 DataPipes 都应继承此类。这种风格的 DataPipes 在数据来自流或样本数量过大无法全部放入内存时特别有用。

所有子类应该重写 __iter__(),这将返回一个 在这个 DataPipe 中的样本迭代器。

IterDataPipe 是懒加载的,并且其元素仅在调用其迭代器上的 next() 时计算。

这些DataPipes可以通过两种方式调用,使用类构造函数或将其功能形式应用于现有的IterDataPipe(推荐,适用于大多数但并非所有DataPipes)。 您可以将多个IterDataPipe串联在一起,形成一个管道,该管道将在一系列操作中执行多个操作。

注意

当使用子类时,每个 DataPipe中的项目将从DataLoader迭代器中被传递。当num_workers > 0时,每个工作进程将拥有不同的DataPipe对象的副本,因此通常希望独立配置每个副本以避免从工作进程返回重复的数据。3,在工作进程调用时返回有关工作的信息。它可以在数据集的__iter__()方法或6选项的worker_init_fn位置用于修改每个副本的行为。

示例

>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]

我们有不同的可迭代数据管道类型:

  1. 归档 - 打开并解压缩不同格式的归档文件。

  2. 增强 - 增强您的样本(例如添加索引,或无限循环)。

  3. 组合 - 执行组合操作(例如采样、洗牌)。

  4. 组合/拆分 - 通过组合多个数据管道或拆分一个数据管道为多个来与多个数据管道进行交互。

  5. 分组 - 在 DataPipe 内部对样本进行分组

  6. IO - 与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。

  7. 映射 - 对 DataPipe 中的每个元素应用给定的函数。

  8. 其他 - 执行一组杂项操作。

  9. 选择 - 在 DataPipe 中选择特定的样本。

  10. 解析、读取和转换文本文件和数据。

归档数据管道

这些数据管道有助于打开和解压缩不同格式的归档文件。

Decompressor

接受路径和数据压缩流的元组,并返回路径和解压流的元组(功能名称:decompress)。

RarArchiveLoader

从包含路径名和RAR二进制流元组的输入可迭代数据管道中解压缩RAR二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称:load_from_rar)。

TarArchiveLoader

从包含路径名和 tar 二进制流的可迭代 DataPipe 中打开/解压 tar 二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称:load_from_tar)。

XzFileLoader

从包含路径名和 xz (lzma) 二进制流的可迭代 DataPipe 中解压缩二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称:load_from_xz)。

ZipArchiveLoader

从包含路径名和zip二进制流的Iterable DataPipe中打开/解压缩zip二进制流,并生成一个包含路径名和提取的二进制流的元组(功能名称:load_from_zip)。

增强数据管道

这些数据管道有助于增强您的样本。

Cycler

默认情况下,无限循环指定的输入,或者按照指定的次数循环(功能名称:cycle)。

Enumerator

通过枚举为现有 DataPipe 添加索引,默认情况下索引从 0 开始(功能名称:enumerate)。

IndexAdder

为现有的 Iterable DataPipe 添加索引(功能名称:add_index)。

组合式数据管道

这些数据管道有助于执行组合操作。

Sampler

使用提供的Sampler(默认为SequentialSampler)生成样本元素。

Shuffler

使用缓冲区随机打乱输入 DataPipe(功能名称:shuffle)。

组合/拆分数据管道

这些通常涉及多个DataPipes,将它们组合起来或将一个拆分为多个。

Concater

将多个可迭代数据管道(功能名称:concat)连接起来。

Demultiplexer

将输入 DataPipe 使用给定的分类函数拆分为多个子 DataPipe(功能名称:demux)。

Forker

创建多个相同的 Iterable DataPipe 实例(功能名称:fork)。

IterKeyZipper

根据匹配的键将两个 IterDataPipe 压缩在一起(功能名称:zip_with_iter)。

MapKeyZipper

将源 IterDataPipe 的项与 MapDataPipe 的项连接起来(功能名称:zip_with_map)。

Multiplexer

逐个从每个输入的可迭代数据管道(函数名称:mux)中生成一个元素。

SampleMultiplexer

接受一个 Dict(IterDataPipe, Weight),并根据权重从这些 DataPipes 中采样生成项。

UnZipper

接收一个序列数据管道,解包每个序列,并根据元素在序列中的位置将其分别返回到不同的数据管道中。

Zipper

聚合每个输入DataPipes中的元素,形成一个元组(函数名:zip)。

数据管道分组

这些数据管道可以将数据管道中的样本进行分组。

Batcher

创建数据的小批量(功能名称:batch)。

BucketBatcher

从排序的桶中创建数据的小批量(功能名称:bucketbatch)。

Collator

通过自定义的合并函数(功能名称:collate)将DataPipe中的样本合并为张量。

Grouper

按从 group_key_fn 生成的键对输入 IterDataPipe 的数据进行分组,并在定义的情况下,以最多 group_size 的批次大小生成一个 DataChunk(功能名称:groupby)。

UnBatcher

取消数据的批量处理(功能名称:unbatch)。

IO 数据管道

这些数据管道有助于与文件系统或远程服务器交互(例如下载、打开、保存文件以及列出目录中的文件)。

FSSpecFileLister

列出所提供 root 路径名或 URL 的目录内容,并为目录中的每个文件生成完整路径名或 URL。

FSSpecFileOpener

从包含 fsspec 个路径的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称:open_file_by_fsspec)。

FSSpecSaver

接收一个包含元数据和数据的DataPipe,将数据保存到目标路径(由filepath_fn和元数据生成),并返回结果路径(功能名称:save_by_fsspec)。

FileLister

给定根目录的路径,生成根目录内文件的文件路径名(路径 + 文件名)。

FileOpener

给定路径名,打开文件并以元组形式返回路径名和文件流。

GDriveReader

接收指向 Google Drive 文件的 URL,并返回文件名和 IO 流的元组。

HttpReader

接受文件URL(指向文件的HTTP URL),并生成文件URL和IO流的元组。

IoPathFileLister

列出所提供 root 路径名或 URL 的目录内容,并为目录中的每个文件生成完整路径名或 URL。

IoPathFileOpener

从包含路径名或URL的输入数据管道中打开文件,并返回一个包含路径名和已打开文件流的元组(功能名称:open_file_by_iopath)。

IoPathSaver

接收一个包含元数据和数据的DataPipe,将数据保存到由filepath_fn和元数据生成的目标路径,并以iopath格式返回结果路径(功能名称:save_by_iopath)。

OnlineReader

接受文件URL(可以是指向文件的HTTP URL,也可以是Google Drive文件的URL),并生成文件URL和IO流的元组。

ParquetDataFrameLoader

接受Parquet文件的路径并返回每个Parquet文件中行组内的一个TorchArrow DataFrame(功能名称:load_parquet_as_df)。

Saver

接收一个包含元数据和数据的DataPipe,将数据保存到由filepath_fn生成的目标路径,并返回本地文件系统上的文件路径(功能名称:save_to_disk)。

映射数据管道

这些数据管道对数据管道中的每个元素应用给定的函数。

FlatMapper

对源 DataPipe 中的每个项应用一个函数,然后将输出展平为一个单一的、非嵌套的 IterDataPipe(功能名称:flatmap)。

Mapper

对源 DataPipe 中的每个项应用一个函数(功能名称:map)。

其他数据管道

一组具有不同功能的数据管道。

DataFrameMaker

取数据的行,将其中的若干行批量组合并创建 TorchArrow 个 DataFrame(功能名称:dataframe)。

EndOnDiskCacheHolder

指示先前 DataPipe 的结果何时将保存到由 filepath_fn 指定的本地文件(功能名称:end_caching)。

HashChecker

计算每个文件的哈希值,并从输入 DataPipe 的文件名和数据/流元组中进行检查(功能名称:check_hash)。

InMemoryCacheHolder

将源 DataPipe 中的元素存储在内存中,如果指定了大小限制,则最多存储到该限制(功能名称:in_memory_cache)。

IterableWrapper

将一个可迭代对象包装以创建一个 IterDataPipe。

OnDiskCacheHolder

缓存多个DataPipe操作的输出到本地文件,这些通常是性能瓶颈,例如下载、解压等(功能名称:on_disk_cache)。

ShardingFilter

允许 DataPipe 进行分片的包装器(功能名称:sharding_filter)。

选择DataPipes

这些数据管道可以帮助您在数据管道中选择特定的样本。

Filter

根据输入 filter_fn(功能名称:filter)从源数据管道中过滤元素。

Header

从起始位置开始,从源 DataPipe 中生成元素,直到指定的限制(功能名称:header)。

文本数据管道

这些数据管道帮助您解析、读取和转换文本文件和数据。

CSVDictParser

接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称:parse_csv_as_dict)。

CSVParser

接受一个由文件名和CSV数据流组成的DataPipe,逐行读取并返回CSV文件中的内容(功能名称:parse_csv)。

JsonParser

从JSON数据流中读取并返回一个文件名和JSON数据的元组(功能名称:parse_json_files)。

LineReader

接受一个由文件名和字符串数据流组成的 DataPipe,对于流中的每一行,生成一个包含文件名和该行的元组(功能名称:readlines)。

ParagraphAggregator

将来自同一文件的文本行聚合为一个段落(功能名称:lines_to_paragraphs)。

RoutedDecoder

从输入 DataPipe 解码二进制流,返回一个元组(功能名称:routed_decode),其中包含路径名和解码后的数据。

Rows2Columnar

接受一个包含数据批次的DataPipe输入,并逐个批次进行处理,为每个批次生成一个字典,其中column_names作为键,对应行中的值列表作为值(功能名称:rows2columnar)。

StreamReader

给定输入输出流及其标签名称,生成包含标签名称的字节元组。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源