DataLoader2 教程¶
这是用户创建图形并通过不同的后端系统 () 加载数据的教程。可以在此 colab 笔记本中找到使用示例。DataPipe
DataLoader2
ReadingService
数据管道¶
有关详细信息,请参阅 DataPipe 教程。以下是必要的最重要的注意事项: 要确保数据管道每个 epoch 的顺序不同,并且数据分片是互斥的并且共同详尽:
将 或 尽早 放置在管道中,以避免在 worker/分布式进程中重复昂贵的操作。
sharding_filter
sharding_round_robin_dispatch
在分片之前添加 DataPipe,以实现分片间 shuffing。 将处理这些操作的同步,以确保数据的顺序在分片之前相同,以便所有分片都是互斥的,并且都是详尽的。
shuffle
ReadingService
shuffle
下面是一个图表示例:DataPipe
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多处理¶
MultiProcessingReadingService
在 点处理多进程分片,并在工作进程之间同步种子。sharding_filter
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
分散式¶
DistributedReadingService
在 点处理分布式分片,并在分布式进程之间同步种子。而且,为了平衡分布式节点之间的数据分片,将 附加到图表上,以对齐分布式排名之间的批次数。这将防止分布式训练中分片不均匀导致挂起问题。sharding_filter
fullsync
DataPipe
DataPipe
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
多处理 + 分布式¶
SequentialReadingService
可以用来将两者结合在一起,同时实现多处理和分布式训练。ReadingServices
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()