DataLoader2 教程¶
这是用户创建DataPipe图并通过DataLoader2加载不同后端系统的数据(ReadingService)的教程。使用示例可以在这个Colab笔记本中找到。
DataPipe¶
请参阅DataPipe 教程以获取更多详细信息。以下是必要的最重要的注意事项: 确保每个 epoch 的数据管道顺序不同且数据分片互斥且集体穷尽:
将
sharding_filter或sharding_round_robin_dispatch作为流程中的早期步骤,以避免在工人/分布式过程中重复昂贵的操作。在分片之前添加一个
shuffle个DataPipe以实现跨分片打乱。ReadingService将处理这些shuffle操作的同步,以确保数据在分片前的顺序相同,从而保证所有分片相互独立且完全覆盖。
这是一个DataPipe图的例子:
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多进程¶
MultiProcessingReadingService 在 sharding_filter 处处理多进程分片,并同步工作进程之间的种子。
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
分布式¶
DistributedReadingService 处理分布式分片在 sharding_filter 点,并同步分布式进程之间的种子。为了平衡数据分片在分布式节点之间,一个 fullsync DataPipe 将附加到 DataPipe 图表以对齐分布式排名中的批次数量。这将防止分布式训练中不均匀分片引起的挂起问题。
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
Multiprocessing + 分布式¶
SequentialReadingService 可以用于将 ReadingServices 结合在一起,以实现同时进行多进程和分布式训练。
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()