目录

将分布式数据并行与分布式RPC框架结合使用

创建日期:2020年7月28日 | 最后更新日期:2023年6月6日 | 最后验证日期:未验证

作者: Pritam DamaniaYi Wang

注意

edit 查看和编辑此教程在 github

本教程使用一个简单示例来演示如何结合 DistributedDataParallel (DDP) 与 分布式RPC框架 将分布式数据并行与分布式模型并行结合,以训练一个简单的模型。示例的源代码可以在这里找到 here

之前的教程, 使用分布式数据并行入门使用分布式RPC框架入门, 分别描述了如何执行分布式数据并行和分布式模型并行训练。尽管如此,仍有几种训练范式 可能需要结合这两种技术。例如:

  1. 如果我们有一个模型包含稀疏部分(大型嵌入表)和密集部分(全连接层),我们可能希望将嵌入表放在参数服务器上,并使用 DistributedDataParallel 在多个训练器之间复制全连接层。 可以使用 Distributed RPC框架 在参数服务器上执行嵌入查找。

  2. 启用如PipeDream论文中所述的混合并行性。 我们可以使用分布式RPC框架 在多个工作进程中对模型阶段进行流水线处理,并使用DistributedDataParallel复制每个阶段(如需)。


在这个教程中,我们将介绍上述提到的案例1。我们的设置中共有4个工作者,如下所示:

  1. 主节点负责在参数服务器上创建嵌入表(nn.EmbeddingBag)。主节点还驱动两个训练器上的训练循环。

  2. 1 参数服务器,基本上将嵌入表存储在内存中,并响应来自主节点和训练器的RPC请求。

  3. 2 个训练器,它们存储一个全连接层(nn.Linear),并使用 DistributedDataParallel 在彼此之间进行复制。 训练器还负责执行前向传播、反向传播和优化器步骤。


整个训练过程执行如下:

  1. 主进程创建一个 RemoteModule 它在参数服务器上保存一个嵌入表。

  2. 主节点随后在训练器上启动训练循环,并将远程模块传递给训练器。

  3. 训练师创建一个 HybridModel,该模块首先使用主节点提供的远程模块执行嵌入查找,然后执行包裹在DDP中的全连接层。

  4. 训练器执行模型的前向传播,并使用损失通过分布式自动求导执行反向传播。

  5. 作为反向传播的一部分,FC 层的梯度首先被计算,并通过 DDP 中的 allreduce 同步到所有训练器。

  6. 接下来,分布式自动求导将梯度传播到参数服务器,其中嵌入表的梯度会被更新。

  7. 最后,使用分布式优化器来更新所有参数。

注意力

您应该始终在结合使用DDP和RPC时,为反向传播使用 分布式自动求导

现在,让我们详细了解一下每个部分。首先,在进行任何训练之前,我们需要设置所有的工作者。我们创建了4个进程,其中rank 0和1是我们的训练员,rank 2是主节点,rank 3是参数服务器。

我们使用TCP init_method在所有4个工作者上初始化RPC框架。 一旦RPC初始化完成,主节点会创建一个远程模块,在Parameter Server上使用EmbeddingBag 层,通过RemoteModule。 然后主节点遍历每个训练节点,并通过rpc_async调用每个训练节点的_run_trainer来启动训练循环。 最后,主节点等待所有训练完成后再退出。

训练师首先使用 init_process_group 为 DDP 初始化一个 ProcessGroup(world_size=2,用于两个训练师)。 接下来,他们使用 TCP init_method 初始化 RPC 框架。请注意, RPC 初始化和 ProcessGroup 初始化使用的端口是不同的。 这是为了避免两个框架初始化之间的端口冲突。 一旦初始化完成,训练师只需等待来自主节点的 _run_trainer RPC。

参数服务器只是初始化 RPC 框架,并等待来自训练器和主节点的 RPC 请求。

def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在讨论Trainer的细节之前,让我们介绍Trainer使用的HybridModel。如下面所述,HybridModel使用一个远程模块进行初始化,该模块在参数服务器上保存了一个嵌入表(remote_emb_module),并使用device用于DDP。模型的初始化将一个 nn.Linear 层包装在DDP中,以在所有Trainer之间复制和同步该层。

模型的forward方法非常直接。它使用RemoteModule的forward在参数服务器上执行嵌入查找,并将其输出传递给全连接层。

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下来,我们来看Trainer上的设置。Trainer首先使用一个远程模块创建上述的HybridModel,该模块在参数服务器上保存嵌入表,并且拥有自己的rank。

现在,我们需要获取一个包含所有要优化的参数的RRef列表,这些参数将使用 DistributedOptimizer 进行优化。 为了从参数服务器中获取嵌入表的参数,我们可以调用 RemoteModule 的 remote_parameters, 该方法基本上遍历嵌入表的所有参数并返回一个RRef列表。训练器通过RPC在参数服务器上调用此方法以接收所需参数的RRef列表。由于 DistributedOptimizer 总是需要一个要优化的参数的RRef列表,因此即使对于我们全连接层的本地参数,也需要创建RRef。这是通过遍历 model.fc.parameters(),为每个参数创建一个RRef,并将其追加到从 remote_parameters() 返回的列表中完成的。 请注意,我们不能使用 model.parameters(), 因为它会递归调用 model.remote_emb_module.parameters(), 而 RemoteModule 不支持此操作。

最后,我们使用所有的 RRefs 创建我们的 DistributedOptimizer,并定义一个 CrossEntropyLoss 函数。

def _run_trainer(remote_emb_module, rank):
    r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

现在我们准备好介绍在每个训练器上运行的主要训练循环。 get_next_batch 仅仅是生成训练用随机输入和 目标的辅助函数。我们对多个纪元运行训练循环,并为每个 批次:

  1. 设置 分布式自动求导上下文 用于分布式自动求导。

  2. 运行模型的前向传播并获取其输出。

  3. 根据我们的输出和目标使用损失函数计算损失。

  4. 使用分布式自动求导来使用损失执行分布式反向传播。

  5. 最后,运行分布式优化器步骤以优化所有参数。

    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

整个示例的源代码可以在这里找到。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源