目录

注意力

2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2

我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)

阅读服务

ReadingService根据不同的使用案例处理图形的就地修改。DataPipe

特征

动态分片

动态分片是通过 and 根据对应的多进程和分布式 worker 的信息对流水线进行分片来实现的。而且, TorchData 提供了两种类型的用户,让用户可以定义管道中的分片位置。MultiProcessingReadingServiceDistributedReadingServiceDataPipe

  • sharding_filter ():当管道可复制时,每个分布式/多进程工作程序都会从自己的图形副本加载数据,同时跳过不属于放置点的相应工作程序的样本。DataPipesharding_filter

  • sharding_round_robin_dispatch ():当管道中有任何分支时,该分支(即 之前的所有 DataPipes )将被视为不可复制的分支(在多处理的上下文中)。将创建一个 dispatching 进程,用于从不可复制的分支加载数据并将数据分发到后续的工作进程。sharding_round_robin_dispatchDataPipesharding_round_robin_dispatch

以下是管道中具有两种分片策略的示例。

digraph 示例 {
 子图 cluster_replicable {
 label=“可复制”
 a -> b -> c -> d -> l;
 颜色=蓝色;
 }

子图 cluster_non_replicable {
 style=filled;
 颜色=浅灰色;
 节点 [style=filled,color=white];
 label=“不可复制”
 e -> f -> g -> k;
 h -> i -> j -> k;
 }

k -> l -> fullsync -> 结束;

a [label=“DP1”];
 b [label=“shuffle”];
 c [标签=“sharding_filter”, color=blue];
 d [label=“DP4”];
 e [label=“DP2”];
 f [label=“shuffle”];
 g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
 h [label=“DP3”];
 i [label=“shuffle”];
 j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
 k [label=“DP5 (最低共同祖先)”];
 l [label=“DP6”];
 完全同步;
 end [形状=框];
}

当 multiprocessing 发生时,图形变为:

digraph 示例 {
 子图 cluster_worker_0 {
 label=“工人 0”
 a0 -> b0 -> c0 -> d0 -> l0;
 m0 -> l0;
 颜色=蓝色;
 }

子图 cluster_worker_1 {
 label=“工作线程 1”
 a1 -> b1 -> c1 -> d1 -> l1;
 m1 -> l1;
 颜色=蓝色;
 }

子图 cluster_non_replicable {
 style=filled;
 颜色=浅灰色;
 节点 [style=filled,color=white];
 label=“不可复制”
 e -> f -> g -> k;
 h -> i -> j -> k;
 k -> round_robin_demux;
 }

round_robin_demux -> m0;
 round_robin_demux -> m1;
 l0 -> n;
 l1 -> n;
 n -> fullsync -> 结束;

a0 [label=“DP1”];
 b0 [label=“shuffle”];
 c0 [label=“sharding_filter”, color=blue];
 d0 [label=“DP4”];
 a1 [label=“DP1”];
 b1 [label=“shuffle”];
 c1 [label=“sharding_filter”, color=blue];
 d1 [label=“DP4”];
 e [label=“DP2”];
 f [label=“shuffle”];
 g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
 h [label=“DP3”];
 i [label=“shuffle”];
 j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
 k [label=“DP5 (最低共同祖先)”];
 完全同步;
 l0 [label=“DP6”];
 l1 [label=“DP6”];
 m0 [label=“客户”]
 m1 [label=“客户”]
 n [label=“客户”]
 end [形状=框];
}

Client图中是 A,它发送请求并从 Multiprocessing 队列接收响应。DataPipe

决定论

在 中,a 成为随机性的单一来源,每个来源都可以通过 a 访问它并为随机运算生成相应的随机种子。DataLoader2SeedGeneratorReadingServiceinitialize_iteration()DataPipe

为了确保 Dataset 分片在多处理进程和分布式节点上是互斥的和共同详尽的,并且有助于同步 or 之前的任何随机操作的随机状态。对于分片后的其余操作,每个操作都会根据分布式 rank 和 worker process id 生成唯一的随机状态,以便执行不同的随机转换。MultiProcessingReadingServiceDistributedReadingServiceDataPipesharding_filtersharding_round_robin_dispatchDataPipeReadingService

