目录

使用 PyTorch 编写分布式应用程序

创建时间: 2017年10月06日 |上次更新时间:2024 年 12 月 10 日 |上次验证: Nov 05, 2024

作者Séb Arnold

注意

编辑github 中查看和编辑本教程。

先决条件:

在这个简短的教程中,我们将介绍分布式软件包 PyTorch 中。我们将看到如何设置 distributed 设置,使用 不同的沟通策略,并回顾 包。

设置

PyTorch 中包含的分布式软件包(即 )使研究人员和从业者能够轻松地 跨进程和集群并行计算 机器。为此,它利用消息传递语义 允许每个进程将数据传送到任何其他进程。 与 multiprocessing () 包相反, 进程可以使用不同的通信后端,而不是 仅限于在同一台计算机上执行。torch.distributedtorch.multiprocessing

为了开始,我们需要能够运行多个进程 同时。如果您有权访问 compute cluster,则应选中 与本地系统管理员一起使用,或使用您最喜欢的协调工具(例如 pdshclustershellslurm)。为此 教程中,我们将使用一台机器并使用 以下模板。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上面的脚本生成了两个进程,每个进程都将设置 分布式环境,初始化进程组 (),最后执行给定的函数。dist.init_process_grouprun

我们来看看这个函数。它确保 每个进程都将能够通过 master 进行协调,使用 相同的 IP 地址和端口。请注意,我们使用了后端,但 其他后端可用。(参见第 5.1 节)我们将回顾一下魔术 在本教程结束时, 但它本质上允许进程通过以下方式相互通信 分享他们的位置。init_processgloodist.init_process_group

点对点通信

Send 和 Recv

Send 和 Recv

数据从一个进程传输到另一个进程称为 点对点通信。这些是通过 and 函数或其直接对应部分 和 实现的。sendrecvisendirecv

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,两个进程都以零张量开始,然后 进程 0 递增张量并将其发送到进程 1,以便它们 两者都以 1.0 结束。请注意,进程 1 需要在 order 来存储它将接收的数据。

另请注意 are blocking:两个进程都阻塞 直到通信完成。另一方面,即时是非阻塞的;脚本继续执行,并且 返回一个对象,我们可以选择对其执行 。send/recvWorkwait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

使用 immediates 时,我们必须小心使用发送和接收的张量。 由于我们不知道数据何时会传送到另一个进程, 我们不应该修改发送的 Tensor,也不应该在完成之前访问接收到的 Tensor。 换句话说,req.wait()

  • 写入 after 将导致 undefined 行为。tensordist.isend()

  • 从 after 读取将导致 undefined 行为。tensordist.irecv()

但是,在执行之后,我们可以保证通信已经发生。 并且存储的值为 1.0。req.wait()tensor[0]

当我们想要更细粒度时,点对点通信非常有用 控制我们流程的通信。它们可用于 实现花哨的算法,例如百度的 DeepSpeechFacebook 的大规模 实验。(参见第 4.1 节)

集体通信

散射

散射

收集

收集

减少

减少

全部减少

全部减少

广播

广播

全聚集

全聚集

与点对点通信相反,集体允许 组中所有进程的通信模式。组是一个 我们所有流程的子集。要创建一个组,我们可以传递一个 等级设置为 。默认情况下,将执行 collectives 在所有进程上,也称为世界。例如,在 order 要获得所有进程上所有张量的总和,我们可以使用 collective。dist.new_group(group)dist.all_reduce(tensor, op, group)

