目录

使用PyTorch编写分布式应用

创建时间:2017年10月06日 | 最后更新时间:2024年12月10日 | 最后验证时间:2024年11月05日

作者: Séb Arnold

注意

edit 查看和编辑此教程在 github

Prerequisites:

在这个简短的教程中,我们将介绍 PyTorch 的分布式包。我们将看到如何设置分布式环境,使用不同的通信策略,并了解该包的一些内部机制。

设置

PyTorch包含的分布式包(即, torch.distributed) 使研究人员和实践者能够轻松地 在进程和机器集群上并行计算。为此,它利用消息传递语义 允许每个进程将数据通信到其他任何进程。与multiprocessing (torch.multiprocessing) 包不同, 进程可以使用不同的通信后端,并不受限制于必须在同一台机器上执行。

为了开始,我们需要能够同时运行多个进程。如果您有计算集群的访问权限,应该咨询您的本地系统管理员或使用您喜欢的协调工具(例如, pdsh, clustershell, 或 slurm)。出于本教程的目的,我们将使用单台机器,并通过以下模板生成多个进程。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上述脚本会启动两个进程,每个进程都将设置分布式环境,初始化进程组 (dist.init_process_group),最后执行给定的 run 函数。

让我们来看看init_process函数。它确保每个进程都能通过主节点进行协调,使用相同的IP地址和端口。请注意,我们使用了gloo后端,但其他后端也是可用的。(参见第5.1节)我们将在本教程的最后介绍dist.init_process_group中的魔法,但它基本上允许进程通过共享位置相互通信。

点对点通信

Send and Recv

发送和接收

从一个进程向另一个进程传输数据称为点对点通信。这些通信通过 sendrecv 函数或其 立即 对应函数 isendirecv 实现。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的例子中,两个进程都从一个零张量开始,然后 进程 0 增加该张量并将其发送给进程 1,这样它们 最终都会得到 1.0。请注意,进程 1 需要分配内存以存储它将接收的数据。

请注意,send/recv阻塞的:两个进程都会阻塞 直到通信完成。另一方面,立即操作是 非阻塞的;脚本继续执行,方法返回一个 Work 对象, 我们可以选择在该对象上执行 wait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

当使用立即值时,我们必须小心如何使用发送和接收张量。 由于我们不知道数据何时会被通信到其他进程, 在 req.wait() 完成之前,不应修改发送的张量也不应访问接收的张量。 换句话说,

  • tensor 之后写入 dist.isend() 将导致未定义行为。

  • tensor 读取,在 dist.irecv() 之后将导致未定义行为。

然而,在 req.wait() 执行后,我们可以保证通信已经发生, 并且存储在 tensor[0] 中的值为 1.0。

