注意力
2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2
我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)
阅读服务¶
ReadingService
根据不同的使用案例处理图形的就地修改。DataPipe
特征¶
动态分片¶
动态分片是通过 and 根据对应的多进程和分布式 worker 的信息对流水线进行分片来实现的。而且, TorchData 提供了两种类型的用户,让用户可以定义管道中的分片位置。MultiProcessingReadingService
DistributedReadingService
DataPipe
sharding_filter
(ShardingFilter
):当管道可复制时,每个分布式/多进程工作程序都会从自己的图形副本加载数据,同时跳过不属于放置点的相应工作程序的样本。DataPipe
sharding_filter
sharding_round_robin_dispatch
(ShardingRoundRobinDispatcher
):当管道中有任何分支时,该分支(即 之前的所有 DataPipes )将被视为不可复制的分支(在多处理的上下文中)。将创建一个 dispatching 进程,用于从不可复制的分支加载数据并将数据分发到后续的工作进程。sharding_round_robin_dispatch
DataPipe
sharding_round_robin_dispatch
以下是管道中具有两种分片策略的示例。
![digraph 示例 {
子图 cluster_replicable {
label=“可复制”
a -> b -> c -> d -> l;
颜色=蓝色;
}
子图 cluster_non_replicable {
style=filled;
颜色=浅灰色;
节点 [style=filled,color=white];
label=“不可复制”
e -> f -> g -> k;
h -> i -> j -> k;
}
k -> l -> fullsync -> 结束;
a [label=“DP1”];
b [label=“shuffle”];
c [标签=“sharding_filter”, color=blue];
d [label=“DP4”];
e [label=“DP2”];
f [label=“shuffle”];
g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
h [label=“DP3”];
i [label=“shuffle”];
j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
k [label=“DP5 (最低共同祖先)”];
l [label=“DP6”];
完全同步;
end [形状=框];
}](https://pytorch.org/data/0.9/_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
当 multiprocessing 发生时,图形变为:
![digraph 示例 {
子图 cluster_worker_0 {
label=“工人 0”
a0 -> b0 -> c0 -> d0 -> l0;
m0 -> l0;
颜色=蓝色;
}
子图 cluster_worker_1 {
label=“工作线程 1”
a1 -> b1 -> c1 -> d1 -> l1;
m1 -> l1;
颜色=蓝色;
}
子图 cluster_non_replicable {
style=filled;
颜色=浅灰色;
节点 [style=filled,color=white];
label=“不可复制”
e -> f -> g -> k;
h -> i -> j -> k;
k -> round_robin_demux;
}
round_robin_demux -> m0;
round_robin_demux -> m1;
l0 -> n;
l1 -> n;
n -> fullsync -> 结束;
a0 [label=“DP1”];
b0 [label=“shuffle”];
c0 [label=“sharding_filter”, color=blue];
d0 [label=“DP4”];
a1 [label=“DP1”];
b1 [label=“shuffle”];
c1 [label=“sharding_filter”, color=blue];
d1 [label=“DP4”];
e [label=“DP2”];
f [label=“shuffle”];
g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
h [label=“DP3”];
i [label=“shuffle”];
j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
k [label=“DP5 (最低共同祖先)”];
完全同步;
l0 [label=“DP6”];
l1 [label=“DP6”];
m0 [label=“客户”]
m1 [label=“客户”]
n [label=“客户”]
end [形状=框];
}](https://pytorch.org/data/0.9/_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client
图中是 A,它发送请求并从 Multiprocessing 队列接收响应。DataPipe
决定论¶
在 中,a 成为随机性的单一来源,每个来源都可以通过 a 访问它并为随机运算生成相应的随机种子。DataLoader2
SeedGenerator
ReadingService
initialize_iteration()
DataPipe
为了确保 Dataset 分片在多处理进程和分布式节点上是互斥的并且集体详尽,这将有助于MultiProcessingReadingService
DistributedReadingService
DataLoader2
以同步 或 之前的任何随机运算的随机状态。对于分片后的其余作,每个作都会根据分布式 rank 和 worker process id 生成唯一的随机状态,以便执行不同的随机转换。DataPipe
sharding_filter
sharding_round_robin_dispatch
DataPipe
ReadingService
图形模式¶
这也允许数据预处理管道更轻松地从研究过渡到生产。创建图形并使用 验证图形后,配置并连接到生产服务/基础设施的不同方法,例如可以提供给DataPipe
ReadingServices
ReadingService
AIStore
DataLoader2
作为直接替代品。它可能会搜索图形,并找到可以委托给生产服务/基础设施的作,然后相应地修改图形以实现更高性能的执行。ReadingService
DataPipe
扩展 ReadingService¶
以下是自定义的接口。ReadingService
- 类 torchdata.dataloader2.ReadingServiceInterface¶
的接口。请基于此接口类扩展自定义。
ReadingService
ReadingService
ReadingService 在被调用之前必须是可挑选的。这是因为它的副本将是 created by 以避免同一 ReadingService 对象被 multiple ,并且其内部状态将可由它们中的每一个修改。
initialize
DataLoader2
DataLoader2
由于此约束,某些初始化步骤可能需要在方法中进行,而不是在 ReadingService 类中进行。
initialize
__init__
- finalize() 无 ¶
ReadingService
清理内部状态并完全关闭服务。 在 的 和 中调用 。DataLoader2
shutdown
__del__
- finalize_iteration() 无 ¶
ReadingService
在 epoch 完成后结束服务。调用时间 的迭代器已耗尽。DataLoader2
- 抽象 initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe]、 MapDataPipe] ¶
ReadingService
获取一个图形,根据自定义需求将其调整为新图形。 首次创建 iterator 时调用一次。在调用该方法之前, 对象必须是可腌制的。DataPipe
DataPipe
DataLoader2
ReadingService
- 参数
datapipe – 原始图形。
DataPipe
- 结果
改编的图表或新图表。
DataPipe
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = 无) 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
为一个 epoch 启动 service。开始时被调用 每次获取 iterator 的DataLoader2
- 参数
seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为单个 source of randomness 的 source 的 Randomness 的 intent 函数,它将控制所有随机运算的确定性 与 DataPipes 的图形。
iter_reset_fn – 当链接多个时,可选的重置功能
ReadingServcie
SequentialReadingService
ReadingServices
- 结果
subseqeuent 将使用的 New
iter_reset_fn
ReadingService
例
MultiProcessingReadingService 开始为每个进程设置 worker 种子并预取 图表中的项。
检查点/快照功能正在开发中。下面是初步界面(可能会有小的更改):
- 类 torchdata.dataloader2.CheckpointableReadingServiceInterface¶
使用两种附加方法进行扩展,以保存/恢复数据处理图的状态。
ReadingServiceInterface
- 抽象 checkpoint()字节 ¶
ReadingService
序列化内部状态。已调用 。DataLoader2.state_dict
- 抽象还原(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
根据序列化状态调整 Graph。 首次创建 iterator 时调用一次。 的对应项 ,它从头开始调整 graph。DataPipe
DataLoader2
initialize
DataPipe
- 参数
DataPipe – 改编者
DataPipe
ReadingService
serialized_state – 用于恢复状态的内部状态的序列化状态 适应的图形。
DataPipe
- 结果
改编自 serialized 状态。
DataPipe
图形函数¶
并且,中提供了图形工具函数来帮助用户对自定义进行图形重写:torchdata.dataloader.graph
DataPipe
ReadingService
遍历 DataPipes 及其属性以提取 DataPipe 图形。 |
|
给定 function 生成的 DataPipe 图,返回提供的 DataPipe 类型的 DataPipe 实例。 |
|
给定函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。 |
|
给定 function 生成的 DataPipe 图和需要移除的 DataPipe,返回新的 DataPipe 图。 |
|
给定 function 生成的 DataPipe 图和需要替换的 DataPipe 以及新的 DataPipe,返回新的 DataPipe 图。 |