分布式通信包 - torch.distributed¶
注意
请参考 PyTorch 分布式概述,简要介绍与分布式训练相关的所有功能。
后端¶
torch.distributed
支持三个内置后端,每个后端都有
不同的功能。下表显示了可用的功能
用于 CPU/CUDA 张量。
仅当用于构建 PyTorch 的实现支持 CUDA 时,MPI 才支持 CUDA。
后端 |
|
|
|
|||
---|---|---|---|---|---|---|
装置 |
中央处理器 |
图形处理器 |
中央处理器 |
图形处理器 |
中央处理器 |
图形处理器 |
发送 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
recv |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
广播 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
减少 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
收集 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
散射 |
✓ |
✘ |
✓ |
? |
✘ |
✘ |
reduce_scatter |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
all_to_all |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
障碍 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
PyTorch 附带的后端¶
PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。 默认情况下,对于 Linux,Gloo 和 NCCL 后端构建并包含在 PyTorch 中 分布式(仅在使用 CUDA 构建时为 NCCL)。MPI 是一个可选的后端,它只能是 如果您从源代码构建 PyTorch,则包括 include。(例如,在具有 MPI 的主机上构建 PyTorch 安装。
使用哪个后端?¶
过去,我们经常被问到:“我应该使用哪个后端?
经验法则
使用 NCCL 后端进行分布式 GPU 训练
使用 Gloo 后端进行分布式 CPU 训练。
具有 InfiniBand 互连的 GPU 主机
使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的 API API
具有以太网互连的 GPU 主机
使用 NCCL,因为它目前提供了最好的分布式 GPU 训练性能,尤其是对于多进程单节点或 多节点分布式训练。如果您在使用 NCCL 中,请使用 Gloo 作为回退选项。(请注意,Gloo 目前 运行速度比 GPU 的 NCCL 慢。
具有 InfiniBand 互连的 CPU 主机
如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则, 请改用 MPI。我们计划添加 InfiniBand 支持 Gloo 在即将发布的版本中。
具有以太网互连的 CPU 主机
请使用 Gloo,除非您有使用 MPI 的特定原因。
常见环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都会尝试找到合适的网络接口来使用。 如果自动检测到的接口不正确,您可以使用以下命令覆盖它 环境变量(适用于相应的后端):
例如,NCCL_SOCKET_IFNAME
export NCCL_SOCKET_IFNAME=eth0
例如,GLOO_SOCKET_IFNAME
export GLOO_SOCKET_IFNAME=eth0
如果您使用的是 Gloo 后端,则可以通过将
他们用逗号表示,像这样: .
后端将以循环方式跨这些接口分派操作。
所有进程都必须在此变量中指定相同数量的接口。export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
其他 NCCL 环境变量¶
NCCL 还提供了许多用于微调目的的环境变量。
常用的包括以下用于调试目的的:
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档
基本¶
torch.distributed 包提供 PyTorch 支持和通信原语
用于跨一个或多个上运行的多个计算节点的多进程并行性
机器。该类建立在此基础上
功能,以提供同步分布式训练作为任何
PyTorch 模型。这与 Multiprocessing 包 - torch.multiprocessing 提供的并行性类型不同,
因为它支持
多台联网机器,并且用户必须显式启动单独的
每个进程的主训练脚本的副本。
在单机同步情况下,torch.distributed 或包装器可能仍然比其他
数据并行的方法,包括
:
每个进程都维护自己的优化器,并对每个 迭 代。虽然这可能看起来多余,因为已经收集了梯度 一起并平均跨流程,因此对于每个流程都是相同的,这意味着 不需要参数广播步骤,从而减少了在 节点。
每个进程都包含一个独立的 Python 解释器,无需额外的解释器 开销和“GIL 抖动”,这来自驱动多个执行线程、模型 副本或 GPU。这对于 大量使用 Python 运行时,包括具有递归层或许多小 组件。
初始化¶
在调用任何其他方法之前,需要使用函数初始化包。这会阻塞,直到所有进程都
加入。
-
torch.distributed.
is_available
()[来源]¶ 如果分发的包可用,则返回。否则,不会公开任何其他 API。目前,可在 Linux、MacOS 和 Windows 上使用。设置为在从源构建 PyTorch 时启用它。 目前,默认值适用于 Linux 和 Windows,适用于 MacOS。
True
torch.distributed
torch.distributed
USE_DISTRIBUTED=1
USE_DISTRIBUTED=1
USE_DISTRIBUTED=0
-
torch.distributed.
init_process_group
(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=无, group_name='', pg_options=无)[来源]¶ 初始化默认的分布式进程组,这也将 初始化分发的包。
- 初始化进程组有两种主要方法:
指定 、 和 显式。
store
rank
world_size
指定(一个 URL 字符串),它指示位置/方式 以发现对等节点。(可选)指定 和 , 或者对 URL 中的所有必需参数进行编码并省略它们。
init_method
rank
world_size
如果未指定,则假定为 “env://”。
init_method
- 参数
backend (str 或 Backend) – 要使用的后端。根据 build-time 配置,有效值包括 、 、 和。此字段应为小写字符串 (例如, ),也可以通过属性(例如 )访问
。如果使用 每台具有后端的机器上有多个进程,每个进程 必须具有对它使用的每个 GPU 的独占访问权限,就像共享 GPU 一样 进程之间可能会导致死锁。
mpi
gloo
nccl
"gloo"
Backend.GLOO
nccl
init_method (str, optional) – 指定如何初始化 进程组。如果未指定 or,则默认为 “env://”。 与 互斥。
init_method
store
store
world_size (int, optional) – 参与的进程数 工作。如果指定,则为 required。
store
rank (int, optional) – 当前进程的 rank (它应该是 介于 0 和 -1 之间的数字)。 如果指定,则为 required。
world_size
store
store (Store,可选) – 所有工作人员均可访问的键/值存储,已使用 交换连接/地址信息。 与 互斥。
init_method
timeout (timedelta, optional) – 针对 进程组。默认值等于 30 分钟。 这适用于后端。对于 ,这是 仅当环境变量 OR 设置为 1 时适用。设置后,这是 process 将阻塞并等待 collectives 完成 引发异常。当 set 时, 这是 Collective 将中止的持续时间 异步,进程将崩溃。 将向用户提供可以捕获和处理的错误, 但是由于其阻塞性质,它会产生性能开销。上 另一方面,它几乎没有 性能开销,但在出现错误时会导致进程崩溃。这是 done 的,因为 CUDA 执行是异步的,并且不再安全 在异步 NCCL 操作失败后继续执行用户代码 可能会导致后续 CUDA 操作在损坏的 数据。只应设置这两个环境变量中的一个。
gloo
nccl
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
group_name (str, optional, deprecated) – 组名称。
pg_options (ProcessGroupOptions,可选) – 进程组选项 指定在 特定流程组的构建。截至目前,唯一的 options 是针对后端的,可以指定 NCCL 后端可以在以下情况下获取高优先级 CUDA 流 有计算内核在等待。
ProcessGroupNCCL.Options
nccl
is_high_priority_stream
注意
要启用 ,需要从源构建 PyTorch 在支持 MPI 的系统上。
backend == Backend.MPI
-
torch.distributed.
is_torchelastic_launched
()[来源]¶ 检查此进程是否是使用 (aka torchelastic) 启动的。环境的存在 变量作为代理,判断当前进程是否 与 TorchElastic 一起启动。这是一个合理的代理,因为 map 到的 rendezvous id 始终是一个 非 null 值,指示用于对等发现目的的作业 ID..
torch.distributed.elastic
TORCHELASTIC_RUN_ID
TORCHELASTIC_RUN_ID
目前支持三种初始化方法:
TCP 初始化¶
使用 TCP 进行初始化有两种方法,都需要网络地址
可从所有进程访问,并且所需的 .第一种方法
需要指定属于 Rank 0 进程的地址。这
初始化方法要求所有进程都手动指定秩。world_size
请注意,最新分布式不再支持多播地址
包。 也被弃用。group_name
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,从而允许 一个用于完全自定义获取信息的方式。要设置的变量 是:
MASTER_PORT
-必填;必须是 rank 为 0 的计算机上的 free portMASTER_ADDR
- 必需(等级 0 除外);Rank 0 节点的地址WORLD_SIZE
-必填;可以在此处设置,也可以在调用 init 函数中设置RANK
-必填;可以在此处设置,也可以在调用 init 函数中设置
等级为 0 的计算机将用于设置所有连接。
这是默认方法,这意味着不必指定(或
可以是 )。init_method
env://
初始化后¶
运行后,可以使用以下函数。自
检查进程组是否已初始化 使用
。
-
class (name)[来源]
torch.distributed.
Backend
¶ 可用后端的类似枚举的类:GLOO、NCCL、MPI 和其他已注册的 backends 的
此类的值为小写字符串,例如 .他们可以 作为属性访问,例如 .
"gloo"
Backend.NCCL
这个类可以直接调用来解析字符串,例如,会检查是否有效,并且 如果是这样,则返回解析后的小写字符串。它还接受大写字符串, 例如,返回 .
Backend(backend_str)
backend_str
Backend("GLOO")
"gloo"
注意
该条目存在,但仅用作 某些字段的初始值。用户不应直接使用它 也不假设它的存在。
Backend.UNDEFINED
-
torch.distributed.
get_backend
(group=None)[来源]¶ 返回给定进程组的后端。
- 参数
group (ProcessGroup,可选) – 要处理的流程组。这 default 是 General Main Process 组。如果另一个特定组 时,调用进程必须是 的一部分。
group
- 返回
给定进程组的后端,以小写字符串表示。
分布式 Key-Value Store¶
分布式包自带分布式键值存储,可以是
用于在组中的进程之间共享信息,以及
在 (通过显式创建存储
作为指定 .)有 3 种选择
键值存储:
、
和
.
init_method
-
类
torch.distributed.
Store
¶
-
类
torch.distributed.
TCPStore
¶ 基于 TCP 的分布式键值存储实现。服务器存储包含 数据,而客户端存储可以通过 TCP 连接到服务器存储,并且 执行插入键值等操作 pair,检索键值对等。那里 应始终是一个服务器存储初始化,因为客户端存储将等待 用于建立连接的服务器。
set()
get()
- 参数
host_name (str) – 服务器存储应在其上运行的主机名或 IP 地址。
port (int) – 服务器存储应侦听传入请求的端口。
world_size (int, optional) – 存储用户总数 (客户端数 + 服务器 1)。默认值为 -1(负值表示非固定的商店用户数)。
is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储为 False。默认值为 False。
timeout (timedelta,可选) – 存储区在初始化期间以及 和 等方法使用的超时。默认值为 timedelta(seconds=300)
get()
wait()
wait_for_worker (bool, optional) – 是否等待所有工作程序与服务器存储连接。仅当 world_size 为 固定 值时,这才适用。默认值为 True。
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
-
类
torch.distributed.
HashStore
¶ 基于底层 hashmap 的线程安全存储实现。此 store 可以使用 在同一进程中(例如,由其他线程),但不能跨进程使用。
- 例::
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
-
类
torch.distributed.
FileStore
¶ 一种 store 实现,它使用文件来存储底层键值对。
- 例::
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
-
类
torch.distributed.
PrefixStore
¶ 3 个键值存储(
、
和
)中任意一个的包装器 这会为插入到存储中的每个键添加一个前缀。
- 参数
prefix (str) – 在插入到存储区之前添加到每个键前面的前缀字符串。
store (torch.distributed.store) – 构成底层键值存储的 store 对象。
-
torch.distributed.Store.
set
(自我:torch._C._distributed_c10d。存储、arg0:str、arg1:str) → 无¶ 根据提供的 和 将键值对插入到存储中。如果 store 中已经存在,它将覆盖旧的 值替换为新提供的 .
key
value
key
value
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
-
torch.distributed.Store.
get
(自我:torch._C._distributed_c10d。存储,arg0: str) → 字节¶ 检索与存储中给定的值关联的值。如果不是 存在于 store 中,该函数将等待 ,这是定义的 初始化 store 时,在引发异常之前。
key
key
timeout
- 参数
key (str) – 该函数将返回与此键关联的值。
- 返回
与 if 关联的值在存储中。
key
key
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
-
torch.distributed.Store.
add
(自我:torch._C._distributed_c10d。存储, arg0: str, arg1: int) → int¶ 对给定的 add 的第一次调用会创建一个关联的计数器 with 在 store 中,初始化为 .后续调用以添加 以相同的增量将计数器替换为指定的 . 使用已具有 在 store 中设置 will result 在异常中。
key
key
amount
key
amount
add()
set()
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
-
torch.distributed.Store.
compare_set
(自我:torch._C._distributed_c10d。存储、arg0:str、arg1:str、arg2:str) → 字节¶ 根据提供的 和 在 和 插入之前执行比较。 仅当 for the store 中已存在 或 is 空字符串时,才会设置。
key
expected_value
desired_value
desired_value
expected_value
key
expected_value
- 参数
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
-
torch.distributed.Store.
wait
(*args, **kwargs)¶ overloaded 函数。
wait(self: torch._C._distributed_c10d.存储,arg0: List[str]) -> 无
等待将每个密钥添加到存储中。如果不是所有键都是 set before the (set during store initialization) 之前设置),则会引发异常。
keys
timeout
wait
- 参数
keys (list) – 在存储中设置它们之前要等待的键列表。
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.存储, arg0: List[str], arg1: datetime.timedelta) -> 无
等待将每个 key in 添加到存储中,并引发异常 如果提供的 .
keys
timeout
- 参数
keys (list) – 在存储中设置它们之前要等待的键列表。
timeout (timedelta) – 在引发异常之前等待添加键的时间。
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
-
torch.distributed.Store.
num_keys
(自我:torch._C._distributed_c10d。存储) → int¶ 返回 store 中设置的键数。请注意,此数字通常会 比 和 添加的键数大 1,因为一个键用于协调所有 使用 store 的工作人员。
set()
add()
- 返回
存储中存在的键数。
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
-
torch.distributed.Store.
delete_key
(自我:torch._C._distributed_c10d。存储,arg0: str) → bool¶ 从存储中删除 关联的键值对。如果成功删除了键,则返回 true,如果未成功删除,则返回 false。
key
- 参数
key (str) – 要从存储中删除的密钥
- 返回
如果已删除,则为 True,否则为 False。
key
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
-
torch.distributed.Store.
set_timeout
(自我:torch._C._distributed_c10d。存储,arg0: datetime.timedelta) → None¶ 设置 store 的默认超时。此超时在初始化期间以及 in 和 中使用。
wait()
get()
- 参数
timeout (timedelta) – 要在 store 中设置的超时。
- 例::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
组¶
默认情况下,集合对 default 组(也称为 world)进行操作,并且
要求所有进程都进入分布式函数调用。但是,某些工作负载可能会受益
来自更细粒度的通信。这就是分布式组的用武之地
进入游戏。函数可以是
用于创建新组,其中包含所有进程的任意子集。它返回
一个不透明的组句柄,可以作为参数提供给所有集合
(集合是分布式函数,用于在某些众所周知的编程模式中交换信息)。
group
-
torch.distributed.
new_group
(ranks=None, timeout=datetime.timedelta(seconds=1800),backend=None, pg_options=None)[来源]¶ 创建新的分布式组。
此功能要求主组中的所有进程(即所有 作为分布式作业一部分的进程)输入此功能,甚至 如果他们不打算成为该组的成员。此外,组 应在所有进程中以相同的顺序创建。
警告
在后端同时使用多个进程组 不安全,用户应在 他们的应用程序来确保一次只使用一个进程组。 这意味着来自一个流程组的集合应该已经完成 执行(不仅仅是排队,因为 CUDA 执行是 async) 的 Collectives 之前。 有关更多详细信息,请参阅同时使用多个 NCCL 通讯器。
NCCL
- 参数
ranks (list[int]) – 组成员的排名列表。如果 ,将为 设置为 All ranks。默认值为 。
None
None
timeout (timedelta, optional) – 针对 进程组。默认值等于 30 分钟。 这适用于后端。对于 ,这是 仅当环境变量 OR 设置为 1 时适用。设置后,这是 process 将阻塞并等待 collectives 完成 引发异常。当 set 时, 这是 Collective 将中止的持续时间 异步,进程将崩溃。 将向用户提供可以捕获和处理的错误, 但是由于其阻塞性质,它会产生性能开销。上 另一方面,它几乎没有 性能开销,但在出现错误时会导致进程崩溃。这是 done 的,因为 CUDA 执行是异步的,并且不再安全 在异步 NCCL 操作失败后继续执行用户代码 可能会导致后续 CUDA 操作在损坏的 数据。只应设置这两个环境变量中的一个。
gloo
nccl
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
NCCL_BLOCKING_WAIT
NCCL_ASYNC_ERROR_HANDLING
backend (str 或 Backend,可选) – 要使用的后端。根据 构建时配置,有效值为 和 。 默认情况下,使用与全局组相同的后端。此字段 应该以小写字符串的形式给出(例如 ),它可以 也可以通过属性(例如 )进行访问
。如果传入,则后端 将使用与默认进程组对应的进程组。默认值为 。
gloo
nccl
"gloo"
Backend.GLOO
None
None
pg_options (ProcessGroupOptions,可选) – 进程组选项 指定在 特定流程组的构建。即,对于后端,可以指定 进程组可以选择高优先级的 CUDA 流。
nccl
is_high_priority_stream
- 返回
可提供给集体调用的分布式组的句柄。
点对点通信¶
并在
使用时返回分布式请求对象。通常,此对象的类型未指定
因为它们永远不应该手动创建,但保证它们支持两种方法:
is_completed()
- 如果操作已完成,则返回 Truewait()
- 将阻止进程,直到操作完成。 保证在返回后返回 True。is_completed()
同步和异步集合操作¶
每个集合运算函数都支持以下两种运算,
根据传递到 Collective 的标志的设置:async_op
Synchronous operation - 默认模式,设置为 时。
当函数返回时,保证
执行集体操作。对于 CUDA 操作,不能保证
CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集合体,任何
利用集体调用输出的进一步函数调用将按预期运行。对于 CUDA 集合,
利用同一 CUDA 流上的输出的函数调用将按预期运行。用户必须注意
在不同流下运行场景下的同步。有关 CUDA 语义的详细信息,例如 stream
同步,请参阅 CUDA 语义。
请参阅以下脚本,查看 CPU 和 CUDA 操作的这些语义差异的示例。async_op
False
异步操作 - 当设置为 True 时。集合操作函数
返回 Distributed Request 对象。通常,您不需要手动创建它,并且它
保证支持两种方法:async_op
is_completed()
- 对于 CPU 集合,则返回已完成。对于 CUDA 操作, 如果操作已成功排入 CUDA 流,并且输出可以在 default 流,无需进一步同步。True
True
wait()
- 对于 CPU 集合,将阻止进程,直到操作完成。在这种情况下 的 CUDA 集合中,将阻塞,直到操作成功排入 CUDA 流中,并且 output 可以在 default 流上使用,而无需进一步同步。get_future()
- 返回 object。支持 NCCL,也支持 GLOO 上的大多数操作 和 MPI,但点对点操作除外。 注意:随着我们继续采用 Futures 和合并 API,调用可能会变得多余。torch._C.Future
get_future()
例
以下代码可以作为使用分布式集合时 CUDA 操作语义的参考。 它显示了在不同的 CUDA 流上使用集体输出时需要同步的明确需求:
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集合函数¶
-
torch.distributed.
broadcast
(张量、src、group=None、async_op=False)[来源]¶ 将 Tensor 广播到整个 Group。
tensor
在所有进程中必须具有相同数量的元素 参与集体。
-
torch.distributed.
broadcast_object_list
(object_list, src=0, group=None, device=None)[来源]¶ 将可腌制对象广播到整个组中。类似 to
中,但可以传入 Python 对象。 请注意,中的所有对象都必须是可 picklable 的,才能 播出。
object_list
object_list
- 参数
object_list (List[Any]) – 要广播的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 广播,但每个排名必须提供大小相等的列表。
src
src (int) – 要从中广播的源排名。
object_list
group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。
None
device ( , 可选 ) – 如果不是 None,则对象为 serialized 并转换为 Tensors,这些张量被移动到 Before broadcasting 中。默认值为 。
torch.device
device
None
- 返回
None
.如果 rank 是组的一部分,则将包含 从 rank 广播对象。object_list
src
注意
对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。
torch.cuda.current_device()
torch.cuda.set_device()
- 例::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
-
torch.distributed.
all_reduce
(张量,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶ 减少所有机器上的张量数据,使所有机器都得到 最终结果。
After the call 在所有进程中都将按位相同。
tensor
支持复杂张量。
- 参数
- 返回
异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组
例子
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4]) # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6]) # Rank 0 tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j]) # Rank 0 tensor([3.+3.j, 4.+4.j]) # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j]) # Rank 0 tensor([4.+4.j, 6.+6.j]) # Rank 1
-
torch.distributed.
reduce
(张量,dst,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶ 减少所有计算机的张量数据。
只有具有 rank 的进程才会收到最终结果。
dst
-
torch.distributed.
all_gather
(tensor_list,张量,组=无,async_op=False)[来源]¶ 从列表中的整个组中收集张量。
支持复杂张量。
- 参数
- 返回
异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组
例子
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)] >>> tensor_list [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1 >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2]), tensor([3, 4])] # Rank 0 [tensor([1, 2]), tensor([3, 4])] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j]) # Rank 0 tensor([3.+3.j, 4.+4.j]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0 [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1
-
torch.distributed.
all_gather_object
(object_list, obj, group=None)[来源]¶ 将整个组中的可腌制对象收集到一个列表中。与
类似,但可以传入 Python 对象。请注意,对象 必须可腌制才能被收集。
- 参数
object_list (list[Any]) – 输出列表。它的大小应该正确地调整为 size 的组,并将包含输出。
object (Any) – 要从当前进程广播的可选取 Python 对象。
group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。
None
- 返回
没有。如果调用秩是此组的一部分,则 collective 将填充到 input 中。如果 调用 rank 不属于组,传入的 未修改。
object_list
object_list
注意
对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。
torch.cuda.current_device()
torch.cuda.set_device()
- 例::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
-
torch.distributed.
gather_object
(obj, object_gather_list=无, dst=0, group=无)[来源]¶ 在单个进程中从整个组中收集可腌制对象。 与
类似,但可以传入 Python 对象。请注意, object 必须是可腌制的才能被收集。
- 参数
- 返回
没有。在排名上,将包含 output 的 Collective 的 output 的 Portfolio
dst
object_gather_list
注意
请注意,此 API 与 gather collective 略有不同 因为它不提供async_op句柄,因此会阻塞 叫。
注意
对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。
torch.cuda.current_device()
torch.cuda.set_device()
- 例::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( gather_objects[dist.get_rank()], output if dist.get_rank() == 0 else None, dst=0 ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
-
torch.distributed.
scatter
(张量,scatter_list=无,src=0,组=无,async_op=False)[来源]¶ 将张量列表分散到组中的所有进程。
每个进程将只接收一个张量并将其数据存储在参数中。
tensor
支持复杂张量。
-
torch.distributed.
scatter_object_list
(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[来源]¶ 将可腌制对象分散到整体中 群。与
类似,但可以传入 Python 对象。上 每个 rank,scattered 对象将存储为 的第一个元素。请注意,中的所有对象都必须是可腅制的,以便进行分散。
scatter_object_input_list
scatter_object_output_list
scatter_object_input_list
- 参数
scatter_object_output_list (List[Any]) – 其第一个 元素将存储分散到此 rank 的对象。
scatter_object_input_list (List[Any]) – 要散布的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 是 scattered,并且参数可以是非 src 秩。
src
None
src (int) – 要从中分散的源排名。
scatter_object_input_list
group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。
None
- 返回
None
.如果 rank 是组的一部分,则将其第一个元素设置为此 rank 的分散对象。scatter_object_output_list
注意
请注意,此 API 与 Scatter Collective 略有不同 因为它不提供句柄,因此将是一个 blocking 调用。
async_op
注意
请注意,此 API 不支持 NCCL 后端,因为 ProcessGroupNCCL 不支持基于张量的分散集合体。
- 例::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
-
torch.distributed.
reduce_scatter
(output, input_list, op=<ReduceOp.SUM: 0>, group=None, async_op=False)[来源]¶ Reduce,然后将张量列表分散到组中的所有进程。
-
torch.distributed.
all_to_all
(output_tensor_list、input_tensor_list、组=无、async_op=False)[来源]¶ 每个进程将输入张量列表分散到一个组中的所有进程,并且 返回 output list 中收集的 Tensors 列表。
支持复杂张量。
- 参数
- 返回
异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。
警告
all_to_all 是实验性的,可能会发生变化。
例子
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
-
torch.distributed.
barrier
(group=None, async_op=False, device_ids=None)[来源]¶ 同步所有进程。
这个 collective 会阻塞进程,直到整个 group 进入这个函数, 如果 async_op 为 False,或者如果在 wait() 上调用异步工作句柄。
-
torch.distributed.
monitored_barrier
(group=None, timeout=None, wait_all_ranks=False)[来源]¶ 同步所有进程,类似于 ,但采用 可配置的超时,并且能够报告未通过此 barrier 的 barrier 中。具体来说,对于非零等级,将阻止 直到从排名 0 处理 send/recv。等级 0 将阻止,直到所有发送 处理来自其他排名的 /recv,并将报告排名失败 未能及时做出回应。请注意,如果一个等级未达到 monitored_barrier(例如,由于挂起),所有其他等级都将失败 在 monitored_barrier。
torch.distributed.barrier
此 collective 将阻止组中的所有进程/排名,直到 整个 group 成功退出函数,使其可用于调试 和同步。但是,它可能会对性能产生影响,并且只应 用于调试或需要完全同步点的方案 在主机端。出于调试目的,可以插入此屏障 在应用程序的集体调用之前,检查是否有任何秩为 desynchronized。
注意
请注意,此 collective 仅支持 GLOO 后端。
- 参数
group (ProcessGroup,可选) – 要处理的流程组。如果 ,将使用默认进程组。
None
timeout (datetime.timedelta,可选) – monitored_barrier的超时。 如果 ,将使用默认的进程组超时。
None
wait_all_ranks (bool, optional) – 是收集所有失败的排名还是 不。默认情况下,这是 and on rank 0 将抛出它遇到的第一个失败的等级以失败 快。通过设置 will 收集所有失败的排名并引发包含信息的错误 关于所有失败的等级。
False
monitored_barrier
wait_all_ranks=True
monitored_barrier
- 返回
None
.
- 例::
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
-
类
torch.distributed.
ReduceOp
¶ 可用归约运算的类似枚举的类:、、、、 和 。
SUM
AVG
PRODUCT
MIN
MAX
BAND
BOR
BXOR
BAND
、 和 减少 在以下情况下不可用 使用后端。BOR
BXOR
NCCL
AVG
将值除以世界大小,然后在各个等级之间求和。 仅适用于后端, 并且仅适用于 NCCL 版本 2.10 或更高版本。AVG
NCCL
此外,复杂张量不支持 和 。
MAX
MIN
PRODUCT
此类的值可以作为属性访问,例如 . 它们用于指定归约集合的策略,例如 ,
,
, 等。
ReduceOp.SUM
成员:
和
平均 (AVG)
产品
最小
麦克斯
乐队
博尔
BXOR
-
类
torch.distributed.
reduce_op
¶ 已弃用的类似 enum 的类 reduction 操作:、、 和 。
SUM
PRODUCT
MIN
MAX
分析 Collective 通信¶
请注意,您可以使用 (推荐,仅在 1.8.1 之后可用) 或分析此处提到的集体通信和点对点通信 API。支持所有开箱即用的后端 (、 、 ),并且集体通信使用将在分析输出/跟踪中按预期呈现。分析代码与任何常规 torch 操作符相同:torch.profiler
torch.autograd.profiler
gloo
nccl
mpi
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
请参阅 Profiler 文档,以全面了解 Profiler 功能。
多 GPU 集合函数¶
如果每个节点上有多个 GPU,则在使用 NCCL 和 Gloo 后端时,
并支持
分布式 collective
操作。这些功能可能会
提升整体分布式训练性能,方便使用
传递张量列表。传递的 Tensor 列表中的每个 Tensor 都需要
位于调用该函数的主机的单独 GPU 设备上。注意
张量列表的长度在所有
分布式进程。另请注意,目前的多 GPU 集体
函数仅支持 NCCL 后端。
例如,如果我们用于分布式训练的系统有 2 个节点,则每个节点 其中有 8 个 GPU。在 16 个 GPU 中的每一个 GPU 上,我们都有一个 喜欢 all-reduce。以下代码可以作为参考:
在节点 0 上运行的代码
import torch
import torch.distributed as dist
dist.init_process_group(backend="nccl",
init_method="file:///distributed_test",
world_size=2,
rank=0)
tensor_list = []
for dev_idx in range(torch.cuda.device_count()):
tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx))
dist.all_reduce_multigpu(tensor_list)
在节点 1 上运行的代码
import torch
import torch.distributed as dist
dist.init_process_group(backend="nccl",
init_method="file:///distributed_test",
world_size=2,
rank=1)
tensor_list = []
for dev_idx in range(torch.cuda.device_count()):
tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx))
dist.all_reduce_multigpu(tensor_list)
调用后,两个节点上的所有 16 个张量都将具有 all-reduced 值 共 16 页
-
torch.distributed.
broadcast_multigpu
(tensor_list, src, group=None, async_op=False, src_tensor=0)[来源]¶ 将 Tensor 广播到具有多个 GPU Tensor 的整个组 每个节点。
tensor
在所有 GPU 中必须具有相同数量的元素 所有参与集体的进程。列表中的每个张量都必须 位于不同的 GPU 上目前仅支持 nccl 和 gloo 后端 张量只能是 GPU 张量
- 参数
tensor_list (List[Tensor]) – 参与 collective 的 Tensor 操作。如果 是排名,则 () 的指定元素将为 广播到 src 进程中的所有其他张量(在不同 GPU 上) 以及其他非 src 进程中的所有张量。 您还需要确保这是相同的 对于所有调用此函数的分布式进程。
src
src_tensor
tensor_list
tensor_list[src_tensor]
tensor_list
len(tensor_list)
src (int) – 源排名。
group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。
async_op (bool, optional) – 此运算是否应为异步运算
src_tensor (int, optional) – 源张量排名
tensor_list
- 返回
异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组
-
torch.distributed.
all_reduce_multigpu
(tensor_list,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶ 减少所有机器上的张量数据,使所有机器都得到 最终结果。此函数减少了每个节点上的张量数量, 而每个张量驻留在不同的 GPU 上。 因此,tensor 列表中的 input tensor 需要是 GPU Tensor。 此外,张量列表中的每个张量都需要驻留在不同的 GPU 上。
调用后,all in 将是按位的 所有过程都相同。
tensor
tensor_list
支持复杂张量。
目前仅支持 nccl 和 gloo 后端 张量只能是 GPU 张量
- 参数
- 返回
异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组
-
torch.distributed.
reduce_multigpu
(tensor_list、dst、op=<ReduceOp.SUM:0>、group=None、async_op=False、dst_tensor=0)[来源]¶ 减少所有计算机上多个 GPU 上的张量数据。每个 Tensor in 应驻留在单独的 GPU 上
tensor_list
只有具有 rank 的进程的 GPU 才会收到最终结果。
tensor_list[dst_tensor]
dst
目前仅支持 nccl 后端 张量只能是 GPU 张量
- 参数
tensor_list (List[Tensor]) – 输入和输出 GPU 张量 集体。该函数就地运行。 您还需要确保 相同 所有调用此函数的分布式进程。
len(tensor_list)
dst (int) – 目标排名
op (可选) – 枚举中的值之一。指定用于元素级缩减的操作。
torch.distributed.ReduceOp
group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。
async_op (bool, optional) – 此运算是否应为异步运算
dst_tensor (int, optional) – 目标张量排名
tensor_list
- 返回
异步工作句柄(如果 async_op 设置为 True)。 None,否则
-
torch.distributed.
all_gather_multigpu
(output_tensor_lists、input_tensor_list、组=无、async_op=False)[来源]¶ 从列表中的整个组中收集张量。 中的每个张量都应该驻留在单独的 GPU 上
tensor_list
目前仅支持 nccl 后端 张量只能是 GPU 张量
支持复杂张量。
- 参数
output_tensor_lists (List[List[Tensor]]) –
输出列表。它应该 在每个 GPU 上包含大小正确的张量以用于输出 集体的 包含 all_gather 的结果。
output_tensor_lists[i]
input_tensor_list[i]
请注意,每个元素的大小为 ,因为函数 all 收集组中每个 GPU 的结果。解释 注意 的每个元素 ,秩 k 的元素将出现在
output_tensor_lists
world_size * len(input_tensor_list)
output_tensor_lists[i]
input_tensor_list[j]
output_tensor_lists[i][k * world_size + j]
另请注意,以及每个 element in (每个元素都是一个列表, 因此 ) 需要相同 对于所有调用此函数的分布式进程。
len(output_tensor_lists)
output_tensor_lists
len(output_tensor_lists[i])
input_tensor_list (List[Tensor]) – 张量列表(在不同 GPU 上) 从当前进程广播。 请注意,需要相同 所有调用此函数的分布式进程。
len(input_tensor_list)
group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。
async_op (bool, optional) – 此运算是否应为异步运算
- 返回
异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组
-
torch.distributed.
reduce_scatter_multigpu
(output_tensor_list、input_tensor_lists、op=<ReduceOp.SUM:0>、group=None、async_op=False)[来源]¶ 将张量列表减少并分散到整个组中。只有 nccl 后端 当前受支持。
中的每个张量都应该驻留在单独的 GPU 上,因为 应 中的每个张量列表。
output_tensor_list
input_tensor_lists
- 参数
output_tensor_list (List[Tensor]) –
输出张量(在不同的 GPU 上) 以接收操作结果。
请注意,所有 调用该函数的分布式进程。
len(output_tensor_list)
input_tensor_lists (List[List[Tensor]]) –
输入列表。它应该 在每个 GPU 上包含大小正确的张量,用于输入 集体,例如 包含 reduce_scatter 的 GPU 上。
input_tensor_lists[i]
output_tensor_list[i]
请注意,每个元素的大小都是 ,因为函数 分散组中每个 GPU 的结果。自 解释 的每个元素,请注意,秩 k 接收 reduce-scattered 结果来自
input_tensor_lists
world_size * len(output_tensor_list)
input_tensor_lists[i]
output_tensor_list[j]
input_tensor_lists[i][k * world_size + j]
另请注意,以及每个 element in (每个元素都是一个列表, 因此 ) 需要相同 所有调用此函数的分布式进程。
len(input_tensor_lists)
input_tensor_lists
len(input_tensor_lists[i])
group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。
async_op (bool, optional) – 此运算是否应为异步运算。
- 返回
异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。
第三方后端¶
除了内置的 GLOO/MPI/NCCL 后端外,PyTorch 分布式还支持
第三方后端。
有关如何通过 C++ Extension 开发第三方后端的参考,
请参考 教程 - 自定义 C++ 和 CUDA 扩展 和 。第三方的能力
backend 由它们自己的 implementations 决定。test/cpp_extensions/cpp_c10d_extension.cpp
新后端派生自并注册后端
name 和实例化接口。c10d::ProcessGroup
当手动导入此后端并使用相应的后端名称调用时,软件包将在
新的后端。
torch.distributed
警告
对第三方后端的支持是实验性的,可能会发生变化。
Launch 实用程序¶
torch.distributed 包还在 torch.distributed.launch 中提供了启动实用程序。此帮助程序实用程序可用于启动 每个节点有多个进程,用于分布式训练。
torch.distributed.launch
是一个产生多个分布式
每个训练节点上的训练过程。
警告
此模块将被弃用,取而代之的是 torchrun。
该实用程序可用于单节点分布式训练,其中 1 个 或 每个节点将生成更多进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练, 每个分布式进程都将在单个 GPU 上运行。这可以实现 大幅提升单节点训练性能。它也可以用于 多节点分布式训练,在每个节点上生成多个进程 还可以很好地提高多节点分布式训练性能。 这对于具有多个 Infiniband 的系统尤其有利 具有直接 GPU 支持的接口,因为它们都可用于 聚合通信带宽。
在单节点分布式训练或多节点分布式两种情况下
training 中,此实用程序将为每个节点启动给定数量的进程
().如果用于 GPU 训练,则此数字需要更小
或等于当前系统上的 GPU 数量 (),
并且每个进程都将在从 GPU 0 到
GPU (nproc_per_node - 1) 的 GPU ( - 1)。--nproc_per_node
nproc_per_node
如何使用此模块:
单节点多进程分布式训练
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多节点多进程分布式训练:(例如两个节点)
节点 1:(IP:192.168.1.1,具有空闲端口:1234)
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
节点 2:
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查找此模块提供的可选参数:
>>> python -m torch.distributed.launch --help
重要通知:
1. 本实用程序和多进程分布式(单节点或 多节点)GPU 训练目前只能使用 NCCL 分布式后端。因此,NCCL 后端是推荐的后端 用于 GPU 训练。
2. 在您的训练程序中,您必须解析命令行参数:,该参数将由本模块提供。
如果您的训练程序使用 GPU,则应确保仅
运行在 LOCAL_PROCESS_RANK 的 GPU 设备上。这可以通过以下方式完成:--local_rank=LOCAL_PROCESS_RANK
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local_rank", type=int)
>>> args = parser.parse_args()
使用 either 将设备设置为本地排名
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
3. 在您的训练程序中,您应该调用以下函数
在开始时启动分布式后端。强烈建议
那。其他 init 方法(例如 )可能有效,
但是是本模块正式支持的那个。init_method=env://
tcp://
env://
torch.distributed.init_process_group(backend='YOUR BACKEND',
init_method='env://')
4. 在您的训练计划中,您可以使用常规分布式函数
或使用 module。如果您的
training program 使用 GPU 进行训练,并且您希望使用
module,
以下是配置方法。
model = torch.nn.parallel.DistributedDataParallel(model,
device_ids=[args.local_rank],
output_device=args.local_rank)
请确保将参数设置为唯一的 GPU 设备 ID
您的代码将在其上运行。这通常是
过程。换句话说,需要是 ,
并且需要 be 才能使用它
效用device_ids
device_ids
[args.local_rank]
output_device
args.local_rank
5. 另一种通过环境变量传递给子进程的方法。当您使用 .您必须调整上面的 subprocess 示例以替换为 ;启动器
不会通过指定此标志。local_rank
LOCAL_RANK
--use_env=True
args.local_rank
os.environ['LOCAL_RANK']
--local_rank
警告
local_rank
不是全局唯一的:它仅每个进程唯一
在计算机上。因此,不要用它来决定是否应该这样做,例如,
写入网络文件系统。请参阅 https://github.com/pytorch/pytorch/issues/12042 的示例
如果你没有正确地做到这一点,事情会怎么出错。
生成实用程序¶
Multiprocessing 包 - torch.multiprocessing 包还在 .此辅助函数
可用于生成多个进程。它的工作原理是将
函数,并生成 N 个进程来运行它。这
也可用于多进程分布式训练。
spawn
有关如何使用它的参考,请参考 PyTorch 示例 - ImageNet 实现
请注意,此功能需要 Python 3.4 或更高版本。
调试应用程序torch.distributed
¶
调试分布式应用程序可能具有挑战性,因为难以理解挂起、崩溃或跨等级的行为不一致。 提供
一套工具,可帮助以自助方式调试训练应用程序:torch.distributed
监控屏障¶
从 v1.10 开始,作为 which 的替代方案
存在,其中包含有关哪个等级可能有问题的有用信息
崩溃时,即并非所有 rank 都在提供的 timeout 内调用 in
。
实现主机端
barrier using / communication 原语在类似于确认的过程中,允许排名 0 报告哪些排名未能确认
时间的障碍。例如,考虑以下函数,其中 rank 1 无法调用 into
(实际上这可能是由于
添加到应用程序 bug 或挂起在上一个集合中):
send
recv
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
排名 0 时生成以下错误消息,允许用户确定哪些排名可能有问题并进一步调查:
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
¶
接下来,环境变量可用于触发其他有用的日志记录和集体同步检查,以确保所有排名
已适当同步。 可以设置为 (default)、 或 取决于调试级别
必填。请注意,最详细的选项可能会影响应用程序性能,因此只能在调试问题时使用。TORCH_DISTRIBUTED_DEBUG
TORCH_DISTRIBUTED_DEBUG
OFF
INFO
DETAIL
DETAIL
设置将导致在初始化用于训练的模型时产生额外的调试日志记录,并且还会记录选定迭代次数的运行时性能统计信息。这些运行时统计信息
包括前进时间、后退时间、梯度通信时间等数据。例如,给定以下应用程序:
TORCH_DISTRIBUTED_DEBUG=INFO
TORCH_DISTRIBUTED_DEBUG=DETAIL
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
初始化时呈现以下日志:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下日志在运行时(设置时)呈现:TORCH_DISTRIBUTED_DEBUG=DETAIL
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,还增强了由于模型中未使用的参数而导致的崩溃日志记录。目前,如果正向传递中存在可能未使用的参数,则必须将其传递到
初始化中,并且从 v1.10 开始,所有模型输出都是必需的
用于损失计算,因为
不支持 backwards pass 中未使用的参数。这些限制尤其具有挑战性,尤其是对于较大的
models,因此,当崩溃出现错误时,
将记录所有未使用的参数的完全限定名称。例如,在上面的应用程序中,
如果我们修改为改为 计算为 ,则在向后传递中不会接收梯度,并且
因此会导致失败。在崩溃时,用户会传递有关未使用的参数的信息,这对于大型模型来说可能很难手动查找:
TORCH_DISTRIBUTED_DEBUG=INFO
find_unused_parameters=True
loss
loss = output[1]
TwoLinLayerNet.a
DDP
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置将在用户发出的每个集体调用上触发额外的一致性和同步检查
直接或间接(例如 DDP)。这是通过创建一个包装器进程组来完成的,该包装器进程组包装 和 API 返回的所有
进程组。因此,这些 API 将返回一个包装器进程组,该组可以像常规进程一样使用
组,但在将集合体分派到底层进程组之前执行一致性检查。目前,这些检查包括
、
这可确保所有等级完成其出色的集体呼叫并报告卡住的等级。接下来,通过以下方式检查 Collective 本身的一致性
确保所有集合函数都匹配并以一致的张量形状调用。如果不是这种情况,则在
应用程序崩溃,而不是挂起或无信息性的错误消息。例如,请考虑以下函数,该函数将不匹配的输入形状转换为
:
TORCH_DISTRIBUTED_DEBUG=DETAIL
allreduce
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
对于后端,此类应用程序可能会导致挂起,在重要情况下,这可能很难找到根本原因。如果用户启用并重新运行应用程序,则以下错误消息将揭示根本原因:NCCL
TORCH_DISTRIBUTED_DEBUG=DETAIL
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行精细控制,也可以使用函数 , , 和 。torch.distributed.set_debug_level()
torch.distributed.set_debug_level_from_env()
torch.distributed.get_debug_level()
此外, TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以便在检测到集体不同步时记录整个调用堆栈。这些
集体反同步检查将适用于所有使用集体调用的应用程序,这些调用由通过 和
API 创建的进程组提供支持。
c10d