分布式通信包 - torch.distributed¶
注意
请参阅PyTorch 分布式概述 ,以获取有关分布式训练所有功能的简要介绍。
后端¶
torch.distributed 支持三种内置后端,每种后端具有不同的功能。下表显示了哪些函数可用于 CPU / CUDA 张量。
如果用于构建 PyTorch 的实现支持 CUDA,则 MPI 仅支持 CUDA。
后端 |
|
|
|
|||
|---|---|---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
发送 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
接收 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
广播 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
减少 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
聚集 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
分散 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
reduce_scatter |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
all_to_all |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
屏障 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
随PyTorch一起提供的后端¶
PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型)。默认情况下,对于 Linux,Gloo 和 NCCL 后端会构建并包含在 PyTorch 分布式中(仅在使用 CUDA 构建时包含 NCCL)。MPI 是一个可选的后端,只有在从源代码构建 PyTorch 时才能包含。例如,在已安装 MPI 的主机上构建 PyTorch。
注意
截至PyTorch v1.8,Windows支持所有集体通信后端,但不包括NCCL,
如果init_process_group()的init_method参数指向一个文件,则必须遵循以下模式:
本地文件系统,
init_method="file:///d:/tmp/some_file"共享文件系统,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
与在Linux平台上的操作相同,您可以通过设置环境变量MASTER_ADDR和MASTER_PORT来启用TcpStore。
使用哪个后端?¶
在过去,我们经常被问到:“我应该使用哪个后端?”。
经验法则
使用NCCL后端进行分布式 GPU 训练
使用Gloo后端进行分布式 CPU 训练。
带有InfiniBand互连的GPU主机
使用NCCL,因为它是目前唯一支持InfiniBand和GPUDirect的后端。
带有以太网互联的GPU主机
使用NCCL,因为它目前提供了最佳的分布式GPU训练性能,特别是在多进程单节点或跨节点分布式训练中。如果你在使用NCCL时遇到任何问题,请使用Gloo作为备选方案。(请注意,目前Gloo在GPU上的运行速度比NCCL慢。)
带有InfiniBand互连的CPU主机
如果你的InfiniBand启用了IP over IB,请使用Gloo,否则, 请使用MPI。我们计划在未来的版本中为 Gloo添加InfiniBand支持。
带有以太网互联的CPU主机
使用Gloo,除非你有特定的理由使用MPI。
常见环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都会尝试找到正确的网络接口来使用。 如果自动检测到的接口不正确,您可以使用以下环境变量(适用于相应的后端)进行覆盖:
NCCL_SOCKET_IFNAME, 例如
export NCCL_SOCKET_IFNAME=eth0GLOO_SOCKET_IFNAME, 例如
export GLOO_SOCKET_IFNAME=eth0
如果你使用的是Gloo后端,可以通过用逗号分隔来指定多个接口,如下所示: export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。
后端将以轮询方式在这些接口之间分发操作。
所有进程必须在此变量中指定相同数量的接口,这是至关重要的。
其他NCCL环境变量¶
调试 - 如果发生NCCL故障,您可以将 NCCL_DEBUG=INFO 设置为打印明确的警告消息以及基本的NCCL初始化信息。
您还可以使用 NCCL_DEBUG_SUBSYS 来获取有关NCCL特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 会打印集体调用的日志,这在调试挂起时可能很有帮助,特别是那些由集体类型或消息大小不匹配引起的挂起。如果拓扑检测失败,将 NCCL_DEBUG_SUBSYS=GRAPH 设置为检查详细的检测结果并保存以供参考可能会有所帮助,如果需要NCCL团队的进一步帮助。
性能调优 - NCCL 根据其拓扑检测自动进行调优,以节省用户的调优工作。在某些基于套接字的系统上,用户仍可能尝试调整 NCCL_SOCKET_NTHREADS 和 NCCL_NSOCKS_PERTHREAD 以增加套接字网络带宽。这两个环境变量已被 NCCL 预先为一些云提供商(如 AWS 或 GCP)进行了调优。
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 官方文档
基础¶
The torch.distributed 包提供了 PyTorch 支持和通信原语,用于在一台或多台机器上的多个计算节点之间实现多进程并行。类 torch.nn.parallel.DistributedDataParallel() 基于此功能,作为任何 PyTorch 模型的包装器,提供同步分布式训练。这与 多进程包 - torch.multiprocessing 和 torch.nn.DataParallel() 提供的并行类型不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动一个单独的主训练脚本副本。
在单机同步情况下,torch.distributed 或者 torch.nn.parallel.DistributedDataParallel() 包装器可能仍然比其他
数据并行方法具有优势,包括 torch.nn.DataParallel():
每个进程维护自己的优化器,并在每次迭代中执行一个完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经被收集并跨进程平均,因此对每个进程来说都是相同的,但这意味着不需要参数广播步骤,减少了在节点之间传输张量所花费的时间。
每个进程包含一个独立的Python解释器,消除了从单个Python进程驱动多个执行线程、模型副本或GPU时产生的额外解释器开销和“GIL-thrashing”。这对于大量使用Python运行时的模型尤为重要,包括具有循环层或许多小组件的模型。
初始化¶
该包需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh() 函数进行初始化,然后再调用其他任何方法。
两者都会等待所有进程加入。
- torch.distributed.is_available()[source]¶
如果分布式包可用,则返回
True。否则,
torch.distributed不提供任何其他API。目前,torch.distributed在Linux、MacOS和Windows上可用。在从源代码构建PyTorch时,设置USE_DISTRIBUTED=1以启用它。 目前,默认值为USE_DISTRIBUTED=1对于Linux和Windows,USE_DISTRIBUTED=0对于MacOS。- Return type
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]¶
初始化默认的分布式进程组。
这也将初始化分布式包。
- There are 2 main ways to initialize a process group:
明确指定
store、rank和world_size。指定
init_method(一个URL字符串),指示在哪里/如何 发现对等节点。可选地指定rank和world_size, 或将所有必需的参数编码在URL中并省略它们。
如果两者都没有指定,则假定
init_method为“env://”。- Parameters
后端 (str 或 Backend, 可选) – 要使用的后端。根据构建时的配置,有效值包括
mpi,gloo,nccl, 和ucc. 如果未提供后端,则将创建一个gloo和nccl后端,有关如何管理多个后端,请参见下面的说明。此字段可以作为小写字符串给出 (例如,"gloo"),也可以通过Backend属性访问(例如,Backend.GLOO)。如果在每台机器上使用多个进程与nccl后端,每个进程 必须对其使用的每个GPU具有独占访问权限,因为在进程之间共享GPU可能会导致死锁。ucc后端是 实验性的。init_method (str, 可选) – 指定如何初始化进程组的URL。默认情况下,如果未指定
init_method或store,则为“env://”。与store互斥。world_size (int, 可选) – 参与任务的进程数量。如果指定了
store,则为必填项。排名 (int, 可选) – 当前进程的排名(应为0到
world_size-1之间的数字)。 如果指定了store,则为必填项。存储 (存储, 可选) – 所有工作者均可访问的键/值存储,用于交换连接/地址信息。 与
init_method互斥。超时 (timedelta, 可选) – 对进程组执行的操作的超时时间。默认值为NCCL的10分钟和其他后端的30分钟。 这是在该持续时间之后,集体操作将被异步中止并且进程将崩溃的时间。 这是因为在CUDA执行是异步的,并且由于失败的异步NCCL操作可能导致后续CUDA操作在损坏的数据上运行,因此继续执行用户代码不再安全。 当设置TORCH_NCCL_BLOCKING_WAIT时,进程将阻塞并等待此超时。
组名 (str, 可选, 已弃用) – 组名称。此参数将被忽略
pg_options (ProcessGroupOptions, 可选) – 指定在构建特定进程组时需要传递的其他选项。 目前,我们支持的唯一选项是在
ProcessGroupNCCL.Options后端使用nccl, 可以通过指定is_high_priority_stream,使nccl后端在有计算内核等待时拾取高优先级cuda流。 有关可用于配置nccl的其他选项,请参见https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t。device_id (torch.device, 可选) – 一个单一的、特定的设备 用于“绑定”此进程,允许进行后端特定的 优化。目前这有两个效果,仅在 NCCL下:通信器立即形成(立即调用
ncclCommInit*而不是正常的惰性 调用)并且子组将在可能的情况下使用ncclCommSplit以避免不必要的组创建开销。如果你想尽早知道NCCL初始化错误,也可以使用这个 字段。
注意
要启用
backend == Backend.MPI,需要在支持MPI的系统上从源代码构建PyTorch。注意
对多个后端的支持是实验性的。当前,如果没有指定后端,则将创建
gloo和nccl后端。gloo后端 将用于具有CPU张量的集合操作,而nccl后端将用于 具有CUDA张量的集合操作。可以通过传递格式为“<device_type>:<backend_name>,<device_type>:<backend_name>”的字符串来指定自定义后端,例如 “cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source]¶
基于device_type、mesh_shape和mesh_dim_names参数初始化DeviceMesh。
这创建了一个具有n维数组布局的DeviceMesh,其中n是mesh_shape的长度。 如果提供了mesh_dim_names,每一维都会被标记为mesh_dim_names[i]。
注意
init_device_mesh 遵循SPMD编程模型,这意味着相同的PyTorch Python程序会在集群中的所有进程/排名上运行。确保mesh_shape(描述设备布局的nD数组的维度)在所有排名中一致。不一致的mesh_shape可能导致挂起。
注意
如果没有找到进程组,init_device_mesh 将在幕后初始化分布式通信所需的分布式进程组。
- Parameters
- Returns
一个
DeviceMesh对象,表示设备布局。- Return type
- Example::
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[source]¶
检查此进程是否以
torch.distributed.elastic(即torchelastic)启动。环境变量
TORCHELASTIC_RUN_ID的存在被用作代理来确定当前进程是否是通过torchelastic启动的。这是一个合理的代理,因为TORCHELASTIC_RUN_ID映射到rendezvous id,该值始终是非空的,用于指示对等发现目的的工作ID。- Return type
目前支持三种初始化方法:
TCP初始化¶
有两种方法可以使用TCP进行初始化,都需要一个所有进程都可以访问的网络地址和一个期望的world_size。第一种方法需要指定一个属于秩为0的进程的地址。这种初始化方法要求所有进程都手动指定了秩。
请注意,最新的分布式包中不再支持多播地址。group_name 也被弃用了。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,允许完全自定义信息的获取方式。需要设置的变量为:
MASTER_PORT- 必需;必须是排名为0的机器上的空闲端口MASTER_ADDR- 必需(除了秩为0的节点);秩为0节点的地址WORLD_SIZE- 必需;可以在此处设置,也可以在调用初始化函数时设置RANK- 必需;可以在此处设置,也可以在调用初始化函数时设置
排名为0的机器将用于设置所有连接。
这是默认方法,这意味着 init_method 不需要指定(或者
可以是 env://)。
Post-Initialization¶
一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已经初始化,请使用 torch.distributed.is_initialized()。
- class torch.distributed.Backend(name)[source]¶
一个类似枚举的类,用于后端。
可用的后端:GLOO、NCCL、UCC、MPI 和其他已注册的后端。
这个类的值是小写字符串,例如,
"gloo"。它们可以通过属性访问,例如,Backend.NCCL。这个类可以直接调用来解析字符串,例如,
Backend(backend_str)将检查backend_str是否有效,并 如果有效则返回解析后的小写字符串。它还接受大写字符串, 例如,Backend("GLOO")返回"gloo"。注意
条目
Backend.UNDEFINED存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假设其存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source]¶
使用给定的名称和实例化函数注册一个新的后端。
此类方法用于第三方
ProcessGroup扩展注册新的后端。- Parameters
名称 (str) – 后端名称为
ProcessGroup扩展。它 应该与init_process_group()中的名称匹配。函数 (函数) – 实例化后端的函数处理器。 该函数应在后端扩展中实现,并接受四个参数,包括
store,rank,world_size, 和timeout。extended_api (布尔值, 可选) – 后端是否支持扩展参数结构。 默认值:
False。如果设置为True,后端 将获取一个c10d::DistributedBackendOptions的实例,并且 根据后端实现定义一个进程组选项对象。设备 (str 或 列表 的 str,可选) – 该后端支持的设备类型,例如“cpu”,“cuda”等。如果为None, 则假定同时支持“cpu”和“cuda”
注意
对第三方后端的支持是实验性的,可能会发生变化。
- torch.distributed.get_backend(group=None)[source]¶
返回给定进程组的后端。
- Parameters
组 (进程组, 可选) – 要工作的进程组。默认是通用主进程组。如果指定了另一个特定的组,则调用进程必须是
group的一部分。- Returns
给定进程组的后端作为一个小写字符串。
- Return type
关闭¶
通过调用destroy_process_group()来在退出时清理资源是很重要的。
遵循的最简单模式是在训练脚本中不再需要通信的地方调用
destroy_process_group(),使用group参数的默认值None来销毁每个进程组和后端,通常在main()的末尾附近。该调用应在每个训练进程内进行一次,而不是在外层进程启动器级别。
如果在超时时间内destroy_process_group()没有被pg中的所有进程调用,特别是在应用程序中有多个进程组的情况下,例如用于N维并行性,
退出时可能会挂起。这是因为ProcessGroupNCCL的析构函数调用了ncclCommAbort,
而ncclCommAbort必须集体调用,但当由python的GC调用时,ProcessGroupNCCL的析构函数的调用顺序是不确定的。调用destroy_process_group()有助于确保
ncclCommAbort以一致的顺序在各个进程中调用,并避免在ProcessGroupNCCL的析构函数中调用ncclCommAbort。
重新初始化¶
destroy_process_group 也可以用于销毁单个进程组。一种使用场景可能是容错训练,在这种情况下,进程组可能在运行时被销毁并重新初始化。在这种情况下,在调用销毁和随后初始化之间,必须通过某种其他方式同步训练器进程。由于实现这种同步的难度,此行为目前不支持/未测试,并被认为是已知问题。如果这种情况阻碍了您的工作,请提交github问题或RFC。
分布式键值存储¶
分布式包附带了一个分布式键值存储,可以用于在组中的进程之间共享信息以及初始化分布式包在
torch.distributed.init_process_group()(通过显式创建存储作为指定init_method的替代方案。)有3种选择用于键值存储:TCPStore,
FileStore, 和 HashStore。
- class torch.distributed.TCPStore¶
基于TCP的分布式键值存储实现。服务器存储持有数据,而客户端存储可以通过TCP连接到服务器存储并执行诸如
set()插入键值对,get()检索键值对等操作。应该始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。- Parameters
主机名 (str) – 服务器存储应运行的主机名或IP地址。
端口 (int) – 服务器存储应监听传入请求的端口。
world_size (int, 可选) – 存储用户的总数(客户端数量 + 1 用于服务器)。默认为 None(None 表示存储用户数量不固定)。
is_master (bool, 可选) – 在初始化服务器存储时为True,在客户端存储时为False。默认值为False。
超时 (timedelta, 可选) – 存储在初始化和诸如
get()和wait()的方法中使用的超时时间。默认值为 timedelta(seconds=300)wait_for_workers (bool, 可选) – 是否等待所有工作进程与服务器存储连接。这仅在世界大小为固定值时适用。默认值为True。
多租户 (布尔值, 可选) – 如果为 True,则当前进程中具有相同主机/端口的所有
TCPStore实例将使用相同的底层TCPServer。默认值为 False。master_listen_fd (int, 可选) – 如果指定,底层
TCPServer将监听此文件描述符,该描述符必须是已经绑定到port的套接字。在某些情况下,这有助于避免端口分配竞争。默认值为 None(表示服务器创建一个新套接字并尝试将其绑定到port)。use_libuv (bool, 可选) – 如果为 True,则对
TCPServer后端使用 libuv。默认值为 True。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- class torch.distributed.HashStore¶
基于底层哈希映射的线程安全存储实现。此存储可以在同一进程内使用(例如,由其他线程使用),但不能跨进程使用。
- Example::
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- class torch.distributed.FileStore¶
一种使用文件来存储底层键值对的存储实现。
- Example::
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- class torch.distributed.PrefixStore¶
围绕任何3个键值存储(
TCPStore,FileStore, 和HashStore) 添加前缀到插入存储的每个键的包装器。- Parameters
前缀 (str) – 在插入存储之前,附加到每个键前面的前缀字符串。
存储 (torch.distributed.store) – 一个构成底层键值存储的存储对象。
- torch.distributed.Store.set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None¶
将键值对插入存储中,基于提供的
key和value。如果key已经在存储中存在,它将用新提供的value覆盖旧值。- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- torch.distributed.Store.get(self: torch._C._distributed_c10d.Store, arg0: str) bytes¶
从存储中检索与给定
key关联的值。如果key不在 存储中,函数将等待timeout,这是在初始化存储时定义的, 然后抛出异常。- Parameters
键 (str) – 该函数将返回与此键关联的值。
- Returns
与
key关联的值,如果key存在于存储中。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- torch.distributed.Store.add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int¶
对给定的
key的第一次调用会创建一个与key在存储中关联的计数器,初始化为amount。随后使用相同的key调用add会将计数器按指定的amount递增。 使用已经通过set()在存储中设置过的键调用add()将会导致异常。- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- torch.distributed.Store.compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes¶
将键值对插入存储中,基于提供的
key,并在插入之前执行expected_value和desired_value之间的比较。desired_value仅在key已经存在于存储中或expected_value为空字符串时才会被设置。- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- torch.distributed.Store.wait(*args, **kwargs)¶
重载函数。
等待(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None
等待
keys中的每个键被添加到存储中。如果在timeout(在存储初始化期间设置)之前并非所有键都已设置,则wait将抛出异常。- Parameters
键 (列表) – 在存储中设置之前需要等待的键列表。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
等待(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None
等待
keys中的每个键被添加到存储中,并在提供的timeout未设置这些键时抛出异常。- Parameters
键 (列表) – 在存储中设置之前需要等待的键列表。
超时 (时间间隔) – 等待键被添加的时间,超过此时间将抛出异常。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- torch.distributed.Store.num_keys(self: torch._C._distributed_c10d.Store) int¶
返回存储中设置的键的数量。请注意,这个数量通常会比通过
set()和add()添加的键的数量多一个,因为一个键用于协调所有使用存储的工作者。警告
当与
TCPStore,num_keys一起使用时,返回写入底层文件的键的数量。如果存储被销毁并用相同的文件创建另一个存储,则原始键将被保留。- Returns
存储中键的数量。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- torch.distributed.Store.delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool¶
删除与
key关联的键值对。如果成功删除则返回true,否则返回false。- Parameters
键 (str) – 要从存储中删除的键
- Returns
True 如果
key被删除,否则 False。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- torch.distributed.Store.set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None¶
设置存储的默认超时时间。此超时时间在初始化期间以及在
wait()和get()中使用。- Parameters
超时 (时间间隔) – 在存储中设置的超时。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
组¶
默认情况下,集合操作在默认组(也称为世界)上进行,并且需要所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。可以使用 new_group() 函数来创建新的组,这些组可以包含所有进程的任意子集。它返回一个不透明的组句柄,该句柄可以作为 group 参数传递给所有集合操作(集合操作是用于在某些已知编程模式中交换信息的分布式函数)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None)[source]¶
创建一个新的分布式组。
此函数要求主组中的所有进程(即分布式作业的所有组成部分)都进入此函数,即使它们不会成为该组的成员。此外,所有进程中应以相同的顺序创建组。
警告
安全的并发使用: 当使用多个进程组和
NCCL后端时,用户必须确保在所有排名中跨集体执行顺序全局一致。如果进程内的多个线程发出集合操作,为了确保顺序一致,必须进行显式同步。
当使用异步变体的torch.distributed通信API时,会返回一个工作对象,并将通信内核排队到单独的CUDA流中,从而允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,必须通过调用work.wait()与其他CUDA流同步,然后再使用另一个进程组。
参见并行使用多个NCCL通信器以了解更多详情。
- Parameters
超时时间 (timedelta, 可选) – 有关详细信息和默认值,请参见init_process_group。
后端 (str 或 Backend, 可选) – 要使用的后端。根据构建时的配置,有效值为
gloo和nccl。 默认情况下使用与全局组相同的后端。此字段应以小写字符串形式给出(例如,"gloo"),也可以通过Backend属性访问(例如,Backend.GLOO)。如果传递了None,则将使用默认进程组对应的后端。默认值为None。pg_options (ProcessGroupOptions, 可选) – 指定在特定进程组构建期间需要传递的其他选项。 例如,在
nccl后端中,可以指定is_high_priority_stream, 以便进程组可以选择高优先级的CUDA流。有关可用于配置NCCL的其他选项,请参见https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, 可选) – 在进程组创建结束时执行一个组本地 屏障。这与非成员秩不需要调用API并且不 加入屏障的情况不同。
组描述 (字符串, 可选) – 用于描述进程组的字符串。
- Returns
一个可以传递给集体调用的分布式组句柄,或者如果该排名不属于
ranks组,则为GroupMember.NON_GROUP_MEMBER。
注意:use_local_synchronization 与 MPI 不兼容。
注意:虽然使用 use_local_synchronization=True 在较大的集群和较小的进程组中可以显著加快速度,但必须小心,因为它会改变集群的行为,因为非成员秩不会加入组屏障()。
注意:当每个秩创建多个重叠的进程组时,使用 use_local_synchronization=True 可能会导致死锁。为了避免这种情况,请确保所有秩都遵循相同的全局创建顺序。
- torch.distributed.get_group_rank(group, global_rank)[source]¶
将全局排名转换为组内排名。
global_rank必须是group的一部分,否则将引发 RuntimeError。- Parameters
组 (进程组) – 用于查找相对排名的进程组。
全局排名 (int) – 要查询的全局排名。
- Returns
相对于
group的global_rank的组排名- Return type
注意:在默认进程组上调用此函数将返回身份标识
DeviceMesh¶
DeviceMesh 是一个更高层次的抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置排名,并且它有助于轻松管理这些分布式进程组。可以使用 init_device_mesh() 函数来创建新的 DeviceMesh,其中网格形状描述了设备拓扑结构。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source]¶
DeviceMesh 表示一个设备网格,其中设备的布局可以表示为 n 维数组,而该 n 维数组中的每个值都是默认进程组秩的全局 ID。
DeviceMesh 可用于描述设备在集群中的布局,并作为集群内设备列表之间通信的代理。
DeviceMesh 可以用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序 在集群中的所有进程/排名上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有排名中应保持一致。 不一致的 mesh 将导致静默挂起。
- Parameters
设备类型 (str) – 网格的设备类型。目前支持:“cpu”,“cuda/cuda-like”。
网格 (ndarray) – 描述设备布局的多维数组或整数张量,其中ID是默认进程组的全局ID。
- Returns
一个
DeviceMesh对象,表示设备布局。- Return type
以下程序以SPMD方式在每个进程/排名上运行。在这个例子中,我们有2个主机,每个主机有4个GPU。 对网格的第一维度进行规约将在列(0, 4)、.. 和(3, 7)之间进行规约,对网格的第二维度进行规约将在行(0, 1, 2, 3)和(4, 5, 6, 7)之间进行规约。
- Example::
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
点对点通信¶
- torch.distributed.recv(tensor, src=None, group=None, tag=0)[source]¶
同步接收一个张量。
警告
tag不支持与NCCL后端一起使用。
isend() 和 irecv()
在使用时返回分布式请求对象。一般来说,这种对象的类型是未指定的
因为它们不应该手动创建,但可以保证支持两种方法:
is_completed()- 如果操作已完成,则返回Truewait()- 将阻塞进程,直到操作完成。is_completed()保证一旦返回则返回True。
- torch.distributed.isend(tensor, dst, group=None, tag=0)[source]¶
异步发送一个张量。
警告
在请求完成之前修改
tensor会导致未定义的行为。警告
tag不支持与NCCL后端一起使用。
- torch.distributed.irecv(tensor, src=None, group=None, tag=0)[source]¶
异步接收一个张量。
警告
tag不支持与NCCL后端一起使用。
- torch.distributed.send_object_list(object_list, dst, group=None, device=None)[source]¶
发送可序列化对象
object_list同步。类似于
send(),但可以传入Python对象。 请注意,object_list中的所有对象都必须是可 picklable 的才能被发送。- Parameters
对象列表 (列表[任意类型]) – 输入对象的列表。 每个对象必须是可以序列化的。接收方必须提供大小相等的列表。
目标 (int) – 发送
object_list到的目标秩。 目标秩基于全局进程组(无论group参数如何)组 – (进程组,可选):要操作的进程组。如果为None,则将使用默认进程组。默认值是
None。设备 (
torch.device,可选) – 如果不为 None,则对象会被序列化并转换为张量,并在发送前移动到device。默认值为None。
- Returns
None.
注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,并且用户有责任确保设置为每个秩都有一个单独的GPU,通过torch.cuda.set_device()。警告
send_object_list()隐式使用了pickle模块,这已知是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
send_object_list()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用send()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None)[source]¶
接收可序列化对象
object_list个,同步进行。类似于
recv(),但可以接收Python对象。- Parameters
对象列表 (列表[任意类型]) – 需要接收的对象列表。 必须提供一个与发送列表大小相等的列表。
src (int, 可选) – 从哪个源等级接收数据
object_list。 源等级基于全局进程组(无论group参数如何) 如果设置为 None,则会从任意等级接收。默认值是None。组 – (进程组,可选):要操作的进程组。如果为None,则将使用默认进程组。默认值是
None。设备 (
torch.device,可选) – 如果不为 None,则在此设备上接收。 默认值为None。
- Returns
发送者排名。如果排名不属于组,则为-1。如果排名属于组,
object_list将包含来自src排名的对象。
注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,并且用户有责任确保设置为每个秩都有一个单独的GPU,通过torch.cuda.set_device()。警告
recv_object_list()隐式使用了pickle模块,这已知是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
recv_object_list()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用recv()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[source]¶
异步发送或接收一批张量并返回请求列表。
处理
p2p_op_list中的每个操作,并返回相应的请求。目前支持NCCL、Gloo和UCC后端。- Parameters
p2p_op_list – 一个点对点操作的列表(每个操作符的类型为
torch.distributed.P2POp)。列表中isend/irecv的顺序很重要,需要与远程端对应的isend/irecv匹配。- Returns
调用 op_list 中相应操作返回的分布式请求对象列表。
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
注意,当此API与NCCL PG后端一起使用时,用户必须将当前GPU设备设置为torch.cuda.set_device,否则会导致意外挂起问题。
此外,如果此API是
group中的第一个集体调用,并且传递给dist.P2POp,则group的所有秩都必须参与此API调用;否则,行为是未定义的。如果此API调用不是group中的第一个集体调用,则允许仅涉及group的一组秩的批量P2P操作。
同步和异步集体操作¶
每个集体操作函数支持以下两种操作,具体取决于传递给集体的async_op标志的设置:
同步操作 - 默认模式,当 async_op 被设置为 False 时。
当函数返回时,保证集体操作已经执行。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何使用集体调用输出的进一步函数调用将按预期行为运行。对于 CUDA 集体操作,在同一 CUDA 流上使用输出的函数调用将按预期行为运行。用户必须注意在不同流下运行时的同步问题。有关 CUDA 语义(如流同步)的详细信息,请参阅 CUDA 语义。
请参阅以下脚本,以查看 CPU 和 CUDA 操作中这些语义的差异示例。
异步操作 - 当 async_op 设置为 True 时。集体操作函数返回一个分布式请求对象。通常情况下,你不需要手动创建它,并且保证支持两种方法:
is_completed()- 在CPU集合操作的情况下,如果已完成则返回True。在CUDA操作的情况下, 如果操作已成功排队到CUDA流,并且可以在默认流上使用输出而无需进一步同步,则返回True。wait()- 在CPU集合操作的情况下,将阻塞进程直到操作完成。在CUDA集合操作的情况下,将阻塞直到操作成功地排队到CUDA流上,并且可以在默认流上使用输出而无需进一步同步。get_future()- 返回torch._C.Future对象。支持NCCL,也支持GLOO和MPI上的大多数操作,除了点对点操作。 注意:随着我们继续采用Futures并合并API,get_future()调用可能会变得冗余。
示例
以下代码可以作为使用分布式集合操作时CUDA操作语义的参考。 它显示了在不同CUDA流上使用集合输出时显式同步的必要性:
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集体函数¶
- torch.distributed.broadcast(tensor, src, group=None, async_op=False)[source]¶
将张量广播到整个组。
tensor必须在所有参与集体操作的进程中具有相同数量的元素。
- torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[source]¶
在
object_list中广播可pickle的对象到整个组。类似于
broadcast(),但可以传递Python对象。 请注意,在object_list中的所有对象都必须是可pickle的才能进行广播。- Parameters
object_list (List[Any]) – 要广播的输入对象列表。 每个对象都必须是可pickle的。只有
src秩上的对象会被广播,但每个秩都必须提供相同大小的列表。src (int) – 广播的源秩
object_list。 源秩基于全局进程组(与group参数无关)组 – (进程组,可选):要操作的进程组。如果为None,则将使用默认进程组。默认值是
None。设备 (
torch.device, 可选) – 如果不是None,对象将被序列化并转换为张量,然后在广播之前移动到device。默认值是None。
- Returns
None. 如果秩是组的一部分,object_list将包含从src秩广播的对象。
注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,并且用户有责任确保设置为每个秩都有一个单独的GPU,通过torch.cuda.set_device()。注意
请注意,此API与
broadcast()集体略有不同,因为它不提供async_op句柄,因此 将是一个阻塞调用。警告
broadcast_object_list()隐式使用了pickle模块,这已知是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
broadcast_object_list()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用broadcast()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
在所有机器上以一种所有机器都能获得最终结果的方式减少张量数据。
调用后
tensor在所有进程中将逐位完全相同。支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
示例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
在所有机器上减少张量数据。
只有排名为
dst的进程将接收最终结果。
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]¶
从整个组中收集张量并放入列表中。
支持复杂和大小不一的张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]¶
从所有秩收集张量并将其放入一个单一的输出张量中。
该函数要求每个进程中的所有张量大小相同。
- Parameters
output_tensor (Tensor) – 用于容纳所有秩的张量元素的输出张量。它必须正确地调整大小以具有以下形式之一: (i) 沿主维度连接所有输入张量;关于“连接”的定义,请参见
torch.cat(); (ii) 沿主维度堆叠所有输入张量;关于“堆叠”的定义,请参见torch.stack()。 下面的示例可能更好地解释支持的输出形式。input_tensor (Tensor) – 从当前秩收集的张量。 与
all_gatherAPI 不同,此 API 中的输入张量在所有秩上必须具有相同的大小。组 (进程组, 可选) – 要操作的进程组。如果为None, 将使用默认的进程组。
async_op (bool, optional) – 此操作是否应为异步操作
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_gather_object(object_list, obj, group=None)[source]¶
从整个组中收集可pickle的对象到一个列表中。
类似于
all_gather(),但可以传递Python对象。 请注意,该对象必须是可pickle的才能被收集。- Parameters
对象列表 (列表[任意]) – 输出列表。它的大小应与该组的大小相匹配,并且将包含输出。
对象 (任意类型) – 从当前进程广播的可序列化的Python对象。
组 (进程组, 可选) – 要操作的进程组。如果为 None, 将使用默认的进程组。默认值是
None。
- Returns
无。如果调用的秩是该组的一部分,则集体操作的结果将填充到输入
object_list中。如果调用的秩不是该组的一部分,则传递的object_list将保持不变。
注意
请注意,此API与
all_gather()集体略有不同,因为它不提供async_op句柄,因此 将是一个阻塞调用。注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,并且用户有责任确保设置为每个秩都有一个独立的GPU,通过torch.cuda.set_device()。警告
all_gather_object()隐式使用了pickle模块,这被认为是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
all_gather_object()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用all_gather()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=0, group=None, async_op=False)[source]¶
在单个进程中收集一组张量。
该函数要求每个进程中的所有张量大小相同。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
- torch.distributed.gather_object(obj, object_gather_list=None, dst=0, group=None)[source]¶
从整个组中收集可pickle的对象到一个单一进程中。
类似于
gather(),但可以传递Python对象。请注意,该对象必须是可pickle的才能被收集。- Parameters
- Returns
无。在
dst排名中,object_gather_list将包含集体操作的输出。
注意
请注意,此API与gather集体操作略有不同 因为它不提供异步操作句柄,因此将是一个阻塞调用。
注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,并且用户有责任确保设置为每个秩都有一个独立的GPU,通过torch.cuda.set_device()。警告
gather_object()隐式使用了pickle模块,这被认为是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
gather_object()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用gather()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=0, group=None, async_op=False)[source]¶
将一个张量列表分散到组中的所有进程。
每个进程将接收一个张量,并将其数据存储在
tensor参数中。支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
注意
请注意,scatter_list 中的所有 Tensor 必须具有相同的大小。
- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> t_ones = torch.ones(tensor_size) >>> t_fives = torch.ones(tensor_size) * 5 >>> output_tensor = torch.zeros(tensor_size) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. For example, on rank 1: >>> output_tensor tensor([5., 5.])
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[source]¶
将可序列化的对象从
scatter_object_input_list分发到整个组。类似于
scatter(),但可以传递Python对象。在 每个秩上,分散的对象将作为scatter_object_output_list的第一个元素存储。 请注意,scatter_object_input_list中的所有对象都必须是可pickle的才能被分散。- Parameters
scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储散列到此秩的对象。
scatter_object_input_list (List[Any]) – 需要分发的输入对象列表。 每个对象都必须是可pickle的。只有在
src等级上的对象才会被分发,对于非源等级,该参数可以是None。src (int) – 用于分发的源秩
scatter_object_input_list。 源秩基于全局进程组(与group参数无关)。组 – (进程组,可选):要操作的进程组。如果为None,则将使用默认进程组。默认值是
None。
- Returns
None. 如果秩是组的一部分,scatter_object_output_list将把其第一个元素设置为该秩的分散对象。
注意
请注意,此API与scatter集体略有不同 因为它不提供一个
async_op句柄,因此将是一个 阻塞调用。警告
scatter_object_list()隐式使用了pickle模块,这已知是不安全的。可以构造恶意的pickle数据,在反序列化过程中执行任意代码。仅在信任数据时调用此函数。警告
调用
scatter_object_list()使用GPU张量并不被很好地支持 并且效率低下,因为它会引发GPU -> CPU传输,因为张量会被 序列化。请考虑使用scatter()代替。- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
减少,然后将一组张量列表分散到所有进程中。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
减少,然后将张量分散到组中的所有秩。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]¶
将输入张量拆分,然后将拆分后的列表分散到组中的所有进程。
稍后,接收到的张量将从组中的所有进程进行拼接,并作为单个输出张量返回。
支持复数张量。
- Parameters
输出 (张量) – 聚合的拼接输出张量。
输入 (张量) – 用于散射的输入张量。
output_split_sizes – (list[Int], 可选): 输出分割大小,用于维度 0 如果指定为 None 或空,则
output张量的维度 0 必须能被world_size整除。input_split_sizes – (list[Int], 可选): 输入分割大小,用于维度 0 如果指定为 None 或空,则
input张量的维度 0 必须能被world_size整除。组 (进程组, 可选) – 要操作的进程组。如果为None, 将使用默认的进程组。
async_op (bool, optional) – 此操作是否应为异步操作。
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
警告
all_to_all_single 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]¶
将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。
支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
警告
all_to_all 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]¶
同步所有进程。
这个集体块会一直处理,直到整个组进入此函数, 如果 async_op 为 False,或者异步工作句柄在调用 wait() 时。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有设置 async_op 或者不属于该组,则为 None。
注意
ProcessGroupNCCL 现在依赖于流同步而不是设备同步来阻塞CPU。因此,请不要假设 barrier() 会执行设备同步。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]¶
同步类似于
torch.distributed.barrier的过程,但考虑一个可配置的超时时间。它能够报告在给定超时时间内未能通过此屏障的秩。 具体来说,对于非零秩,将阻塞直到从秩0处理完发送/接收操作。 秩0将阻塞直到处理完来自其他所有秩的发送/接收操作,并会报告那些未能及时响应的秩的失败情况。请注意,如果一个秩没有达到被监控的屏障(例如由于挂起),那么所有其他秩在被监控的屏障中也会失败。
这个集体操作将阻塞组中的所有进程/秩,直到整个组成功退出函数,这使得它在调试和同步时非常有用。然而,它可能会对性能产生影响,因此只应在需要主机端完全同步点的调试或场景中使用。为了调试目的,可以在应用程序的集体调用之前插入此屏障,以检查是否有任何秩处于不同步状态。
注意
请注意,此集体操作仅支持GLOO后端。
- Parameters
组 (进程组, 可选) – 要操作的进程组。如果为
None,将使用默认的进程组。超时 (datetime.timedelta, 可选) – 监控屏障的超时时间。 如果为
None,将使用默认进程组超时时间。wait_all_ranks (布尔值, 可选) – 是否收集所有失败的等级。默认情况下,此值为
False和monitored_barrier,在等级 0 上遇到第一个失败的等级时会快速失败并抛出错误。通过设置wait_all_ranks=Truemonitored_barrier将收集所有失败的等级,并抛出包含所有失败等级信息的错误。
- Returns
None.
- Example::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work¶
一个Work对象表示PyTorch分布式包中待处理异步操作的句柄。它由非阻塞集合操作返回,例如dist.all_reduce(tensor, async_op=True)。
- class torch.distributed.ReduceOp¶
一个类似枚举的类,用于可用的减少操作:
SUM,PRODUCT,MIN,MAX,BAND,BOR,BXOR, 和PREMUL_SUM。BAND,BOR, 和BXOR约简在使用NCCL后端时不可用。AVG将值除以世界大小,然后再在各个秩之间求和。AVG仅在使用NCCL后端时可用, 并且仅适用于 NCCL 版本 2.10 或更高版本。PREMUL_SUM在本地将输入乘以给定的标量,然后再进行归约。PREMUL_SUM仅在NCCL后端可用, 并且仅在 NCCL 版本 2.11 或更高版本中可用。用户应使用torch.distributed._make_nccl_premul_sum。此外,
MAX、MIN和PRODUCT不支持复数张量。这个类的值可以作为属性访问,例如,
ReduceOp.SUM。 它们用于指定减少集合策略,例如,reduce()。该类不支持
__members__属性。
集体通信性能分析¶
请注意,您可以使用 torch.profiler(推荐,仅在 1.8.1 版本后可用)或 torch.autograd.profiler 来分析此处提到的集体通信和点对点通信 API。所有开箱即用的后端(gloo, nccl, mpi)都受支持,并且集体通信的使用将在分析输出/跟踪中如预期那样呈现。分析您的代码与任何常规的 torch 操作符相同:
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
请参阅分析器文档以获取分析器功能的完整概述。
多GPU集体函数¶
警告
多GPU函数(代表每个CPU线程多个GPU)已被弃用。从今天起,PyTorch Distributed首选的编程模型是每个线程一个设备,如本文档中的API所示。如果你是一名后端开发人员,并希望支持每个线程多个设备,请联系PyTorch Distributed的维护者。
第三方后端¶
除了内置的 GLOO/MPI/NCCL 后端,PyTorch 分布式通过运行时注册机制支持第三方后端。
有关如何通过 C++ 扩展开发第三方后端的参考资料,请参阅 教程 - 自定义 C++ 和 CUDA 扩展 和
test/cpp_extensions/cpp_c10d_extension.cpp。第三方后端的功能由其自身的实现决定。
新的后端继承自 c10d::ProcessGroup 并在导入时通过 torch.distributed.Backend.register_backend()
注册后端名称和实例化接口。
当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group()
时,torch.distributed 包将在新后端上运行。
警告
第三方后端的支持是实验性的,可能会发生变化。
启动工具¶
torch.distributed 包还提供了启动工具程序在 torch.distributed.launch。此辅助工具可用于为分布式训练在每个节点上启动多个进程。
模块 torch.distributed.launch。
torch.distributed.launch 是一个模块,它在每个训练节点上启动多个分布式
训练进程。
警告
此模块将被废弃,转而使用 torchrun。
该工具可用于单节点分布式训练,其中每个节点将启动一个或多个进程。该工具既可用于CPU训练,也可用于GPU训练。如果该工具用于GPU训练,每个分布式进程将在单个GPU上运行。这可以显著提高单节点训练性能。它也可以用于多节点分布式训练,在每个节点上启动多个进程以实现更好的多节点分布式训练性能。这对于具有多个Infiniband接口且支持直接GPU的系统特别有利,因为所有这些接口都可以被利用来聚合通信带宽。
在单节点分布式训练或多节点分布式训练的情况下,此工具将在每个节点上启动给定数量的进程(--nproc-per-node)。如果用于GPU训练,这个数字需要小于或等于当前系统上的GPU数量(nproc_per_node),并且每个进程将在从 GPU 0 到 GPU (nproc_per_node - 1) 的单个GPU上运行。
如何使用此模块:
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多节点多进程分布式训练:(例如,两个节点)
节点 1: (IP: 192.168.1.1, 并且有一个空闲端口: 1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
节点2:
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查找此模块提供的可选参数:
python -m torch.distributed.launch --help
重要通知:
此工具和多进程分布式(单节点或多节点)GPU训练目前仅在使用NCCL分布式后端时才能达到最佳性能。因此,对于GPU训练,建议使用NCCL后端。
2. 在你的训练程序中,你必须解析命令行参数:
--local-rank=LOCAL_PROCESS_RANK,该参数将由本模块提供。
如果你的训练程序使用GPU,你应该确保你的代码只在LOCAL_PROCESS_RANK对应的GPU设备上运行。这可以通过以下方式实现:
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
将您的设备设置为本地排名,使用以下任一方法
>>> torch.cuda.set_device(args.local_rank) # before your code runs
or
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
在 2.0.0 版本中更改:启动器将传递 --local-rank=<rank> 参数到您的脚本。
从 PyTorch 2.0.0 开始,建议使用带连字符的 --local-rank 而不是之前使用的带下划线的 --local_rank。
为了向后兼容,用户可能需要在其参数解析代码中处理这两种情况。这意味着在参数解析器中同时包括"--local-rank"
和"--local_rank"。如果只提供"--local_rank",
启动程序将触发错误:“错误:未识别的参数:–local-rank=<rank>”。对于仅支持PyTorch 2.0.0+的训练代码,
包括"--local-rank"就足够了。
3. 在你的训练程序中,你应该在开始时调用以下函数来启动分布式后端。强烈建议使用init_method=env://。其他初始化方法(例如tcp://)可能有效,但env://是该模块正式支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在你的训练程序中,你可以使用常规的分布式函数,或者使用 torch.nn.parallel.DistributedDataParallel() 模块。如果你的训练程序使用GPU进行训练,并且你希望使用 torch.nn.parallel.DistributedDataParallel() 模块,以下是配置方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids 参数被设置为你的代码将要运行的唯一GPU设备ID。
这通常是进程的本地排名。换句话说,device_ids 需要是 [args.local_rank],
并且 output_device 需要是 args.local_rank 才能使用此工具
5. 另一种方法是通过环境变量将 local_rank 传递给子进程
LOCAL_RANK。当您使用
--use-env=True 启动脚本时,此行为会被启用。您必须调整上面的子进程示例,将
args.local_rank 替换为 os.environ['LOCAL_RANK'];指定此标志时,启动器不会传递 --local-rank。
警告
local_rank 并不是全局唯一的:它只是在一台机器上的每个进程中是唯一的。因此,不要用它来决定是否应该执行某些操作,例如,写入网络文件系统。请参见
https://github.com/pytorch/pytorch/issues/12042 了解如果处理不当可能会出现的问题。
生成工具¶
多进程包 - torch.multiprocessing 也提供了一个 spawn 函数在 torch.multiprocessing.spawn() 中。这个辅助函数可以用于生成多个进程。它的工作原理是传入你想要运行的函数,并生成 N 个进程来运行它。这也可以用于多进程分布式训练。
有关如何使用它的参考资料,请参阅 PyTorch 示例 - ImageNet 实现
请注意,此函数需要Python 3.4或更高版本。
调试 torch.distributed 应用程序¶
调试分布式应用程序可能具有挑战性,因为难以理解的挂起、崩溃或跨秩的不一致行为。torch.distributed 提供了一套工具,帮助以自助服务的方式调试训练应用程序:
Python 断点¶
在分布式环境中使用Python的调试器是非常方便的,但由于它不能直接开箱即用,许多人根本不使用它。 PyTorch提供了一个围绕pdb的定制包装器,简化了这一过程。
torch.distributed.breakpoint 让这个过程变得简单。 内部,它以两种方式自定义了pdb的断点行为,但其他方面正常工作如pdb。 1. 只在用户指定的一个进程(rank)上附加调试器。 2. 确保所有其他进程停止,通过使用一个torch.distributed.barrier(),该torch.distributed.barrier()会在调试的进程发出continue时释放。 3. 重定向子进程的stdin,使其连接到您的终端。
要使用它,请在所有排名中简单地发出torch.distributed.breakpoint(rank),并在每种情况下使用相同的rank值。
监控屏障¶
从v1.10开始,torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代品存在,后者在崩溃时会提供有关哪个秩可能出错的有用信息,即并非所有秩都在给定超时内调用 torch.distributed.monitored_barrier()。 torch.distributed.monitored_barrier() 使用类似于确认的过程中的 send/recv 通信原语实现主机端屏障,允许秩0报告哪些秩未能及时确认屏障。例如,考虑以下函数,其中秩1未能调用 torch.distributed.monitored_barrier()(实际上这可能是由于应用程序错误或先前集体操作中的挂起):
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
以下错误消息在秩0上生成,允许用户确定哪些秩可能存在问题并进一步调查:
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG¶
使用 TORCH_CPP_LOG_LEVEL=INFO,环境变量 TORCH_DISTRIBUTED_DEBUG 可以用来触发额外的有用日志记录和集体同步检查,以确保所有秩都适当同步。 TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF(默认),INFO 或 DETAIL,具体取决于所需的调试级别。请注意,最详细选项 DETAIL 可能会影响应用程序性能,因此仅在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO 将在使用 torch.nn.parallel.DistributedDataParallel() 训练的模型初始化时生成额外的调试日志,并且
TORCH_DISTRIBUTED_DEBUG=DETAIL 还将记录选定迭代次数的运行时性能统计信息。这些运行时统计信息
包括前向时间、后向时间、梯度通信时间等数据。例如,给定以下应用程序:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
以下日志在初始化时生成:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下日志在运行时生成(当 TORCH_DISTRIBUTED_DEBUG=DETAIL 被设置时):
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO 在 torch.nn.parallel.DistributedDataParallel() 中增强了崩溃日志记录,因为模型中存在未使用的参数。目前,如果在前向传递过程中可能有未使用的参数,则必须将 find_unused_parameters=True 传递给 torch.nn.parallel.DistributedDataParallel() 初始化,并且从 v1.10 开始,所有模型输出都必须用于损失计算,因为 torch.nn.parallel.DistributedDataParallel() 不支持在反向传递中使用未使用的参数。这些约束对于较大的模型来说尤其具有挑战性,因此当出现错误时,torch.nn.parallel.DistributedDataParallel() 将记录所有未使用参数的全限定名称。例如,在上述应用程序中,如果我们修改 loss 以计算为 loss = output[1],则 TwoLinLayerNet.a 在反向传递中不会接收到梯度,从而导致 DDP 失败。在崩溃时,用户会收到有关未使用参数的信息,这对于大型模型来说可能很难手动查找:
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将在用户发出的每个集体调用上触发额外的一致性和同步检查,无论是直接还是间接(如 DDP allreduce)。这是通过创建一个包装进程组来实现的,该进程组包装了由
torch.distributed.init_process_group() 和 torch.distributed.new_group() API 返回的所有进程组。因此,这些API将返回一个可以像普通进程组一样使用的包装进程组,但在将集体分派到底层进程组之前执行一致性检查。目前,这些检查包括一个 torch.distributed.monitored_barrier(),
它确保所有秩完成其未决的集体调用,并报告卡住的秩。接下来,通过确保所有集体函数匹配并且以一致的张量形状被调用来检查集体本身的一致性。如果不是这种情况,则会在应用程序崩溃时包含详细的错误报告,而不是挂起或无信息的错误消息。例如,考虑以下函数,其中输入形状与
torch.distributed.all_reduce() 不匹配:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用NCCL后端,这样的应用程序可能会导致挂起,在非平凡场景中很难找到根本原因。如果用户启用了TORCH_DISTRIBUTED_DEBUG=DETAIL并重新运行应用程序,则以下错误消息揭示了根本原因:
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行细粒度控制,还可以使用函数torch.distributed.set_debug_level()、torch.distributed.set_debug_level_from_env()和torch.distributed.get_debug_level()。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与TORCH_SHOW_CPP_STACKTRACES=1 结合使用,在检测到集体去同步时记录整个调用堆栈。这些集体去同步检查适用于所有使用c10d 集体调用的应用程序,这些调用由使用torch.distributed.init_process_group() 和torch.distributed.new_group() API 创建的过程组支持。
日志记录¶
除了通过 torch.distributed.monitored_barrier() 和 TORCH_DISTRIBUTED_DEBUG 提供的显式调试支持外,torch.distributed 的底层 C++ 库还会在不同级别输出日志消息。这些消息有助于理解分布式训练任务的执行状态,并解决诸如网络连接失败等问题。以下矩阵显示了如何通过组合 TORCH_CPP_LOG_LEVEL 和 TORCH_DISTRIBUTED_DEBUG 环境变量来调整日志级别。
|
|
有效的日志级别 |
|---|---|---|
|
忽略 |
错误 |
|
忽略 |
警告 |
|
忽略 |
信息 |
|
|
调试 |
|
|
跟踪(也称为全部) |
分布式组件引发自RuntimeError派生的自定义异常类型:
torch.distributed.DistError: 这是所有分布式异常的基础类型。
torch.distributed.DistBackendError: 当出现特定于后端的错误时,会抛出此异常。例如,如果使用NCCL后端,并且用户尝试使用一个NCCL库无法访问的GPU。
torch.distributed.DistNetworkError: 此异常在网络库遇到错误时抛出(例如:连接重置)
torch.distributed.DistStoreError: 此异常在Store遇到错误时抛出(例如:TCPStore超时)
- class torch.distributed.DistError¶
当分布式库中发生错误时引发的异常
- class torch.distributed.DistBackendError¶
当分布式后端发生错误时引发的异常
- class torch.distributed.DistNetworkError¶
当分布式网络中发生错误时引发的异常
- class torch.distributed.DistStoreError¶
当分布式存储中发生错误时引发的异常
如果你正在运行单节点训练,可能方便在脚本中进行交互式断点。我们提供了一种方便的方法来为单个秩设置断点: