ReadingService¶
ReadingService 处理基于不同用例的 DataPipe 图的原地修改。
功能¶
动态分片¶
动态分片通过 MultiProcessingReadingService 和 DistributedReadingService 来基于对应的多进程和分布式工作者的信息对管道进行分片。并且,TorchData 提供了两种类型的 DataPipe 让用户在管道内定义分片位置。
sharding_filter(ShardingFilter): 当管道可复制时,每个分布式/多进程工作者从其自己的DataPipe图副本中加载数据,同时跳过不属于相应工作者的样本,在放置sharding_filter的位置。sharding_round_robin_dispatch(ShardingRoundRobinDispatcher): 当管道中存在任何sharding_round_robin_dispatchDataPipe时,该分支(即所有在sharding_round_robin_dispatch之前的DataPipes)将被视为不可复制的分支(在多进程的上下文中)。将创建一个单独的调度进程来从不可复制的分支加载数据,并将数据分发给后续的工作进程。
以下是一个在管道中使用两种类型分片策略的例子。
![digraph Example {
subgraph cluster_replicable {
label="Replicable"
a -> b -> c -> d -> l;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
}
k -> l -> fullsync -> end;
a [label="DP1"];
b [label="shuffle"];
c [label="sharding_filter", color=blue];
d [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
l [label="DP6"];
fullsync;
end [shape=box];
}](https://pytorch.org/data/0.7/_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
当进行多进程操作时,图会变成:
![digraph Example {
subgraph cluster_worker_0 {
label="Worker 0"
a0 -> b0 -> c0 -> d0 -> l0;
m0 -> l0;
color=blue;
}
subgraph cluster_worker_1 {
label="Worker 1"
a1 -> b1 -> c1 -> d1 -> l1;
m1 -> l1;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
k -> round_robin_demux;
}
round_robin_demux -> m0;
round_robin_demux -> m1;
l0 -> n;
l1 -> n;
n -> fullsync -> end;
a0 [label="DP1"];
b0 [label="shuffle"];
c0 [label="sharding_filter", color=blue];
d0 [label="DP4"];
a1 [label="DP1"];
b1 [label="shuffle"];
c1 [label="sharding_filter", color=blue];
d1 [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
fullsync;
l0 [label="DP6"];
l1 [label="DP6"];
m0 [label="Client"]
m1 [label="Client"]
n [label="Client"]
end [shape=box];
}](https://pytorch.org/data/0.7/_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client 在图中是一个 DataPipe,它发送请求并从 multiprocessing 队列接收响应。
确定性¶
在 DataLoader2 中,一个 SeedGenerator 变为单一的随机源,并且每个 ReadingService 会通过 initialize_iteration() 访问它并生成相应的随机种子以执行随机 DataPipe 操作。
为了确保在多进程和分布式节点上,Dataset shards之间互斥且完全穷尽,MultiProcessingReadingService 和 DistributedReadingService 将帮助 DataLoader2 在任何随机 DataPipe 操作之前同步随机状态。对于剩余的 DataPipe 操作,在分片之后,每个 ReadingService 通过分布式排名和工作进程 ID 生成唯一的随机状态,以便执行不同的随机变换。
图模式¶
这还使得数据预处理管道从研究到生产的过渡更加容易。在创建并使用DataPipe图进行验证后,可以提供一个不同的ReadingServices配置并连接到生产服务/基础设施(如ReadingService)的AIStore作为DataLoader2的替代品。该ReadingService可能会搜索图,并找到可以委派给生产服务/基础设施的DataPipe操作,然后相应地修改图以实现更高性能的执行。
扩展阅读服务¶
以下是自定义ReadingService的接口。
- class torchdata.dataloader2.ReadingServiceInterface¶
接口用于
ReadingService。请基于此接口类扩展自定义ReadingService。ReadingService必须在
initialize被调用之前是可序列化的。这是因为DataLoader2会创建它的副本,以避免同一ReadingService对象被多个DataLoader2使用,并且其内部状态可以被它们各自修改。由于此约束,某些初始化步骤可能需要在
initialize方法中而不是__init__方法中进行,这是ReadingService类的一部分。- finalize() None¶
ReadingService清理内部状态并完全关闭服务。 在DataLoader2的shutdown和__del__中调用。
- finalize_iteration() None¶
ReadingService在一个epoch结束后终止服务。当DataLoader2的迭代器耗尽时调用。
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]¶
ReadingService接受一个DataPipe图,根据自定义需求将其转换为一个新的DataPipe图。 在首次创建DataLoader2迭代器时调用一次。在调用此方法之前,ReadingService对象必须是可序列化的。- Parameters:
datapipe – 原始
DataPipe图表。- Returns:
一个适应的或新的
DataPipe图表。
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]¶
ReadingService启动一个epoch的服务。在每次获取DataLoader2迭代器时调用。- Parameters:
seed_generator – 由DataLoader2创建和管理的SeedGenerator对象。作为唯一的随机性来源,它将控制所有随机操作在DataPipes图中的确定性。
iter_reset_fn – 可选的重置函数,来自之前的
ReadingServcie当SequentialReadingService链接多个ReadingServices
- Returns:
一个新的
iter_reset_fn将被后续的ReadingService使用
示例
多进程读取服务开始为每个进程设置工作种子并从图中预取项目。
检查点/快照功能正在进行中。这里是初步的接口(可能会有小的变化):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
使用两种额外的方法扩展
ReadingServiceInterface以保存/恢复数据处理图的状态。- abstract checkpoint() bytes¶
ReadingService序列化内部状态。在DataLoader2.state_dict中调用。
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]¶
ReadingService根据序列化状态调整DataPipe图。 在首次创建DataLoader2迭代器时调用一次。 与initialize相对应,后者从头开始调整DataPipe图。- Parameters:
datapipe – 原始
DataPipe图表在被ReadingService适配之前serialized_state – 用于恢复适应的
DataPipe图内部状态的序列化状态。
- Returns:
从序列化状态生成的经过调整的
DataPipe。
图形函数¶
并且,torchdata.dataloader.graph 中提供了图形实用函数,以帮助用户进行自定义 ReadingService 的 DataPipe 图形重写:
遍历 DataPipes 及其属性以提取 DataPipe 图。 |
|
给定由 |
|
给定由 |
|
给定由 |
|
给定由 |