目录

Distributed Data Parallel 入门

创建时间: 2019 年 4 月 23 日 |上次更新时间:2024 年 10 月 30 日 |上次验证: Nov 05, 2024

作者沈丽

编辑:Joe ZhuChirag Pandya

注意

编辑github 中查看和编辑本教程。

先决条件:

DistributedDataParallel (DDP) 是 PyTorch 中一个强大的模块,允许您跨 多台计算机,使其非常适合大规模深度学习应用程序。 要使用 DDP,您需要生成多个进程,并为每个进程创建一个 DDP 实例。

但它是如何工作的呢?DDP 使用来自 torch.distributed 包的集体通信来同步所有进程之间的梯度和缓冲区。这意味着每个进程都将具有 它自己的模型副本,但它们都将协同工作以训练模型,就像在一台机器上一样。

为了实现这一点,DDP 为模型中的每个参数注册了一个 autograd 钩子。 当向后传递运行时,此 hook 会触发并触发所有进程之间的梯度同步。 这可确保每个过程具有相同的梯度,然后使用这些梯度来更新模型。

有关 DDP 如何工作以及如何有效使用它的更多信息,请务必查看 DDP 设计说明。 使用 DDP,您可以比以往更快、更高效地训练模型!

使用 DDP 的推荐方法是为每个模型副本生成一个进程。模型副本可以跨越 多个设备。DDP 流程可以放置在同一台机器上,也可以放置在多台机器上。请注意,GPU 设备 不能在 DDP 进程之间共享(即一个 GPU 用于一个 DDP 进程)。

在本教程中,我们将从一个基本的 DDP 使用案例开始,然后演示更高级的使用案例 包括检查点模型以及将 DDP 与模型并行相结合。

注意

本教程中的代码在 8 个 GPU 服务器上运行,但它可以很容易地 推广到其他环境。

和 之间的比较DataParallelDistributedDataParallel

在我们深入研究之前,让我们澄清一下为什么您会考虑使用 over ,尽管它增加了复杂性:DistributedDataParallelDataParallel

  • 首先,是单进程、多线程的,但它仅适用于 单机。相反,是多进程并支持 单机和多机训练。 由于线程之间的 GIL 争用、每次迭代的复制模型以及 分散输入和收集输出,通常是 甚至比在单台机器上还要慢。DataParallelDistributedDataParallelDataParallelDistributedDataParallel

  • 回想一下前面的教程,如果您的模型太大而无法在单个 GPU 上容纳,则必须使用模型并行将其拆分到多个 GPU 上。 适用于 Model Parallel,而此时则不起作用。合并 DDP 时 使用 Model Parallel,每个 DDP 进程都将使用 Model Parallel,并且所有进程 共同使用 Data Parallel。DistributedDataParallelDataParallel

基本用例

要创建 DDP 模块,您必须首先正确设置进程组。更多细节可以 可在使用 PyTorch 编写分布式应用程序中找到。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

现在,让我们创建一个玩具模块,用 DDP 包装它,并给它一些假人 输入数据。请注意,由于 DDP 会将模型状态从第 0 级进程广播到 DDP 构造函数中的所有其他进程,您无需担心 不同的 DDP 过程从不同的初始模型参数值开始。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

如您所见,DDP 包装了较低级别的分布式通信详细信息和 提供干净的 API,就像它是本地模型一样。梯度同步 通信发生在向后传递期间,并与 反向计算。当返回时,已经 包含 synchronized gradient 张量。对于基本使用案例,仅限 DDP 需要再编写几行代码来设置 Process 组。当将 DDP 应用于更多 高级用例,一些注意事项需要谨慎。backward()param.grad

加工速度不平衡

在 DDP 中,构造函数、前向传递和向后传递分别为 分布式同步点。预计将启动不同的流程 相同数量的同步,并在 相同的顺序,并在大致相同的时间输入每个同步点。 否则,快速进程可能会提前到达并在等待 散兵游勇。因此,用户负责平衡工作负载分配 跨流程。有时,由于以下原因,处理速度偏差是不可避免的, 例如,网络延迟、资源争用或不可预测的工作负载峰值。自 避免在这些情况下超时,请确保传递足够的 init_process_group large 值。timeout

保存和加载 Checkpoints

使用 和 checkpoint 模块是很常见的 在训练期间并从检查点恢复。有关更多详细信息,请参阅 保存和加载模型 。使用 DDP 时,一种优化是将模型保存在 只有一个进程,然后将其加载到所有进程上,从而减少写入开销。 这之所以有效,是因为所有进程都从相同的参数开始,并且 梯度在向后传递中是同步的,因此优化器应该保持 将 parameters 设置为相同的值。 如果使用此优化(即保存一个进程,但还原所有进程),请确保没有进程启动 在保存完成之前加载。此外,当 加载该模块时,您需要提供适当的参数以防止进程单步进入其他设备。如果缺少,则首先将模块加载到 CPU,然后复制每个 参数保存到保存位置,这将导致 同一台机器使用同一组设备。用于更高级的故障恢复 和 elasticity 支持,请参考 TorchElastictorch.savetorch.loadmap_locationmap_locationtorch.load

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
    print(f"Finished running DDP checkpoint example on rank {rank}.")

将 DDP 与模型并行性相结合

DDP 也适用于多 GPU 模型。DDP 包装多 GPU 模型尤其 在训练具有大量数据的大型模型时非常有用。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

将多 GPU 模型传递给 DDP 时,不得进行设置。输入和输出数据将通过以下方式放置在适当的设备中 应用程序或模型方法。device_idsoutput_deviceforward()

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP with model parallel example on rank {rank}.")


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    world_size = n_gpus//2
    run_demo(demo_model_parallel, world_size)

使用 torch.distributed.run/torchrun 初始化 DDP

我们可以利用 PyTorch Elastic 来简化 DDP 代码并更轻松地初始化作业。 我们仍然使用 Toymodel 示例并创建一个名为 .elastic_ddp.py

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")
    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    dist.destroy_process_group()
    print(f"Finished running basic DDP example on rank {rank}.")

if __name__ == "__main__":
    demo_basic()

然后可以运行 torch elastic/torchrun 命令 在所有节点上初始化上面创建的 DDP 作业:

torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py

在上面的示例中,我们在两台主机上运行 DDP 脚本,并在每台主机上运行 8 个进程。也就是说,我们 正在 16 个 GPU 上运行此作业。请注意,所有节点都必须相同。$MASTER_ADDR

这里将启动 8 个进程,并在启动它的节点上调用每个进程,但用户还需要申请集群 管理工具(如 slurm)在 2 个节点上实际运行此命令。torchrunelastic_ddp.py

例如,在启用了 SLURM 的集群上,我们可以编写一个脚本来运行上面的命令 并设置为:MASTER_ADDR

export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)

然后我们可以使用 SLURM 命令运行此脚本:。srun --nodes=2 ./torchrun_script.sh

这只是一个例子;您可以选择自己的集群计划工具来启动作业。torchrun

有关 Elastic run 的更多信息,请参阅快速入门文档

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源