图形模式

这也允许数据预处理管道更轻松地从研究过渡到生产。在创建图形并使用 验证图形后,配置并连接到生产服务/基础设施的不同工具,例如可以作为直接替换提供。它可能会搜索图形,并找到可以委托给生产服务/基础设施的操作,然后相应地修改图形以实现更高性能的执行。DataPipeReadingServicesReadingServiceAIStoreReadingServiceDataPipe

扩展 ReadingService

以下是自定义的接口。ReadingService

torchdata.dataloader2.ReadingServiceInterface

的接口。请基于此接口类扩展自定义。ReadingServiceReadingService

ReadingService 在被调用之前必须是可挑选的。这是因为它的副本将是 created by 以避免同一 ReadingService 对象被 multiple ,并且其内部状态将可由它们中的每一个修改。initializeDataLoader2DataLoader2

由于此约束,某些初始化步骤可能需要在方法中进行,而不是在 ReadingService 类中进行。initialize__init__

finalize()

ReadingService清理内部状态并完全关闭服务。 在 的 和 中调用 。DataLoader2shutdown__del__

finalize_iteration

ReadingService在 epoch 完成后结束服务。调用时间 的迭代器已耗尽。DataLoader2

抽象 initializedatapipe Union[IterDataPipe MapDataPipe] Union[IterDataPipe] MapDataPipe]

ReadingService获取一个图形,根据自定义需求将其调整为新图形。 首次创建 iterator 时调用一次。在调用该方法之前, 对象必须是可腌制的。DataPipeDataPipeDataLoader2ReadingService

参数

datapipe – 原始图形。DataPipe

结果

改编的图表或新图表。DataPipe

initialize_iterationseed_generator SeedGeneratoriter_reset_fn: 可选[Callable[[Union[IterDataPipe MapDataPipe]], Union[IterDataPipe MapDataPipe]]] = 可选[Callable[[Union[IterDataPipe MapDataPipe]], Union[IterDataPipe MapDataPipe]]]

ReadingService为一个 epoch 启动 service。开始时被调用 每次获取 iterator 的DataLoader2

参数
  • seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为单个 source of randomness 的 source 的 Randomness 的 intent 函数,它将控制所有随机运算的确定性 与 DataPipes 的图形。

  • iter_reset_fn – 当链接多个时,可选的重置功能ReadingServcieSequentialReadingServiceReadingServices

结果

subseqeuent 将使用的 Newiter_reset_fnReadingService

MultiProcessingReadingService 开始为每个进程设置 worker 种子并预取 图表中的项。

检查点/快照功能正在开发中。下面是初步界面(可能会有小的更改):

torchdata.dataloader2.CheckpointableReadingServiceInterface

使用两种附加方法进行扩展,以保存/恢复数据处理图的状态。ReadingServiceInterface

抽象 checkpoint 字节

ReadingService序列化内部状态。已调用 。DataLoader2.state_dict

抽象还原datapipe Union[IterDataPipe MapDataPipe]serialized_state: bytes Union[IterDataPipe MapDataPipe]

ReadingService根据序列化状态调整 Graph。 首次创建 iterator 时调用一次。 的对应项 ,它从头开始调整 graph。DataPipeDataLoader2initializeDataPipe

参数
  • DataPipe – 改编者DataPipeReadingService

  • serialized_state – 用于恢复状态的内部状态的序列化状态 适应的图形。DataPipe

结果

改编自 serialized 状态。DataPipe

图形函数

并且,中提供了图形工具函数来帮助用户对自定义进行图形重写:torchdata.dataloader.graphDataPipeReadingService

traverse_dps

遍历 DataPipes 及其属性以提取 DataPipe 图形。

find_dps

给定 function 生成的 DataPipe 图,返回提供的 DataPipe 类型的 DataPipe 实例。traverse_dps

list_dps

给定函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。traverse_dps

remove_dp

给定 function 生成的 DataPipe 图和需要移除的 DataPipe,返回新的 DataPipe 图。traverse_dps

replace_dp

给定 function 生成的 DataPipe 图和需要替换的 DataPipe 以及新的 DataPipe,返回新的 DataPipe 图。traverse_dps

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源