目录

使用分布式数据并行开始

创建时间:2019年4月23日 | 最后更新时间:2024年10月30日 | 最后验证时间:2024年11月5日

作者: 李申

编辑者: 朱乔, 奇拉格·潘迪

注意

edit 查看和编辑此教程在 github

Prerequisites:

DistributedDataParallel (DDP) 是 PyTorch 中一个强大的模块,允许你将模型并行化到多台机器上,非常适合大规模深度学习应用。要使用 DDP,你需要启动多个进程,并为每个进程创建一个 DDP 实例。

但是它是如何工作的?DDP 使用 torch.distributed 包中的集体通信来在所有进程中同步梯度和缓冲区。这意味着每个进程都会有模型的独立副本,但它们会协同工作以训练模型,就像它是在单台机器上一样。

为了实现这一点,DDP 为模型中的每个参数注册一个 autograd 钩子。 当反向传播运行时,该钩子会被触发,并在所有进程中同步梯度。 这确保了每个进程都有相同的梯度,这些梯度随后用于更新模型。

有关DDP的工作原理以及如何有效使用它的更多信息,请务必查看 DDP设计说明。 使用DDP,您可以比以往更快、更高效地训练模型!

推荐使用DDP的方式是为每个模型副本启动一个进程。模型副本可以跨越多个设备。DDP进程可以放置在同一台机器上或跨多台机器。请注意,GPU设备不能在DDP进程之间共享(即一个GPU对应一个DDP进程)。

在这个教程中,我们将从一个基本的DDP使用案例开始,然后演示更高级的用例,包括模型检查点和将DDP与模型并行结合。

注意

本教程中的代码在一台8-GPU服务器上运行,但可以轻松地推广到其他环境。

DataParallelDistributedDataParallel 之间的比较

在深入之前,让我们明确为什么你会考虑使用 DistributedDataParallel 而不是 DataParallel,尽管它增加了复杂性:

  • 首先,DataParallel 是单进程、多线程的,但只能在单台机器上运行。相比之下,DistributedDataParallel 是多进程的,支持单机和多机训练。 由于线程间的GIL竞争、每轮复制模型以及输入分散和输出收集带来的额外开销,DataParallel 即使在单台机器上也通常比 DistributedDataParallel 更慢。

  • 回想一下 之前的教程 ,如果你的模型太大而无法放在单个GPU上,你必须使用模型并行 将其拆分到多个GPU上。DistributedDataParallel 支持 模型并行, 而 DataParallel 目前不支持。当DDP与模型并行结合使用时,每个DDP进程将使用模型并行,并且所有进程 共同使用数据并行。

基本用例

要创建DDP模块,您必须首先正确设置进程组。更多详情可以 在使用PyTorch编写分布式应用程序中找到。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

现在,让我们创建一个玩具模块,用DDP包装它,并输入一些虚拟数据。请注意,在DDP构造函数中,DDP会从rank 0进程广播模型状态到所有其他进程,因此您无需担心不同的DDP进程从不同的初始模型参数值开始。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

如你所见,DDP封装了底层分布式通信的细节,并提供了一个干净的API,就像它是一个本地模型一样。梯度同步通信发生在反向传播过程中,并与反向计算重叠。当backward()返回时,param.grad已经包含了同步后的梯度张量。对于基本用例,DDP只需几行额外代码即可设置进程组。在将DDP应用于更高级用例时,需要注意一些注意事项。

处理速度不均衡

在DDP中,构造函数、前向传播和反向传播是分布式的同步点。不同的进程应启动相同数量的同步,并以相同的顺序到达这些同步点,且大致同时进入每个同步点。否则,快速进程可能会提前到达并在等待落后进程时超时。因此,用户有责任平衡各进程之间的工作负载分布。有时,由于网络延迟、资源争用或不可预测的工作负载激增等原因,处理速度可能会出现偏差。为了避免在这些情况下发生超时,请确保在调用init_process_group时传递一个足够大的timeout值。

保存和加载检查点

在训练过程中,通常使用torch.savetorch.load来检查点模块并在恢复时从检查点加载。详情请参见 保存和加载模型。当使用DDP时,一种优化方法是在一个进程中保存模型,然后在所有进程中加载它,从而减少写入开销。 这种方法有效是因为所有进程都从相同的参数开始,并且在反向传播中梯度是同步的,因此优化器应该保持将参数设置为相同的值。 如果你使用这种优化(即在一个进程中保存但在所有进程中恢复),请确保没有进程在保存完成之前开始加载。此外,在加载模块时, 你需要提供适当的map_location参数,以防止进程进入其他设备。如果缺少map_locationtorch.load将首先把模块加载到CPU,然后将每个参数复制到其保存的位置,这会导致同一台机器上的所有进程使用相同的设备集。 有关更高级的故障恢复和弹性支持,请参阅 TorchElastic

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
    print(f"Finished running DDP checkpoint example on rank {rank}.")

结合DDP与模型并行

DDP 也适用于多 GPU 模型。在使用大量数据训练大型模型时,对多 GPU 模型进行 DDP 包装尤其有帮助。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

当将多GPU模型传递给DDP时,device_idsoutput_device 不得设置。输入和输出数据将由应用程序或模型的forward()方法正确放置到相应的设备上。

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP with model parallel example on rank {rank}.")


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    world_size = n_gpus//2
    run_demo(demo_model_parallel, world_size)

使用 torch.distributed.run/torchrun 初始化 DDP

我们可以利用PyTorch Elastic来简化DDP代码并更轻松地初始化作业。 让我们仍然使用Toymodel示例,并创建一个名为elastic_ddp.py的文件。

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")
    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    dist.destroy_process_group()
    print(f"Finished running basic DDP example on rank {rank}.")

if __name__ == "__main__":
    demo_basic()

然后可以在所有节点上运行一个 torch elastic/torchrun 命令 来初始化上面创建的DDP任务:

torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py

在上面的例子中,我们在两个主机上运行DDP脚本,并且每个主机上运行8个进程。也就是说,我们在这个任务上使用了16个GPU。请注意 $MASTER_ADDR 必须在所有节点上保持一致。

这里 torchrun 将启动 8 个进程,并在它所在的节点上每个进程中调用 elastic_ddp.py ,但用户还需要使用 slurm 等集群管理工具才能实际在 2 个节点上运行此命令。

例如,在一个支持SLURM的集群上,我们可以编写一个脚本来运行上面的命令 并设置 MASTER_ADDR 为:

export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)

那么我们就可以使用SLURM命令运行此脚本:srun --nodes=2 ./torchrun_script.sh

这只是一个示例;您可以选择自己的集群调度工具来启动 torchrun 个任务。

有关Elastic运行的更多信息,请参阅 快速入门文档

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源