目录

分布式数据并行

警告

Pytorch的实现torch.nn.parallel.DistributedDataParallel 随着时间的推移而演变。本设计说明是基于v1.4版本的状态编写的。

torch.nn.parallel.DistributedDataParallel (DDP) 透明地执行分布式数据并行训练。此页面描述了它是如何工作的,并揭示了实现细节。

示例

让我们从一个简单的 torch.nn.parallel.DistributedDataParallel 示例开始。这个示例使用一个 torch.nn.Linear 作为本地模型,用DDP将其包裹起来,然后在DDP模型上运行一次前向传递、一次反向传递和一次优化器步骤。之后,本地模型的参数将被更新,并且不同进程中的所有模型应该完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 与 TorchDynamo 兼容。当与 TorchDynamo 一起使用时,在编译模型之前应用 DDP 模型包装器,以便 torchdynamo 可以根据 DDP 桶大小应用 DDPOptimizer(图形中断优化)。有关更多信息,请参阅 TorchDynamo DDPOptimizer

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

内部设计

本部分揭示了 torch.nn.parallel.DistributedDataParallel 在幕后的工作原理,通过深入探讨一次迭代中的每一步细节。

  • 先决条件: DDP 依赖于 c10d ProcessGroup 进行通信。 因此,应用程序必须在构建 DDP 之前创建 ProcessGroup 实例。

  • 构建:DDP 构造函数接收一个本地模块的引用,并从具有 rank 0 的进程广播 state_dict(),以确保组中的所有其他进程都从相同的初始状态开始。然后,每个 DDP 进程创建一个本地 Reducer,它将在反向传播过程中处理梯度同步。为了提高通信效率,Reducer 将参数梯度组织到桶中,并一次减少一个桶。桶的大小可以通过设置 DDP 构造函数中的 bucket_cap_mb 参数来配置。参数梯度到桶的映射是在构造时根据桶大小限制和参数大小确定的。模型参数被分配到桶中,大致按照给定模型中 Model.parameters() 的逆序。使用逆序的原因是因为 DDP 假设在反向传播过程中梯度会按此顺序准备好。下图显示了一个示例。请注意,grad0grad1bucket1 中,而另外两个梯度在 bucket0 中。当然,这个假设并不总是正确的,当不正确时,可能会降低 DDP 反向传播速度,因为 Reducer 无法在最早的时间启动通信。除了分桶之外,Reducer 在构造时还为每个参数注册自动微分钩子。这些钩子将在反向传播过程中触发,当梯度准备好时。

  • 前向传递: DDP 接收输入并将其传递给本地模型, 然后分析本地模型的输出,如果 find_unused_parameters 设置为 True。这种模式允许在模型的子图上运行反向传播, DDP 通过从模型输出遍历 autograd 图并标记所有未使用的参数为可减少来找出哪些参数参与了反向传播。 在反向传播过程中,Reducer 只会等待未准备好的参数,但它仍然会减少所有桶。 将参数梯度标记为已准备好目前并不能帮助 DDP 跳过桶,但它可以防止 DDP 在反向传播期间永远等待缺失的梯度。 请注意,遍历 autograd 图会引入额外的开销,因此应用程序只有在必要时才应将 find_unused_parameters 设置为 True

  • 反向传播:函数backward()直接在损失Tensor上被调用,这超出了DDP的控制范围,DDP使用在构建时注册的autograd钩子来触发梯度同步。当一个梯度准备就绪时,对应的DDP钩子会在该梯度累加器上触发,然后DDP会将该参数梯度标记为准备进行归约。当一个桶中的所有梯度都准备就绪时,Reducer会启动一个异步allreduce操作来计算所有进程中的梯度均值。当所有桶都准备就绪时,Reducer会阻塞等待所有allreduce操作完成。完成后,平均梯度会被写入到所有参数的param.grad字段中。因此,在反向传播之后,不同DDP进程中相同对应参数的grad字段应该是一样的。

  • 优化器步骤: 从优化器的角度来看,它正在优化一个本地模型。所有DDP进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每次迭代中具有相同的平均梯度。

ddp_grad_sync.png

注意

DDP 需要在所有进程中调用 Reducer 个实例来调用 allreduce 以完全相同的顺序,这是通过始终按照桶索引顺序而不是实际的桶就绪顺序运行 allreduce 来实现的。进程间不匹配的 allreduce 顺序可能会导致错误结果或 DDP 反向挂起。

实现

以下是DDP实现组件的指针。堆叠图显示了代码的结构。

ProcessGroup

  • ProcessGroup.hpp: 包含所有进程组实现的抽象API。该c10d 库提供了三种开箱即用的实现,分别是, ProcessGroupGlooProcessGroupNCCL,和ProcessGroupMPIDistributedDataParallel 使用ProcessGroup::broadcast() 在初始化时从排名为0的进程中发送模型状态到其他进程,并使用ProcessGroup::allreduce() 来求梯度的和。

  • Store.hpp: 协助进程组实例的会面服务找到彼此。

DistributedDataParallel

  • distributed.py: 是DDP的Python入口点。它实现了初始化步骤和 forward 函数,用于调用C++库的 nn.parallel.DistributedDataParallel 模块。它的 _sync_param 函数在单个DDP进程在多个设备上工作时执行进程内参数同步,并且还将模型缓冲区从秩为0的进程广播到所有其他进程。进程间参数同步发生在 Reducer.cpp

  • comm.h: 实现了合并广播辅助函数,该函数在初始化期间调用以广播模型状态,并在前向传递之前同步模型缓冲区。

  • reducer.h: 提供反向传播过程中梯度同步的核心实现。它有三个入口函数:

    • Reducer: 构造函数在 distributed.py 中被调用,这将注册 Reducer::autograd_hook() 到梯度累加器。

    • autograd_hook() 函数将在梯度准备好时由自动求导引擎调用。

    • prepare_for_backward() 在DDP前向传递结束时被调用,在 distributed.py 中。当在DDP构造函数中将 find_unused_parameters 设置为 True 时,它会遍历自动梯度图以查找未使用的参数。

ddp_code.png

TorchDynamo DDPOptimizer

DDP的性能优势来自于在反向传播期间将allreduce集合操作与计算重叠。 当使用TorchDynamo编译整个前向和后向图时,AotAutograd会阻止这种重叠, 因为allreduce操作是在整个优化后的反向计算完成后由autograd钩子启动的。

TorchDynamo 的 DDPOptimizer 通过在 DDP 的 allreduce 桶的逻辑边界处打断前向图来提供帮助,在反向传播期间。注意:目标是在反向传播期间打断图,最简单的实现方式是打断前向图,然后对每个部分调用 AotAutograd 和编译。这允许 DDP 的 allreduce 钩子在反向传播的部分之间触发,并安排通信与计算重叠。

参见这篇博客文章以获取更深入的解释和实验结果,或阅读位于 torch/_dynamo/optimizations/distributed.py 的文档和代码

要调试DDPOptimizer,请将 torch._dynamo.config.log_level 设置为 DEBUG(用于完整图转储)或 INFO (用于关于分桶边界的基本信息)。要禁用DDPOptimizer,请将 torch._dynamo.config.optimize_ddp=False 设置为. 即使没有DDPOptimizer,DDP 和 TorchDynamo 仍应能正确工作,但性能会有所下降。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源