使用 PyTorch 编写分布式应用程序¶
创建时间: 2017年10月06日 |上次更新时间:2024 年 12 月 10 日 |上次验证: Nov 05, 2024
作者: Séb Arnold
注意
在 github 中查看和编辑本教程。
先决条件:
在这个简短的教程中,我们将介绍分布式软件包 PyTorch 中。我们将看到如何设置 distributed 设置,使用 不同的沟通策略,并回顾 包。
设置¶
PyTorch 中包含的分布式软件包(即 )使研究人员和从业者能够轻松地
跨进程和集群并行计算
机器。为此,它利用消息传递语义
允许每个进程将数据传送到任何其他进程。
与 multiprocessing () 包相反,
进程可以使用不同的通信后端,而不是
仅限于在同一台计算机上执行。torch.distributed
torch.multiprocessing
为了开始,我们需要能够运行多个进程 同时。如果您有权访问 compute cluster,则应选中 与本地系统管理员一起使用,或使用您最喜欢的协调工具(例如 pdsh、clustershell 或 slurm)。为此 教程中,我们将使用一台机器并使用 以下模板。
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上面的脚本生成了两个进程,每个进程都将设置
分布式环境,初始化进程组
(),最后执行给定的函数。dist.init_process_group
run
我们来看看这个函数。它确保
每个进程都将能够通过 master 进行协调,使用
相同的 IP 地址和端口。请注意,我们使用了后端,但
其他后端可用。(参见第 5.1 节)我们将回顾一下魔术
在本教程结束时,
但它本质上允许进程通过以下方式相互通信
分享他们的位置。init_process
gloo
dist.init_process_group
点对点通信¶

