目录

ReadingService

ReadingService 处理基于不同用例的 DataPipe 图的原地修改。

功能

动态分片

动态分片通过 MultiProcessingReadingServiceDistributedReadingService 来基于对应的多进程和分布式工作者的信息对管道进行分片。并且,TorchData 提供了两种类型的 DataPipe 让用户在管道内定义分片位置。

  • sharding_filter (ShardingFilter): 当管道可复制时,每个分布式/多进程工作者从其自己的 DataPipe 图副本中加载数据,同时跳过不属于相应工作者的样本,在放置 sharding_filter 的位置。

  • sharding_round_robin_dispatch (ShardingRoundRobinDispatcher): 当管道中存在任何 sharding_round_robin_dispatch DataPipe 时,该分支(即所有在 sharding_round_robin_dispatch 之前的DataPipes)将被视为不可复制的分支(在多进程的上下文中)。将创建一个单独的调度进程来从不可复制的分支加载数据,并将数据分发给后续的工作进程。

以下是一个在管道中使用两种类型分片策略的例子。

digraph Example {
    subgraph cluster_replicable {
        label="Replicable"
        a -> b -> c -> d -> l;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
    }

    k -> l -> fullsync -> end;

    a [label="DP1"];
    b [label="shuffle"];
    c [label="sharding_filter", color=blue];
    d [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    l [label="DP6"];
    fullsync;
    end [shape=box];
}

当进行多进程操作时,图会变成:

digraph Example {
    subgraph cluster_worker_0 {
        label="Worker 0"
        a0 -> b0 -> c0 -> d0 -> l0;
        m0 -> l0;
        color=blue;
    }

    subgraph cluster_worker_1 {
        label="Worker 1"
        a1 -> b1 -> c1 -> d1 -> l1;
        m1 -> l1;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
        k -> round_robin_demux;
    }

    round_robin_demux -> m0;
    round_robin_demux -> m1;
    l0 -> n;
    l1 -> n;
    n -> fullsync -> end;

    a0 [label="DP1"];
    b0 [label="shuffle"];
    c0 [label="sharding_filter", color=blue];
    d0 [label="DP4"];
    a1 [label="DP1"];
    b1 [label="shuffle"];
    c1 [label="sharding_filter", color=blue];
    d1 [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    fullsync;
    l0 [label="DP6"];
    l1 [label="DP6"];
    m0 [label="Client"]
    m1 [label="Client"]
    n [label="Client"]
    end [shape=box];
}

Client 在图中是一个 DataPipe,它发送请求并从 multiprocessing 队列接收响应。

确定性

DataLoader2 中,一个 SeedGenerator 变为单一的随机源,并且每个 ReadingService 会通过 initialize_iteration() 访问它并生成相应的随机种子以执行随机 DataPipe 操作。

为了确保在多进程和分布式节点上,Dataset shards之间互斥且完全穷尽,MultiProcessingReadingServiceDistributedReadingService 将帮助 DataLoader2 在任何随机 DataPipe 操作之前同步随机状态。对于剩余的 DataPipe 操作,在分片之后,每个 ReadingService 通过分布式排名和工作进程 ID 生成唯一的随机状态,以便执行不同的随机变换。

图模式

这还使得数据预处理管道从研究到生产的过渡更加容易。在创建并使用DataPipe图进行验证后,可以提供一个不同的ReadingServices配置并连接到生产服务/基础设施(如ReadingService)的AIStore作为DataLoader2的替代品。该ReadingService可能会搜索图,并找到可以委派给生产服务/基础设施的DataPipe操作,然后相应地修改图以实现更高性能的执行。

扩展阅读服务

以下是自定义ReadingService的接口。

class torchdata.dataloader2.ReadingServiceInterface

接口用于 ReadingService。请基于此接口类扩展自定义 ReadingService

ReadingService必须在initialize被调用之前是可序列化的。这是因为DataLoader2会创建它的副本,以避免同一ReadingService对象被多个DataLoader2使用,并且其内部状态可以被它们各自修改。

由于此约束,某些初始化步骤可能需要在initialize方法中而不是__init__方法中进行,这是ReadingService类的一部分。

finalize() None

ReadingService 清理内部状态并完全关闭服务。 在 DataLoader2shutdown__del__ 中调用。

finalize_iteration() None

ReadingService 在一个epoch结束后终止服务。当DataLoader2的迭代器耗尽时调用。

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

ReadingService 接受一个 DataPipe 图,根据自定义需求将其转换为一个新的 DataPipe 图。 在首次创建 DataLoader2 迭代器时调用一次。在调用此方法之前, ReadingService 对象必须是可序列化的。

Parameters:

datapipe – 原始 DataPipe 图表。

Returns:

一个适应的或新的 DataPipe 图表。

initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]

ReadingService 启动一个epoch的服务。在每次获取 DataLoader2 迭代器时调用。

Parameters:
  • seed_generator – 由DataLoader2创建和管理的SeedGenerator对象。作为唯一的随机性来源,它将控制所有随机操作在DataPipes图中的确定性。

  • iter_reset_fn – 可选的重置函数,来自之前的ReadingServcieSequentialReadingService 链接多个 ReadingServices

Returns:

一个新的 iter_reset_fn 将被后续的 ReadingService 使用

示例

多进程读取服务开始为每个进程设置工作种子并从图中预取项目。

检查点/快照功能正在进行中。这里是初步的接口(可能会有小的变化):

class torchdata.dataloader2.CheckpointableReadingServiceInterface

使用两种额外的方法扩展 ReadingServiceInterface 以保存/恢复数据处理图的状态。

abstract checkpoint() bytes

ReadingService 序列化内部状态。在 DataLoader2.state_dict 中调用。

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]

ReadingService 根据序列化状态调整 DataPipe 图。 在首次创建 DataLoader2 迭代器时调用一次。 与 initialize 相对应,后者从头开始调整 DataPipe 图。

Parameters:
  • datapipe – 原始 DataPipe 图表在被 ReadingService 适配之前

  • serialized_state – 用于恢复适应的 DataPipe 图内部状态的序列化状态。

Returns:

从序列化状态生成的经过调整的DataPipe

图形函数

并且,torchdata.dataloader.graph 中提供了图形实用函数,以帮助用户进行自定义 ReadingServiceDataPipe 图形重写:

traverse_dps

遍历 DataPipes 及其属性以提取 DataPipe 图。

find_dps

给定由traverse_dps函数生成的数据管道图,返回具有所提供数据管道类型的数据管道实例。

list_dps

给定由traverse_dps函数生成的数据管道图,返回所有不重复的数据管道实例列表。

remove_dp

给定由traverse_dps函数生成的数据管道图以及要移除的数据管道,返回新的数据管道图。

replace_dp

给定由traverse_dps函数生成的DataPipe图以及要替换的DataPipe和新的DataPipe,返回新的DataPipe图。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源