""" All-Reduce example."""
def run(rank, size):
    """ Simple collective communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我们想要组中所有张量的总和,因此我们用作 reduce 运算符。一般来说,任何 交换数学运算可以用作运算符。 开箱即用的 PyTorch 附带了许多这样的运算符,它们都在 元素级别:dist.ReduceOp.SUM

  • dist.ReduceOp.SUM,

  • dist.ReduceOp.PRODUCT,

  • dist.ReduceOp.MAX,

  • dist.ReduceOp.MIN,

  • dist.ReduceOp.BAND,

  • dist.ReduceOp.BOR,

  • dist.ReduceOp.BXOR,

  • dist.ReduceOp.PREMUL_SUM.

支持的运算符的完整列表在这里。

除了 之外,目前还有许多其他 collectives 实现在 PyTorch 的 Torch 中。以下是一些受支持的 collective。dist.all_reduce(tensor, op, group)

  • dist.broadcast(tensor, src, group):复制到所有其他进程。tensorsrc

  • dist.reduce(tensor, dst, op, group):应用于 every 并将结果存储在 中。optensordst

  • dist.all_reduce(tensor, op, group):与 reduce 相同,但 result 存储在所有进程中。

  • dist.scatter(tensor, scatter_list, src, group):将 \(i^{\text{th}}\) 张量复制到 \(i^{\text{th}}\) 进程。scatter_list[i]

  • dist.gather(tensor, gather_list, dst, group):从 中的所有进程复制。tensordst

  • dist.all_gather(tensor_list, tensor, group):在所有进程上从所有进程复制到 。tensortensor_list

  • dist.barrier(group):阻止中的所有进程,直到每个进程都进入此功能。

  • dist.all_to_all(output_tensor_list, input_tensor_list, group):将输入张量列表分散到 A Group 并返回 Output List 中收集的张量列表。

通过查看 PyTorch Distributed 的最新文档(链接),可以找到支持的 Collectives 的完整列表。

分布式训练

注意:您可以在此中找到本节的示例脚本 GitHub 存储库

现在我们了解了分布式模块的工作原理,让我们编写 一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。 当然,这将是一个说教式的例子,并且在现实世界中 情况下,您应该使用官方的、经过充分测试和优化的 版本。

很简单,我们想实现随机振荡的分布式版本 梯度下降。我们的脚本将让所有进程计算 他们的模型对他们的批次数据的梯度,然后平均他们的 梯度。为了确保在更改时获得相似的收敛结果 进程数,我们首先必须对数据集进行分区。 (您也可以使用 torch.utils.data.random_split, 而不是下面的代码段。

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()  # from random import Random
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

使用上面的代码片段,我们现在可以使用 以下几行:

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 // size
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假设我们有 2 个副本,那么每个进程将有 60000 / 2 = 30000 个样本。我们还将批量大小除以 数量,以保持整体批处理大小为 128。train_set

我们现在可以编写通常的 forward-backward-optimize 训练代码,并且 添加一个函数调用来平均模型的梯度。(该 following 在很大程度上受到官方 PyTorch MNIST 的启发 示例

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

它仍然需要实现函数,该函数 只需接收一个模型并对其整个 世界。average_gradients(model)

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

Et voilà!我们成功实现了分布式同步 SGD 和 可以在大型计算机集群上训练任何模型。

注意:虽然最后一句话在技术上是正确的,但有一个 需要更多技巧 实现同步 SGD 的生产级实现。再 使用经过测试的产品 优化。

我们自己的 Ring-Allreduce

作为另一个挑战,假设我们想实现 DeepVoice 的高效环 allreduce 的 Ring allReduce 的 Limit。这相当容易实现 使用 point-to-point collectives。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

在上面的脚本中,该函数有一个 签名与 PyTorch 中的签名略有不同。它需要一个张量,并将所有张量的总和存储在其中。如 留给读者的练习,两者之间仍然存在一个区别 我们的版本和 DeepSpeech 中的版本:它们的实现将 gradient tensor 转换为,以便最佳地利用 通信带宽。(提示:torch.chunkallreduce(send, recv)recvsend)

高级主题

现在,我们已准备好发现一些更高级的功能 之。由于要涵盖的内容很多,因此本节是 分为两个小节:torch.distributed

  1. 通信后端:我们学习如何使用 MPI 和 Gloo 的地方 GPU-GPU 通信。

  2. 初始化方法:我们了解如何最好地设置 的初始协调阶段。dist.init_process_group()

通信后端

最优雅的方面之一是它的能力 来抽象和构建不同的后端。如前所述, PyTorch 中实现了多个后端。 一些最受欢迎的是 Gloo、NCCL 和 MPI。 它们各自具有不同的规格和权衡,具体取决于 在所需的用例上。支持的函数的比较表可以 在这里找到。torch.distributed

Gloo 后端

到目前为止,我们已经广泛使用了 Gloo 后端。 它作为一个开发平台非常方便,因为它包含在 预编译的 PyTorch 二进制文件,可在 Linux 上运行(自 0.2 起) 和 macOS(自 1.3 起)。它支持所有点对点和集体 作,以及 GPU 上的所有集合作。这 CUDA 张量的集体作的实现不是 优化为 NCCL 后端提供的。

您肯定已经注意到,我们的 如果放置 GPU,则分布式 SGD 示例不起作用。 为了使用多个 GPU,我们还进行以下作 修改:model

  1. device = torch.device("cuda:{}".format(rank))

  2. model = Net() \(\右箭头\) model = Net().to(device)

  3. data, target = data.to(device), target.to(device)

通过上述修改,我们的模型现在在两个 GPU 和 您可以使用 监控其利用率。watch nvidia-smi

MPI 后端

消息传递接口 (MPI) 是 高性能计算领域。它允许进行点对点和 集体通信,是 API 的主要灵感来源。MPI 存在多种实现方式(例如 Open-MPIMVAPICH2Intel MPI) 每个 针对不同目的进行了优化。使用 MPI 后端的优势 在于 MPI 的广泛可用性和高水平的优化 大型计算机集群。一些最近的实现也能够将 利用 CUDA IPC 和 GPU Direct 技术来避免 通过 CPU 进行内存复制。torch.distributed

遗憾的是,PyTorch 的二进制文件不能包含 MPI 实现 我们得手动重新编译它。幸运的是,这个过程是 相当简单,因为在编译时,PyTorch 将自行查找可用的 MPI 实现。以下步骤安装 MPI backend 中,通过 source

  1. 创建并激活您的 Anaconda 环境,安装所有 先决条件 指南但尚未运行python setup.py install

  2. 选择并安装您最喜欢的 MPI 实施。请注意, 启用 CUDA 感知 MPI 可能需要一些额外的步骤。在我们的 的情况下,我们将坚持使用不支持 GPU 的 Open-MPI:conda install -c conda-forge openmpi

  3. 现在,转到克隆的 PyTorch 存储库并执行 .python setup.py install

为了测试我们新安装的后端,一些修改是 必填。

  1. 将 下的内容替换为 。if __name__ == '__main__':init_process(0, 0, run, backend='mpi')

  2. 跑。mpirun -n 4 python myscript.py

这些更改的原因是 MPI 需要创建自己的 environment 中。MPI 也将生成自己的 进程并执行初始化中描述的握手 方法,使 和 参数 的 多余。这实际上是相当的 强大,因为你可以将额外的参数传递给 为每个进程定制计算资源。(像 cores per process,手动将机器分配给特定等级,以及一些 更多) 这样做,您应该获得与其他 通信后端。ranksizeinit_process_groupmpirun

NCCL 后端

NCCL 后端提供了一个 针对 CUDA 的集合作的优化实现 张。如果您只将 CUDA 张量用于集合运算, 考虑使用此后端以获得一流的性能。这 NCCL 后端包含在支持 CUDA 的预构建二进制文件中。

初始化方法

为了结束本教程,让我们检查一下我们调用的初始函数:.具体来说,我们将讨论各种 负责每个进程之间的初步协调步骤的初始化方法。 这些方法使您能够定义如何完成此协调。dist.init_process_group(backend, init_method)

初始化方法的选择取决于您的硬件设置,一种方法可能更多 比其他人更合适。除了以下部分外,请参考官方 文档了解更多信息。

环境变量

我们一直在使用环境变量初始化方法 在本教程中。通过设置以下四个环境 变量,所有进程都将能够正确地 连接到主服务器,获取有关其他进程的信息,以及 最后与他们握手。

  • MASTER_PORT:计算机上将托管 排名为 0 的进程。

  • MASTER_ADDR:将托管进程的计算机的 IP 地址 等级为 0。

  • WORLD_SIZE:进程总数,以便 master 知道要等待多少 worker。

  • RANK:每个进程的 Rank,以便他们知道它是否是 master 或 worker 的

共享文件系统

共享文件系统要求所有进程都有权访问共享的 文件系统,并将通过共享文件协调它们。这意味着 每个进程都将打开文件,写入其信息,然后等待 直到每个人都这样做。之后,所有必需的信息都将是 随时可用于所有过程。为了避免竞争条件, 文件系统必须支持通过 FCNTL 进行锁定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP 协议

通过TCP进行初始化可以通过提供进程的IP地址(等级为0)和可访问的端口号来实现。 在这里,所有 worker 都将能够连接到流程 与 rank 0 并交换有关如何相互联系的信息。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

确认

我要感谢 PyTorch 开发人员在 它们的实现、文档和测试。当代码为 不清楚,我总是可以依靠文档测试来找到答案。我特别要感谢 Soumith Chintala, Adam Paszke 和 Natalia Gimelshein 提供有见地的评论 以及回答有关早期草稿的问题。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源