Send 和 Recv¶
数据从一个进程传输到另一个进程称为
点对点通信。这些是通过 and 函数或其直接对应部分 和 实现的。send
recv
isend
irecv
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的示例中,两个进程都以零张量开始,然后 进程 0 递增张量并将其发送到进程 1,以便它们 两者都以 1.0 结束。请注意,进程 1 需要在 order 来存储它将接收的数据。
另请注意 are blocking:两个进程都阻塞
直到通信完成。另一方面,即时是非阻塞的;脚本继续执行,并且
返回一个对象,我们可以选择对其执行 。send/recv
Work
wait()
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
使用 immediates 时,我们必须小心使用发送和接收的张量。
由于我们不知道数据何时会传送到另一个进程,
我们不应该修改发送的 Tensor,也不应该在完成之前访问接收到的 Tensor。
换句话说,req.wait()
写入 after 将导致 undefined 行为。
tensor
dist.isend()
从 after 读取将导致 undefined 行为。
tensor
dist.irecv()
但是,在执行之后,我们可以保证通信已经发生。
并且存储的值为 1.0。req.wait()
tensor[0]
当我们想要更细粒度时,点对点通信非常有用 控制我们流程的通信。它们可用于 实现花哨的算法,例如百度的 DeepSpeech 或 Facebook 的大规模 实验。(参见第 4.1 节)
集体通信¶
![]() 散射¶ |
![]() 收集¶ |
![]() 减少¶ |
![]() 全部减少¶ |
![]() 广播¶ |
![]() 全聚集¶ |
与点对点通信相反,集体允许
组中所有进程的通信模式。组是一个
我们所有流程的子集。要创建一个组,我们可以传递一个
等级设置为 。默认情况下,将执行 collectives
在所有进程上,也称为世界。例如,在 order
要获得所有进程上所有张量的总和,我们可以使用 collective。dist.new_group(group)
dist.all_reduce(tensor, op, group)
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们想要组中所有张量的总和,因此我们用作 reduce 运算符。一般来说,任何
交换数学运算可以用作运算符。
开箱即用的 PyTorch 附带了许多这样的运算符,它们都在
元素级别:dist.ReduceOp.SUM
dist.ReduceOp.SUM
,dist.ReduceOp.PRODUCT
,dist.ReduceOp.MAX
,dist.ReduceOp.MIN
,dist.ReduceOp.BAND
,dist.ReduceOp.BOR
,dist.ReduceOp.BXOR
,dist.ReduceOp.PREMUL_SUM
.
支持的运算符的完整列表在这里。
除了 之外,目前还有许多其他 collectives 实现在
PyTorch 的 Torch 中。以下是一些受支持的 collective。dist.all_reduce(tensor, op, group)
dist.broadcast(tensor, src, group)
:复制到所有其他进程。tensor
src
dist.reduce(tensor, dst, op, group)
:应用于 every 并将结果存储在 中。op
tensor
dst
dist.all_reduce(tensor, op, group)
:与 reduce 相同,但 result 存储在所有进程中。dist.scatter(tensor, scatter_list, src, group)
:将 \(i^{\text{th}}\) 张量复制到 \(i^{\text{th}}\) 进程。scatter_list[i]
dist.gather(tensor, gather_list, dst, group)
:从 中的所有进程复制。tensor
dst
dist.all_gather(tensor_list, tensor, group)
:在所有进程上从所有进程复制到 。tensor
tensor_list
dist.barrier(group)
:阻止组中的所有进程,直到每个进程都进入此功能。dist.all_to_all(output_tensor_list, input_tensor_list, group)
:将输入张量列表分散到 A Group 并返回 Output List 中收集的张量列表。
通过查看 PyTorch Distributed 的最新文档(链接),可以找到支持的 Collectives 的完整列表。
分布式训练¶
注意:您可以在此中找到本节的示例脚本 GitHub 存储库。
现在我们了解了分布式模块的工作原理,让我们编写 一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。 当然,这将是一个说教式的例子,并且在现实世界中 情况下,您应该使用官方的、经过充分测试和优化的 版本。
很简单,我们想实现随机振荡的分布式版本 梯度下降。我们的脚本将让所有进程计算 他们的模型对他们的批次数据的梯度,然后平均他们的 梯度。为了确保在更改时获得相似的收敛结果 进程数,我们首先必须对数据集进行分区。 (您也可以使用 torch.utils.data.random_split, 而不是下面的代码段。
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
使用上面的代码片段,我们现在可以使用 以下几行:
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有 2 个副本,那么每个进程将有 60000 / 2 = 30000 个样本。我们还将批量大小除以
数量,以保持整体批处理大小为 128。train_set
我们现在可以编写通常的 forward-backward-optimize 训练代码,并且 添加一个函数调用来平均模型的梯度。(该 following 在很大程度上受到官方 PyTorch MNIST 的启发 示例。
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
它仍然需要实现函数,该函数
只需接收一个模型并对其整个
世界。average_gradients(model)
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
Et voilà!我们成功实现了分布式同步 SGD 和 可以在大型计算机集群上训练任何模型。
注意:虽然最后一句话在技术上是正确的,但有一个 需要更多技巧 实现同步 SGD 的生产级实现。再 使用经过测试的产品 优化。
我们自己的 Ring-Allreduce¶
作为另一个挑战,假设我们想实现 DeepVoice 的高效环 allreduce 的 Ring allReduce 的 Limit。这相当容易实现 使用 point-to-point collectives。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上面的脚本中,该函数有一个
签名与 PyTorch 中的签名略有不同。它需要一个张量,并将所有张量的总和存储在其中。如
留给读者的练习,两者之间仍然存在一个区别
我们的版本和 DeepSpeech 中的版本:它们的实现将
gradient tensor 转换为块,以便最佳地利用
通信带宽。(提示:torch.chunkallreduce(send, recv)
recv
send
)
高级主题¶
现在,我们已准备好发现一些更高级的功能
之。由于要涵盖的内容很多,因此本节是
分为两个小节:torch.distributed
通信后端:我们学习如何使用 MPI 和 Gloo 的地方 GPU-GPU 通信。
初始化方法:我们了解如何最好地设置 的初始协调阶段。
dist.init_process_group()
通信后端¶
最优雅的方面之一是它的能力
来抽象和构建不同的后端。如前所述,
PyTorch 中实现了多个后端。
一些最受欢迎的是 Gloo、NCCL 和 MPI。
它们各自具有不同的规格和权衡,具体取决于
在所需的用例上。支持的函数的比较表可以
在这里找到。torch.distributed
Gloo 后端
到目前为止,我们已经广泛使用了 Gloo 后端。 它作为一个开发平台非常方便,因为它包含在 预编译的 PyTorch 二进制文件,可在 Linux 上运行(自 0.2 起) 和 macOS(自 1.3 起)。它支持所有点对点和集体 作,以及 GPU 上的所有集合作。这 CUDA 张量的集体作的实现不是 优化为 NCCL 后端提供的。
您肯定已经注意到,我们的
如果放置 GPU,则分布式 SGD 示例不起作用。
为了使用多个 GPU,我们还进行以下作
修改:model
用
device = torch.device("cuda:{}".format(rank))
model = Net()
\(\右箭头\)model = Net().to(device)
用
data, target = data.to(device), target.to(device)
通过上述修改,我们的模型现在在两个 GPU 和
您可以使用 监控其利用率。watch nvidia-smi
MPI 后端
消息传递接口 (MPI) 是
高性能计算领域。它允许进行点对点和
集体通信,是 API 的主要灵感来源。MPI 存在多种实现方式(例如 Open-MPI、MVAPICH2、Intel
MPI) 每个
针对不同目的进行了优化。使用 MPI 后端的优势
在于 MPI 的广泛可用性和高水平的优化
大型计算机集群。一些最近的实现也能够将
利用 CUDA IPC 和 GPU Direct 技术来避免
通过 CPU 进行内存复制。torch.distributed
遗憾的是,PyTorch 的二进制文件不能包含 MPI 实现 我们得手动重新编译它。幸运的是,这个过程是 相当简单,因为在编译时,PyTorch 将自行查找可用的 MPI 实现。以下步骤安装 MPI backend 中,通过 source 的
创建并激活您的 Anaconda 环境,安装所有 先决条件 指南,但尚未运行。
python setup.py install
选择并安装您最喜欢的 MPI 实施。请注意, 启用 CUDA 感知 MPI 可能需要一些额外的步骤。在我们的 的情况下,我们将坚持使用不支持 GPU 的 Open-MPI:
conda install -c conda-forge openmpi
现在,转到克隆的 PyTorch 存储库并执行 .
python setup.py install
为了测试我们新安装的后端,一些修改是 必填。
将 下的内容替换为 。
if __name__ == '__main__':
init_process(0, 0, run, backend='mpi')
跑。
mpirun -n 4 python myscript.py
这些更改的原因是 MPI 需要创建自己的
environment 中。MPI 也将生成自己的
进程并执行初始化中描述的握手
方法,使 和 参数 的 多余。这实际上是相当的
强大,因为你可以将额外的参数传递给
为每个进程定制计算资源。(像
cores per process,手动将机器分配给特定等级,以及一些
更多)
这样做,您应该获得与其他
通信后端。rank
size
init_process_group
mpirun
NCCL 后端
NCCL 后端提供了一个 针对 CUDA 的集合作的优化实现 张。如果您只将 CUDA 张量用于集合运算, 考虑使用此后端以获得一流的性能。这 NCCL 后端包含在支持 CUDA 的预构建二进制文件中。
初始化方法¶
为了结束本教程,让我们检查一下我们调用的初始函数:.具体来说,我们将讨论各种
负责每个进程之间的初步协调步骤的初始化方法。
这些方法使您能够定义如何完成此协调。dist.init_process_group(backend, init_method)
初始化方法的选择取决于您的硬件设置,一种方法可能更多 比其他人更合适。除了以下部分外,请参考官方 文档了解更多信息。
环境变量
我们一直在使用环境变量初始化方法 在本教程中。通过设置以下四个环境 变量,所有进程都将能够正确地 连接到主服务器,获取有关其他进程的信息,以及 最后与他们握手。
MASTER_PORT
:计算机上将托管 排名为 0 的进程。MASTER_ADDR
:将托管进程的计算机的 IP 地址 等级为 0。WORLD_SIZE
:进程总数,以便 master 知道要等待多少 worker。RANK
:每个进程的 Rank,以便他们知道它是否是 master 或 worker 的
共享文件系统
共享文件系统要求所有进程都有权访问共享的 文件系统,并将通过共享文件协调它们。这意味着 每个进程都将打开文件,写入其信息,然后等待 直到每个人都这样做。之后,所有必需的信息都将是 随时可用于所有过程。为了避免竞争条件, 文件系统必须支持通过 FCNTL 进行锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP 协议
通过TCP进行初始化可以通过提供进程的IP地址(等级为0)和可访问的端口号来实现。 在这里,所有 worker 都将能够连接到流程 与 rank 0 并交换有关如何相互联系的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
确认
我要感谢 PyTorch 开发人员在 它们的实现、文档和测试。当代码为 不清楚,我总是可以依靠文档或测试来找到答案。我特别要感谢 Soumith Chintala, Adam Paszke 和 Natalia Gimelshein 提供有见地的评论 以及回答有关早期草稿的问题。