目录

DataLoader2 教程

这是用户创建DataPipe图并通过DataLoader2加载不同后端系统的数据(ReadingService)的教程。使用示例可以在这个Colab笔记本中找到。

DataPipe

请参阅DataPipe 教程以获取更多详细信息。以下是必要的最重要的注意事项: 确保每个 epoch 的数据管道顺序不同且数据分片互斥且集体穷尽:

  • sharding_filtersharding_round_robin_dispatch 作为流程中的早期步骤,以避免在工人/分布式过程中重复昂贵的操作。

  • 在分片之前添加一个shuffle个DataPipe以实现跨分片打乱。ReadingService将处理这些shuffle操作的同步,以确保数据在分片前的顺序相同,从而保证所有分片相互独立且完全覆盖。

这是一个DataPipe图的例子:

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)

多进程

MultiProcessingReadingServicesharding_filter 处处理多进程分片,并同步工作进程之间的种子。

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

分布式

DistributedReadingService 处理分布式分片在 sharding_filter 点,并同步分布式进程之间的种子。为了平衡数据分片在分布式节点之间,一个 fullsync DataPipe 将附加到 DataPipe 图表以对齐分布式排名中的批次数量。这将防止分布式训练中不均匀分片引起的挂起问题。

rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

Multiprocessing + 分布式

SequentialReadingService 可以用于将 ReadingServices 结合在一起,以实现同时进行多进程和分布式训练。

mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)

dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源