注意力
2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2
我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)
阅读服务¶
ReadingService
根据不同的使用案例处理图形的就地修改。DataPipe
特征¶
动态分片¶
动态分片是通过 and 根据对应的多进程和分布式 worker 的信息对流水线进行分片来实现的。而且, TorchData 提供了两种类型的用户,让用户可以定义管道中的分片位置。MultiProcessingReadingService
DistributedReadingService
DataPipe
sharding_filter
():当管道可复制时,每个分布式/多进程工作程序都会从自己的图形副本加载数据,同时跳过不属于放置点的相应工作程序的样本。
DataPipe
sharding_filter
sharding_round_robin_dispatch
():当管道中有任何分支时,该分支(即 之前的所有 DataPipes )将被视为不可复制的分支(在多处理的上下文中)。将创建一个 dispatching 进程,用于从不可复制的分支加载数据并将数据分发到后续的工作进程。
sharding_round_robin_dispatch
DataPipe
sharding_round_robin_dispatch
以下是管道中具有两种分片策略的示例。
当 multiprocessing 发生时,图形变为:
Client
图中是 A,它发送请求并从 Multiprocessing 队列接收响应。DataPipe
决定论¶
在 中,a 成为随机性的单一来源,每个来源都可以通过 a 访问它并为随机运算生成相应的随机种子。DataLoader2
SeedGenerator
ReadingService
initialize_iteration()
DataPipe
为了确保 Dataset 分片在多处理进程和分布式节点上是互斥的和共同详尽的,并且有助于同步 or 之前的任何随机操作的随机状态。对于分片后的其余操作,每个操作都会根据分布式 rank 和 worker process id 生成唯一的随机状态,以便执行不同的随机转换。
MultiProcessingReadingService
DistributedReadingService
DataPipe
sharding_filter
sharding_round_robin_dispatch
DataPipe
ReadingService
图形模式¶
这也允许数据预处理管道更轻松地从研究过渡到生产。在创建图形并使用 验证图形后,配置并连接到生产服务/基础设施的不同工具,例如可以作为直接替换提供。它可能会搜索图形,并找到可以委托给生产服务/基础设施的操作,然后相应地修改图形以实现更高性能的执行。
DataPipe
ReadingServices
ReadingService
AIStore
ReadingService
DataPipe
扩展 ReadingService¶
以下是自定义的接口。ReadingService
- 类 torchdata.dataloader2.ReadingServiceInterface¶
的接口。请基于此接口类扩展自定义。
ReadingService
ReadingService
ReadingService 在被调用之前必须是可挑选的。这是因为它的副本将是 created by 以避免同一 ReadingService 对象被 multiple ,并且其内部状态将可由它们中的每一个修改。
initialize
DataLoader2
DataLoader2
由于此约束,某些初始化步骤可能需要在方法中进行,而不是在 ReadingService 类中进行。
initialize
__init__
- finalize() 无 ¶
ReadingService
清理内部状态并完全关闭服务。 在 的 和 中调用 。DataLoader2
shutdown
__del__
- finalize_iteration() 无 ¶
ReadingService
在 epoch 完成后结束服务。调用时间 的迭代器已耗尽。DataLoader2
- 抽象 initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe]、 MapDataPipe] ¶
ReadingService
获取一个图形,根据自定义需求将其调整为新图形。 首次创建 iterator 时调用一次。在调用该方法之前, 对象必须是可腌制的。DataPipe
DataPipe
DataLoader2
ReadingService
- 参数
datapipe – 原始图形。
DataPipe
- 结果
改编的图表或新图表。
DataPipe
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = 无) 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
为一个 epoch 启动 service。开始时被调用 每次获取 iterator 的DataLoader2
- 参数
seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为单个 source of randomness 的 source 的 Randomness 的 intent 函数,它将控制所有随机运算的确定性 与 DataPipes 的图形。
iter_reset_fn – 当链接多个时,可选的重置功能
ReadingServcie
SequentialReadingService
ReadingServices
- 结果
subseqeuent 将使用的 New
iter_reset_fn
ReadingService
例
MultiProcessingReadingService 开始为每个进程设置 worker 种子并预取 图表中的项。
检查点/快照功能正在开发中。下面是初步界面(可能会有小的更改):
- 类 torchdata.dataloader2.CheckpointableReadingServiceInterface¶
使用两种附加方法进行扩展,以保存/恢复数据处理图的状态。
ReadingServiceInterface
- 抽象 checkpoint()字节 ¶
ReadingService
序列化内部状态。已调用 。DataLoader2.state_dict
- 抽象还原(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
根据序列化状态调整 Graph。 首次创建 iterator 时调用一次。 的对应项 ,它从头开始调整 graph。DataPipe
DataLoader2
initialize
DataPipe
- 参数
DataPipe – 改编者
DataPipe
ReadingService
serialized_state – 用于恢复状态的内部状态的序列化状态 适应的图形。
DataPipe
- 结果
改编自 serialized 状态。
DataPipe
图形函数¶
并且,中提供了图形工具函数来帮助用户对自定义进行图形重写:torchdata.dataloader.graph
DataPipe
ReadingService
遍历 DataPipes 及其属性以提取 DataPipe 图形。 |
|
给定 function 生成的 DataPipe 图,返回提供的 DataPipe 类型的 DataPipe 实例。 |
|
给定函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。 |
|
给定 function 生成的 DataPipe 图和需要移除的 DataPipe,返回新的 DataPipe 图。 |
|
给定 function 生成的 DataPipe 图和需要替换的 DataPipe 以及新的 DataPipe,返回新的 DataPipe 图。 |