分布式数据并行¶
(DDP) 以透明方式执行
分布式数据并行训练。本页介绍了它的工作原理并揭示了
实现细节。
例¶
让我们从一个简单的例子开始。此示例使用 a
作为本地模型 wraps
它,然后运行一个前向传递、一个向后传递和一个优化器
Step on DDP 模型。之后,本地模型上的参数将为
updated,并且不同进程上的所有模型都应该完全相同。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP
def example(rank, world_size):
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
# forward pass
outputs = ddp_model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
optimizer.step()
def main():
world_size = 2
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__=="__main__":
# Environment variables which need to be
# set when using c10d's default "env"
# initialization mode.
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
main()
DDP 可与 TorchDynamo 配合使用。与 TorchDynamo 一起使用时,应用 DDP 模型包装器
在编译模型之前,以便 TorchDynamo 可以根据 DDP 存储桶大小应用(图中断优化)。(有关更多信息,请参阅 TorchDynamo DDPOptimizer。DDPOptimizer
ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)
内部设计¶
前提条件:DDP 依赖 c10d 进行通信。 因此,应用程序必须在构造之前创建实例 DDP 的。
ProcessGroup
ProcessGroup
构造:DDP 构造函数引用本地模块 并从排名为 0 的进程广播到所有其他进程 进程,以确保所有模型副本都从 完全相同的状态。然后,每个 DDP 进程都会创建一个本地 ,该 later 将在 backward 期间处理梯度同步 通过。为了提高通信效率,organizes 参数 gradients 转换为 bucket,并一次减少一个 bucket。存储桶大小可以是 通过在 DDP 构造函数中设置 bucket_cap_mb 参数进行配置。这 从参数梯度到存储桶的映射是在构造时确定的 时间,具体取决于存储桶大小限制和参数大小。模型参数为 以(大致)与给定模型的相反顺序分配到存储桶中。使用反向的原因 order 是因为 DDP 期望梯度在向后 按大致该顺序传递。下图显示了一个示例。注意 that、 和 都在 中 ,而其他两个 渐变位于 中。当然,这种假设可能并不总是 是真的,当这种情况发生时,它可能会损害 DDP 的倒退速度,因为它无法尽早启动通信。 除了分桶之外,它还会在 构造,每个参数一个钩子。这些钩子将在 渐变准备就绪时的 backward pass。
state_dict()
Reducer
Reducer
Model.parameters()
grad0
grad1
bucket1
bucket0
Reducer
Reducer
Forward Pass:DDP 获取输入并将其传递给本地模型, 然后分析本地模型的输出(如果设置为 )。此模式允许运行 在模型的子图上向后,DDP 找出哪些参数是 通过从模型中遍历 autograd 图参与向后传递 output 并将所有未使用的参数标记为 ready for reduction。在 backward pass,则只会等待 unready 参数,但它 仍会减少所有存储桶。将参数梯度标记为就绪不会 目前帮助 DDP 跳过存储桶,但它会阻止 DDP 等待 在向后传递期间永远没有梯度。请注意,遍历 Autograd Graph 会带来额外的开销,因此应用程序应仅在必要时设置为 。
find_unused_parameters
True
Reducer
find_unused_parameters
True
Backward Pass:函数在 loss 时直接调用,这超出了 DDP 的控制范围,DDP 使用了 autograd hook 在构建时注册以触发渐变同步。什么时候 一个梯度变为 ready,其对应的 DDP 钩子位于该 grad 上 accumulator 将触发,然后 DDP 会将该参数梯度标记为 准备减少。当一个桶中的梯度全部准备就绪时,该桶将启动异步 计算所有过程的梯度平均值。当所有存储桶都准备就绪时, 将阻止等待所有操作完成。 完成此操作后,平均梯度将写入字段 的所有参数。所以在向后传递之后,相同的 grad 字段 不同 DDP 过程中的相应参数应相同。
backward()
Tensor
Reducer
allreduce
Reducer
allreduce
param.grad
Optimizer Step:从优化器的角度来看,它正在优化本地 型。所有 DDP 进程上的模型副本都可以保持同步,因为它们都 从相同的状态开始,它们具有相同的平均梯度 每次迭代。
注意
DDP 要求所有进程上的实例以完全相同的顺序调用,这是通过始终以 bucket 索引顺序而不是实际的 bucket ready 顺序运行来完成的。跨流程的顺序不匹配可能会导致错误的结果或 DDP 倒退
挂。Reducer
allreduce
allreduce
allreduce
实现¶
以下是指向 DDP 实施组件的指针。堆叠图显示 代码的结构。
进程组¶
ProcessGroup.hpp 中: 包含所有进程组实现的抽象 API。该库提供了 3 种开箱即用的实现,即 ProcessGroupGloo、ProcessGroupNCCL和 ProcessGroupMPI。 用于发送 在初始化期间,从秩为 0 的进程到其他进程的模型状态 并对梯度求和。
c10d
DistributedDataParallel
ProcessGroup::broadcast()
ProcessGroup::allreduce()
Store.hpp 中: 帮助进程组实例的 Rendezvous 服务找到彼此。
分布式数据并行¶
distributed.py: 是 DDP 的 Python 入口点。它实现了初始化步骤和 调用 C++ 库的模块的函数。其功能执行 当一个 DDP 进程在多个进程上工作时,进程内参数同步 devices 的 v,它还将模型缓冲区从秩为 0 的进程广播到 所有其他进程。进程间参数同步发生在 中。
forward
nn.parallel.DistributedDataParallel
_sync_param
Reducer.cpp
comm.h 中: 实现合并的广播帮助程序函数,该函数被调用到 在初始化期间广播模型状态并同步模型缓冲区 在向前传球之前。
reducer.h 中: 为向后 通过。它有三个入口点函数:
Reducer
:在 其中调用构造函数,其中寄存器对梯度累加器。distributed.py
Reducer::autograd_hook()
autograd_hook()
函数将由 autograd 引擎调用,当 渐变将变为 READY。prepare_for_backward()
在 DDP 正向传递的末尾调用 。它遍历 autograd 图以查找未使用的 参数(当在 DDP 中设置为 构造 函数。distributed.py
find_unused_parameters
True
TorchDynamo DDPOptimizer¶
DDP 的性能优势来自于在向后期间将 allreduce 集合与计算重叠。 AotAutograd 在与 TorchDynamo 一起使用时可以防止这种重叠,以编译整个正向和整个反向图形。 因为 AllReduce 操作是由 autograd 钩子 _after_ 启动的,所以整个优化的反向计算完成。
TorchDynamo 的 DDPOptimizer 通过在 DDP 的 allreduce 存储桶的逻辑边界处打破前向图来提供帮助 期间向后。注意:目标是在向后时打破图,最简单的实现是 打破前向图,然后调用 AotAutograd 并对每个部分进行编译。这允许 DDP 的 allreduce 钩子 以触发 backwards 的中间部分,并安排通信与 Compute 重叠。
请参阅此博客文章 更深入的解释和实验结果,或阅读 torch/_dynamo/optimizations/distributed.py 上的文档和代码
要调试 DDPOptimizer,请为完整图形转储设置 TORCH_LOGS='ddp_graphs' 。对于没有图表的日志,请将 'dynamo'、'distributed' 或 'dist_ddp' 中的任何一个添加到 TORCH_LOGS(有关存储桶边界的基本信息)。要禁用 DDPOptimizer,请设置 torch._dynamo.config.optimize_ddp=False。 DDP 和 TorchDynamo 在没有 DDPOptimizer 的情况下应该仍然可以正常工作,但性能会下降。