点对点通信在我们希望对进程之间的通信进行更细粒度的控制时非常有用。它们可以用于实现复杂的算法,例如 百度的DeepSpeechFacebook的大规模实验 中使用的算法。(参见 第4.1节

集体通信

Scatter

分散

Gather

聚集

Reduce

减少

All-Reduce

All-Reduce

Broadcast

广播

All-Gather

All-Gather

与点对点通信不同,集体通信允许在中的所有进程中进行通信模式。一个组是所有进程的一个子集。要创建一个组,我们可以将一组rank传递给dist.new_group(group)。默认情况下,集体通信会在所有进程中执行,也称为world。例如,为了获得所有进程上所有张量的总和,我们可以使用dist.all_reduce(tensor, op, group)集体通信。

""" All-Reduce example."""
def run(rank, size):
    """ Simple collective communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我们想要计算组内所有张量的总和,我们使用 dist.ReduceOp.SUM 作为归约运算符。一般来说,任何 可交换的数学运算都可以用作运算符。 默认情况下,PyTorch 提供了许多这样的运算符,所有运算都在元素级进行:

  • dist.ReduceOp.SUM,

  • dist.ReduceOp.PRODUCT,

  • dist.ReduceOp.MAX,

  • dist.ReduceOp.MIN,

  • dist.ReduceOp.BAND,

  • dist.ReduceOp.BOR,

  • dist.ReduceOp.BXOR,

  • dist.ReduceOp.PREMUL_SUM.

支持的操作符完整列表在 这里

除了 dist.all_reduce(tensor, op, group),PyTorch 当前还实现了许多其他集合操作。以下是一些支持的集合操作。

  • dist.broadcast(tensor, src, group): 从 tensor 复制到 src 的所有其他进程。

  • dist.reduce(tensor, dst, op, group): 对 op 应用于每一个 tensor,并将结果存储在 dst 中。

  • dist.all_reduce(tensor, op, group): 与reduce相同,但结果会存储在所有进程中。

  • dist.scatter(tensor, scatter_list, src, group): 将 \(i^{\text{th}}\) 张量 scatter_list[i] 复制到 \(i^{\text{th}}\) 进程。

  • dist.gather(tensor, gather_list, dst, group): 从 dst 中的所有进程中复制 tensor

  • dist.all_gather(tensor_list, tensor, group): 将 tensor 从所有进程复制到 tensor_list,在所有进程中。

  • dist.barrier(group): 阻止 group 中的所有进程,直到每个进程都进入此函数。

  • dist.all_to_all(output_tensor_list, input_tensor_list, group): 将输入张量列表散播到组中的所有进程中,并返回输出列表中的收集张量列表。

支持的集合操作的完整列表可以通过查看PyTorch分布式最新文档找到 (链接)

分布式训练

注意: 你可以在 这个GitHub仓库 中找到本节的示例脚本。

现在我们了解了分布式模块的工作原理,让我们用它来编写一些有用的内容。我们的目标是复制 DistributedDataParallel 的功能。当然,这将是一个教学示例,在实际应用中您应使用上述链接的官方、经过充分测试和优化的版本。

非常简单,我们想要实现随机梯度下降的分布式版本。我们的脚本将让所有进程在其数据批次上计算模型的梯度,然后平均这些梯度。为了在更改进程数量时确保类似的收敛结果,我们首先必须对数据集进行分区。(您也可以使用 torch.utils.data.random_split, 而不是下面的代码片段。)

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()  # from random import Random
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

通过上述代码片段,我们现在可以使用以下几行代码对任何数据集进行划分:

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 // size
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假设我们有 2 个副本,那么每个进程将有 train_set 个 60000 / 2 = 30000 个样本。我们还将批量大小除以副本数量,以保持 整体 批量大小为 128。

我们现在可以编写常规的前向-后向-优化训练代码,并添加一个函数调用来平均我们模型的梯度。(以下内容主要受到官方PyTorch MNIST示例的启发。)

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

尚需实现 average_gradients(model) 函数,该函数 仅接收一个模型,并在其整个世界范围内平均其梯度。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

Et voilà! 我们成功实现了分布式同步SGD,并且可以在大型计算机集群上训练任何模型。

注意:虽然最后一句话在技术上是正确的,但要实现生产级别的同步SGD实现还需要很多更多的技巧。再次强调,使用经过测试和优化过的内容。

我们的Ring-Allreduce

作为一个额外的挑战,想象一下我们想要实现 DeepSpeech 的高效环形全规约。使用点对点通信集合操作来实现这一点相当容易。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

在上面的脚本中,allreduce(send, recv) 函数的签名与 PyTorch 中的略有不同。它接受一个 recv 张量,并将在其中存储所有 send 张量的总和。作为留给读者的练习,我们的版本与 DeepSpeech 中的版本仍有一个不同:他们的实现将梯度张量分成 ,以便最优地利用通信带宽。(提示: torch.chunk

高级主题

我们现在可以探索一些更高级的功能 of torch.distributed。由于内容较多,本节分为两个子部分:

  1. 通信后端:我们学习如何使用 MPI 和 Gloo 进行 GPU 间的通信。

  2. 初始化方法:我们理解如何最好地设置初始协调阶段在dist.init_process_group()

通信后端

PyTorch 最优雅的方面之一是它能够抽象并构建在不同的后端之上。如前所述,PyTorch 中实现了多个后端。其中一些最流行的后端是 Gloo、NCCL 和 MPI。它们各自有不同的规格和权衡,取决于所需的使用场景。支持的功能的比较表格可以在这里找到 这里

Gloo 后端

到目前为止,我们广泛使用了 Gloo后端。 它作为一个开发平台非常方便,因为它包含在预编译的PyTorch二进制文件中,并且可以在Linux(自0.2版本起)和macOS(自1.3版本起)上工作。它支持所有CPU上的点对点和集体操作,以及GPU上的所有集体操作。CUDA张量的集体操作实现不如NCCL后端提供的那些优化得好。

正如您已经注意到的,我们的 分布式SGD示例如果您在GPU上放置 model 是无法工作的。 为了使用多个GPU,我们还需要进行以下 修改:

  1. 使用 device = torch.device("cuda:{}".format(rank))

  2. model = Net() \(\rightarrow\) model = Net().to(device)

  3. 使用 data, target = data.to(device), target.to(device)

在上述修改后,我们的模型现在正在两个GPU上进行训练,并且您可以使用 watch nvidia-smi 来监控它们的利用率。

MPI 后端

消息传递接口(MPI)是高性能计算领域的一种标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed API 的主要灵感来源。存在多种 MPI 的实现(例如 Open-MPI, MVAPICH2, Intel MPI),每种实现都针对不同的用途进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上具有广泛的可用性 - 以及高水平的优化。一些 近期 实现 也能利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。

遗憾的是,PyTorch 的二进制文件无法包含 MPI 实现, 因此我们需要手动重新编译。幸运的是,这个过程相当简单, 因为在编译时,PyTorch 会自己查找可用的 MPI 实现。以下步骤通过从源代码安装 PyTorch 来安装 MPI 后端 从源代码

  1. 创建并激活你的Anaconda环境,按照指南安装所有先决条件,但暂时还不要运行python setup.py install

  2. 选择并安装您最喜欢的MPI实现。请注意,启用CUDA感知MPI可能需要一些额外的步骤。在我们的情况下,我们将使用不支持GPU的Open-MPI: conda install -c conda-forge openmpi

  3. 现在,前往您克隆的PyTorch仓库并执行 python setup.py install

为了测试我们新安装的后端,需要进行一些修改。

  1. if __name__ == '__main__': 下的内容替换为 init_process(0, 0, run, backend='mpi')

  2. 运行 mpirun -n 4 python myscript.py

这些更改的原因是MPI需要在生成进程之前创建自己的环境。MPI还将生成自己的进程并执行初始化方法中描述的握手,使得ranksize参数对于init_process_group来说是多余的。这实际上非常强大,因为你可以传递额外的参数给mpirun,以便为每个进程定制计算资源。(例如每个进程的核心数、手动分配特定机器到特定等级,以及更多) 这样做后,你应该获得与其他通信后端相同的熟悉输出。

NCCL 后端

NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。如果你只使用 CUDA 张量进行集体操作,建议使用此后端以获得最佳性能。NCCL 后端包含在具有 CUDA 支持的预构建二进制文件中。

初始化方法

为结束本教程,让我们检查我们调用的第一个函数: dist.init_process_group(backend, init_method)。具体来说,我们将讨论负责每个进程之间初步协调步骤的各种初始化方法。 这些方法使您能够定义这种协调是如何完成的。

初始化方法的选择取决于您的硬件设置,某种方法可能比其他方法更合适。除了以下部分,请参阅官方文档以获取更多信息。

环境变量

我们一直在本教程中使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程都将能够正确连接到主节点,获取其他进程的信息,并最终与它们进行握手。

  • MASTER_PORT: 机器上将托管排名为 0 的进程的免费端口。

  • MASTER_ADDR: 用于托管进程(rank 0)的机器的IP地址

  • WORLD_SIZE: 总进程数,这样主进程知道要等待多少个工作进程。

  • RANK: 每个进程的排名,这样它们将知道是否是主进程还是工作进程。

共享文件系统

共享文件系统要求所有进程都能访问共享文件系统,并通过共享文件协调它们。这意味着每个进程将打开文件,写入其信息,并等待所有人都完成。之后,所有所需的信息都将对所有进程可用。为了避免竞争条件,文件系统必须通过 fcntl 支持锁定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

通过 TCP 进行初始化可以通过提供 rank 为 0 的进程的 IP 地址和一个可访问的端口号来实现。 在此,所有工作进程都将能够连接到 rank 为 0 的进程,并交换如何互相连接的信息。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

致谢

我想感谢PyTorch的开发者们在实现、文档和测试方面所做的出色工作。当代码不清楚时,我总是可以依赖于文档测试来找到答案。特别感谢Soumith Chintala、Adam Paszke和Natalia Gimelshein提供了深刻的评论,并回答了早期草稿中的问题。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源