注意力
2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2
我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)
DataLoader2 教程¶
这是用户创建图形并通过不同的后端系统 () 加载数据的教程。可以在此 colab 笔记本中找到使用示例。DataPipe
DataLoader2
ReadingService
数据管道¶
有关详细信息,请参阅 DataPipe 教程。以下是必要的最重要的注意事项: 要确保数据管道每个 epoch 的顺序不同,并且数据分片是互斥的并且共同详尽:
将 或 尽早 放置在管道中,以避免在 worker/分布式进程中重复昂贵的操作。
sharding_filter
sharding_round_robin_dispatch
在分片之前添加 DataPipe,以实现分片间 shuffing。 将处理这些操作的同步,以确保数据的顺序在分片之前相同,以便所有分片都是互斥的,并且都是详尽的。
shuffle
ReadingService
shuffle
下面是一个图表示例:DataPipe
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多处理¶
MultiProcessingReadingService
在 点处理多进程分片,并在工作进程之间同步种子。sharding_filter
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
分散式¶
DistributedReadingService
在 点处理分布式分片,并在分布式进程之间同步种子。而且,为了平衡分布式节点之间的数据分片,将 附加到图表上,以对齐分布式排名之间的批次数。这将防止分布式训练中分片不均匀导致挂起问题。sharding_filter
fullsync
DataPipe
DataPipe
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
多处理 + 分布式¶
SequentialReadingService
可以用来将两者结合在一起,同时实现多处理和分布式训练。ReadingServices
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()