目录

分布式通信包 - torch.distributed

注意

请参考 PyTorch 分布式概述,简要介绍与分布式训练相关的所有功能。

后端

torch.distributed支持三个内置后端,每个后端都有 不同的功能。下表显示了可用的功能 用于 CPU/CUDA 张量。 仅当用于构建 PyTorch 的实现支持 CUDA 时,MPI 才支持 CUDA。

后端

gloo

mpi

nccl

装置

中央处理器

图形处理器

中央处理器

图形处理器

中央处理器

图形处理器

发送

?

recv

?

广播

?

all_reduce

?

减少

?

all_gather

?

收集

?

散射

?

reduce_scatter

all_to_all

?

障碍

?

PyTorch 附带的后端

PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。 默认情况下,对于 Linux,Gloo 和 NCCL 后端构建并包含在 PyTorch 中 分布式(仅在使用 CUDA 构建时为 NCCL)。MPI 是一个可选的后端,它只能是 如果您从源代码构建 PyTorch,则包括 include。(例如,在具有 MPI 的主机上构建 PyTorch 安装。

注意

从 PyTorch v1.8 开始,Windows 支持所有集体通信后端,但 NCCL、 如果 init_method 参数指向文件,则它必须遵循 添加到以下架构中:

  • 本地文件系统、init_method="file:///d:/tmp/some_file"

  • 共享文件系统、init_method="file://////{machine_name}/{share_folder_name}/some_file"

与 Linux 平台相同,您可以通过设置环境变量来启用 TcpStore, MASTER_ADDR 和 MASTER_PORT。

使用哪个后端?

过去,我们经常被问到:“我应该使用哪个后端?

  • 经验法则

    • 使用 NCCL 后端进行分布式 GPU 训练

    • 使用 Gloo 后端进行分布式 CPU 训练。

  • 具有 InfiniBand 互连的 GPU 主机

    • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的 API API

  • 具有以太网互连的 GPU 主机

    • 使用 NCCL,因为它目前提供了最好的分布式 GPU 训练性能,尤其是对于多进程单节点或 多节点分布式训练。如果您在使用 NCCL 中,请使用 Gloo 作为回退选项。(请注意,Gloo 目前 运行速度比 GPU 的 NCCL 慢。

  • 具有 InfiniBand 互连的 CPU 主机

    • 如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则, 请改用 MPI。我们计划添加 InfiniBand 支持 Gloo 在即将发布的版本中。

  • 具有以太网互连的 CPU 主机

    • 请使用 Gloo,除非您有使用 MPI 的特定原因。

常见环境变量

选择要使用的网络接口

默认情况下,NCCL 和 Gloo 后端都会尝试找到合适的网络接口来使用。 如果自动检测到的接口不正确,您可以使用以下命令覆盖它 环境变量(适用于相应的后端):

  • 例如,NCCL_SOCKET_IFNAMEexport NCCL_SOCKET_IFNAME=eth0

  • 例如,GLOO_SOCKET_IFNAMEexport GLOO_SOCKET_IFNAME=eth0

如果您使用的是 Gloo 后端,则可以通过将 他们用逗号表示,像这样: . 后端将以循环方式跨这些接口分派操作。 所有进程都必须在此变量中指定相同数量的接口。export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3

其他 NCCL 环境变量

调试 - 如果 NCCL 失败,您可以设置打印显式 warning 消息以及基本的 NCCL 初始化信息。NCCL_DEBUG=INFO

您还可以使用获取有关特定 NCCL 的方面。例如,将打印 集体调用,这在调试挂起时可能会有所帮助,尤其是那些 由 collective type 或 message size 不匹配引起。如果是拓扑 检测失败,如果进一步帮助,设置检查详细的检测结果并另存为参考会很有帮助 需要 NCCL 团队。NCCL_DEBUG_SUBSYSNCCL_DEBUG_SUBSYS=COLLNCCL_DEBUG_SUBSYS=GRAPH

性能调优 - NCCL 根据其拓扑检测执行自动调优,以节省用户的 调整努力。在某些基于套接字的系统上,用户可能仍会尝试调整并增加套接字 网络带宽。这两个环境变量已由 NCCL 预先调整 适用于某些云提供商,例如 AWS 或 GCP。NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD

有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档

基本

torch.distributed 包提供 PyTorch 支持和通信原语 用于跨一个或多个上运行的多个计算节点的多进程并行性 机器。该类建立在此基础上 功能,以提供同步分布式训练作为任何 PyTorch 模型。这与 Multiprocessing 包 - torch.multiprocessing 提供的并行性类型不同,因为它支持 多台联网机器,并且用户必须显式启动单独的 每个进程的主训练脚本的副本。

在单机同步情况下,torch.distributed包装器可能仍然比其他 数据并行的方法,包括

  • 每个进程都维护自己的优化器,并对每个 迭 代。虽然这可能看起来多余,因为已经收集了梯度 一起并平均跨流程,因此对于每个流程都是相同的,这意味着 不需要参数广播步骤,从而减少了在 节点。

  • 每个进程都包含一个独立的 Python 解释器,无需额外的解释器 开销和“GIL 抖动”,这来自驱动多个执行线程、模型 副本或 GPU。这对于 大量使用 Python 运行时,包括具有递归层或许多小 组件。

初始化

在调用任何其他方法之前,需要使用 or 函数初始化包。 两者都会阻塞,直到所有进程都加入为止。

torch.distributed 中。is_available[来源]

如果分发的包可用,则返回。True

否则,不会公开任何其他 API。目前,可在 Linux、MacOS 和 Windows 上使用。设置为在从源构建 PyTorch 时启用它。 目前,默认值适用于 Linux 和 Windows,适用于 MacOS。torch.distributedtorch.distributedUSE_DISTRIBUTED=1USE_DISTRIBUTED=1USE_DISTRIBUTED=0

返回类型

布尔

torch.distributed 中。init_process_groupbackend=init_method=timeout=world_size=-1排名=-1store=group_name='', pg_options=device_id=[来源]

初始化默认分布式进程组。

这还将初始化分发的包。

初始化进程组有两种主要方法:
  1. 指定 、 和 显式。storerankworld_size

  2. 指定(一个 URL 字符串),它指示位置/方式 以发现对等节点。(可选)指定 和 , 或者对 URL 中的所有必需参数进行编码并省略它们。init_methodrankworld_size

如果未指定,则假定为 “env://”。init_method

参数
  • backendstrBackend可选) – 要使用的后端。根据 build-time 配置,有效值包括 、 、 、 和 。如果未提供 backend,则将同时创建 a 和 backend,请参阅下面的注释以了解多个 后端是托管的。此字段可以以小写字符串的形式给出 (例如, ),也可以通过属性(例如 )访问。如果使用 每台具有后端的机器上有多个进程,每个进程 必须具有对它使用的每个 GPU 的独占访问权限,就像共享 GPU 一样 进程之间可能会导致死锁。 backend 是 实验的。mpigloonccluccgloonccl"gloo"Backend.GLOOncclucc

  • init_methodstroptional) – 指定如何初始化 进程组。如果未指定 or,则默认为 “env://”。 与 互斥。init_methodstorestore

  • world_sizeintoptional) – 参与的进程数 工作。如果指定,则为 required。store

  • rankintoptional) – 当前进程的 rank (它应该是 介于 0 和 -1 之间的数字)。 如果指定,则为 required。world_sizestore

  • storeStore可选) – 所有工作人员均可访问的键/值存储,已使用 交换连接/地址信息。 与 互斥。init_method

  • timeouttimedeltaoptional) – 针对 进程组。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。 这是 collectives 将异步中止并且进程将崩溃的持续时间。 这样做是因为 CUDA 执行是异步的,并且继续执行用户代码不再安全,因为 异步 NCCL 操作失败可能会导致后续 CUDA 操作对损坏的数据运行。 设置 TORCH_NCCL_BLOCKING_WAIT 后,进程将阻塞并等待此超时。

  • group_namestroptionaldeprecated) – 组名称。此参数将被忽略

  • pg_optionsProcessGroupOptions可选) – 进程组选项 指定在 特定流程组的构建。截至目前,唯一的 options 是针对后端的,可以指定 NCCL 后端可以在以下情况下获取高优先级 CUDA 流 有计算内核在等待。对于配置 nccl 的其他可用选项, 查看 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tProcessGroupNCCL.Optionsncclis_high_priority_stream

  • device_idtorch.device可选) – 单个特定设备 将此过程“绑定”到,从而允许特定于后端的 优化。目前这有两个效果,仅在 NCCL:立即形成通信器(立即调用而不是正常的 lazy 调用),子组将在 可以避免不必要的组创建开销。如果你 想早点知道 NCCL 初始化错误,也可以使用这个 田。ncclCommInit*ncclCommSplit

注意

要启用 ,需要从源构建 PyTorch 在支持 MPI 的系统上。backend == Backend.MPI

注意

对多个后端的支持是实验性的。当前,当 no backend 为 指定,则将创建 Both 和 Backends。后端 将用于具有 CPU 张量的集合,并将使用 backend 对于具有 CUDA 张量的集合。可以通过传入 格式为“<device_type>:<backend_name>,<device_type>:<backend_name>”的字符串,例如 “cpu:gloo,cuda:custom_backend”。glooncclgloonccl

torch.distributed.device_mesh。init_device_meshdevice_typemesh_shape*mesh_dim_names=[来源]

根据 device_typemesh_shapemesh_dim_names 参数初始化 DeviceMesh

这将创建一个具有 n 维数组布局的 DeviceMesh,其中 nmesh_shape 的长度。 如果提供了 mesh_dim_names,则每个维度都标记为 mesh_dim_names[i]。

注意

init_device_mesh遵循 SPMD 编程模型,即相同的 PyTorch Python 程序 在集群中的所有进程/排名上运行。确保 mesh_shape(nD 数组的维度 描述设备布局)在所有等级中都是相同的。不一致的mesh_shape可能会导致上吊。

注意

如果未找到进程组,init_device_mesh将初始化分布式进程组 对于幕后的分布式通信是必需的。

参数
  • device_typestr) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。 不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。

  • mesh_shapeTuple[int]) – 定义多维数组维度的元组 描述设备的布局。

  • mesh_dim_namesTuple[str]optional) – 要分配给每个维度的网格维度名称元组 的 Multidimensional 数组中描述设备布局。其长度必须与 length 匹配 mesh_shapemesh_dim_names中的每个字符串都必须是唯一的。

返回

表示设备布局的对象。

返回类型

设备网格

例::
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed 中。is_initialized[来源]

检查默认进程组是否已初始化。

返回类型

布尔

torch.distributed 中。is_mpi_available[来源]

检查 MPI 后端是否可用。

返回类型

布尔

torch.distributed 中。is_nccl_available)[来源]

检查 NCCL 后端是否可用。

返回类型

布尔

torch.distributed 中。is_gloo_available)[来源]

检查 Gloo 后端是否可用。

返回类型

布尔

torch.distributed 中。is_torchelastic_launched)[来源]

检查此进程是否是使用(又名 torchelastic)启动的。torch.distributed.elastic

环境的存在 变量作为代理,判断当前进程是否 与 TorchElastic 一起启动。这是一个合理的代理,因为 map 到的 rendezvous id 始终是一个 非 null 值,指示用于对等发现目的的作业 ID..TORCHELASTIC_RUN_IDTORCHELASTIC_RUN_ID

返回类型

布尔


目前支持三种初始化方法:

TCP 初始化

使用 TCP 进行初始化有两种方法,都需要网络地址 可从所有进程访问,并且所需的 .第一种方法 需要指定属于 Rank 0 进程的地址。这 初始化方法要求所有进程都手动指定秩。world_size

请注意,最新分布式不再支持多播地址 包。 也被弃用。group_name

import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

共享文件系统初始化

另一种初始化方法使用共享的文件系统,并且 对组中的所有计算机可见,以及所需的 .URL 应以 with 并包含不存在文件的路径(在现有的 目录)的文件系统初始化将自动 如果该文件不存在,请创建该文件,但不会删除该文件。因此,它 您有责任确保在下次调用同一文件路径/名称之前清理文件。world_sizefile://

请注意,最新的 中不再支持自动排名分配 distributed 软件包,并且已弃用。group_name

警告

此方法假定文件系统支持使用 - most local 系统和 NFS 都支持它。fcntl

警告

此方法将始终创建文件并尽力清理和删除 程序末尾的文件。换句话说,每个使用 file init 方法需要一个全新的空文件才能进行初始化 才能成功。如果先前初始化使用的相同文件(未发生 进行清理)时,这是意外行为,并且通常会导致 死锁和失败。因此,即使此方法会尽力清理 如果自动删除恰好不成功,则由您负责 确保在训练结束时删除该文件,以防止相同的 文件,以便在下次再次重复使用。这一点尤其重要 如果您计划对同一文件名进行多次调用。 换句话说,如果文件未被删除/清理,并且您再次调用该文件,则预期会失败。 这里的经验法则是,确保文件不存在或 empty 的调用。

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

环境变量初始化

此方法将从环境变量中读取配置,从而允许 一个用于完全自定义获取信息的方式。要设置的变量 是:

  • MASTER_PORT-必填;必须是 rank 为 0 的计算机上的 free port

  • MASTER_ADDR- 必需(等级 0 除外);Rank 0 节点的地址

  • WORLD_SIZE-必填;可以在此处设置,也可以在调用 init 函数中设置

  • RANK-必填;可以在此处设置,也可以在调用 init 函数中设置

等级为 0 的计算机将用于设置所有连接。

这是默认方法,这意味着不必指定(或 可以是 )。init_methodenv://

初始化后

运行后,可以使用以下函数。自 检查进程组是否已初始化 使用

torch.distributed 中。Backendname[来源]

用于后端的类似 enum 的类。

可用后端:GLOO、NCCL、UCC、MPI 和其他已注册的后端。

此类的值为小写字符串,例如 .他们可以 作为属性访问,例如 ."gloo"Backend.NCCL

这个类可以直接调用来解析字符串,例如,会检查是否有效,并且 如果是这样,则返回解析后的小写字符串。它还接受大写字符串, 例如,返回 .Backend(backend_str)backend_strBackend("GLOO")"gloo"

注意

该条目存在,但仅用作 某些字段的初始值。用户不应直接使用它 也不假设它的存在。Backend.UNDEFINED

类方法 register_backendnamefuncextended_api=Falsedevices=None[来源]

使用给定的名称和实例化函数注册一个新的后端。

此类方法由第三方扩展用于 注册新的后端。ProcessGroup

参数
  • namestr) – 扩展的后端名称。它 应与 中的匹配。ProcessGroupinit_process_group()

  • funcfunction) – 实例化后端的函数处理程序。 该函数应在后端实现 extension 并接受四个参数,包括 、 、 和 。storerankworld_sizetimeout

  • extended_apibooloptional) – 后端是否支持扩展参数结构。 违约:。如果设置为 ,则后端 将获取 的实例 ,而 由 Backend 实现定义的 Process Group Options 对象。FalseTruec10d::DistributedBackendOptions

  • device strstr 列表可选) – 此后端的设备类型 支持,例如 “CPU”、“CUDA” 等。如果为 None,则 假设同时使用 “CPU” 和 “CUDA”

注意

对第三方后端的这种支持是实验性的,可能会发生变化。

torch.distributed 中。get_backendgroup=None[来源]

返回给定进程组的后端。

参数

groupProcessGroup可选) – 要处理的流程组。这 default 是 General Main Process 组。如果另一个特定组 时,调用进程必须是 的一部分。group

返回

给定进程组的后端,以小写字符串表示。

返回类型

后端

torch.distributed 中。get_rankgroup=None[来源]

返回当前进程在提供的 , default 中,否则为 default。group

Rank 是分配给分布式中每个进程的唯一标识符 进程组。它们始终是从 0 到 的连续整数。world_size

参数

groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

返回

进程组的排名 -1(如果不是组的一部分)

返回类型

int

torch.distributed 中。get_world_sizegroup=None[来源]

返回当前进程组中的进程数。

参数

groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

返回

进程组的世界大小 -1(如果不是组的一部分)

返回类型

int

关闭

在退出时通过调用 来清理资源非常重要。destroy_process_group()

要遵循的最简单模式是通过对 group 参数使用默认值 None 调用 训练脚本中不再需要通信的点,通常靠近 main() 的结尾。应该在每个 trainer 进程中调用一次,而不是在外部 process-launcher 级别。destroy_process_group()

如果在超时时间内未被 PG 中的所有 rank 调用, 特别是当应用程序中有多个进程组时,例如用于 N-D 并行, 退出时挂起是可能的。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort, 必须集体调用,但调用 ProcessGroupNCCL 的析构函数的顺序(如果被调用) 的 GC 不是确定性的。致电通过确保 ncclCommAbort 在各个等级中以一致的顺序调用,并避免调用 ncclCommAbort 在 ProcessGroupNCCL 的析构函数期间。destroy_process_group()destroy_process_group()

重新初始化

destroy_process_group 还可用于销毁单个进程组。一次使用 case 可以是容错训练,其中进程组可能会被销毁,然后 在运行时初始化的新 UPDATE。在这种情况下,同步 trainer 至关重要 使用 torch.distributed 原语 _after_ 调用 destroy 和 在随后初始化之前。此行为目前不受支持/未经测试,因为 实现此同步的难度,并被视为一个已知问题。请提交 GitHub 问题或 RFC(如果这是一个阻止您的用例)。


分布式 Key-Value Store

分布式包自带分布式键值存储,可以是 用于在组中的进程之间共享信息,以及 在 (通过显式创建存储 作为指定 .)有 3 种选择 键值存储: .init_method

torch.distributed 中。商店

所有 store 实现的基类,例如 PyTorch 提供的 3 个 已分发: (, , 和 )。

torch.distributed 中。TCPStore

基于 TCP 的分布式键值存储实现。服务器存储包含 数据,而客户端存储可以通过 TCP 连接到服务器存储,并且 执行插入键值等操作 pair,检索键值对等。那里 应始终是一个服务器存储初始化,因为客户端存储将等待 用于建立连接的服务器。set()get()

参数
  • host_namestr) – 服务器存储应在其上运行的主机名或 IP 地址。

  • portint) – 服务器存储应侦听传入请求的端口。

  • world_sizeintoptional) – 存储用户总数 (客户端数 + 服务器 1)。默认值为 None (None 表示非固定数量的商店用户)。

  • is_masterbooloptional) – 初始化服务器存储时为 True,客户端存储为 False。默认值为 False。

  • timeouttimedelta可选) – 存储区在初始化期间以及 和 等方法使用的超时。默认值为 timedelta(seconds=300)get()wait()

  • wait_for_workersbooloptional) – 是否等待所有工作程序与服务器存储连接。仅当 world_size 为 固定 值时,这才适用。默认值为 True。

  • multi_tenantbooloptional) – 如果为 True,则当前进程中具有相同主机/端口的所有实例将使用相同的底层 .默认值为 False。TCPStoreTCPServer

  • master_listen_fdintoptional) – 如果指定,则底层将侦听此文件描述符,该文件描述符必须是已绑定到的套接字。在某些情况下,避免端口分配争用很有用。默认值为 None (意味着服务器创建一个新的套接字并尝试将其绑定到 )。TCPServerportport

  • use_libuvbooloptional) – 如果为 True,则对后端使用 libuv。默认值为 True。TCPServer

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Run on process 1 (server)
>>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
>>> # Run on process 2 (client)
>>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
>>> # Use any of the store methods from either the client or server after initialization
>>> server_store.set("first_key", "first_value")
>>> client_store.get("first_key")
torch.distributed 中。哈希存储

基于底层 hashmap 的线程安全存储实现。此 store 可以使用 在同一进程中(例如,由其他线程),但不能跨进程使用。

例::
>>> import torch.distributed as dist
>>> store = dist.HashStore()
>>> # store can be used from other threads
>>> # Use any of the store methods after initialization
>>> store.set("first_key", "first_value")
torch.distributed 中。文件存储

一种 store 实现,它使用文件来存储底层键值对。

参数
  • file_namestr) – 存储键值对的文件路径

  • world_sizeintoptional) – 使用存储的进程总数。默认值为 -1(负值表示非固定的商店用户数)。

例::
>>> import torch.distributed as dist
>>> store1 = dist.FileStore("/tmp/filestore", 2)
>>> store2 = dist.FileStore("/tmp/filestore", 2)
>>> # Use any of the store methods from either the client or server after initialization
>>> store1.set("first_key", "first_value")
>>> store2.get("first_key")
torch.distributed 中。前缀Store

3 个键值存储()中任意一个的包装器 这会为插入到存储中的每个键添加一个前缀。

参数
  • prefixstr) – 在插入到存储区之前添加到每个键前面的前缀字符串。

  • storetorch.distributed.store) – 构成底层键值存储的 store 对象。

torch.distributed.Store。setself torch._C._distributed_c10d.存储arg0strarg1str

根据提供的 和 将键值对插入到存储中。如果 store 中已经存在,它将覆盖旧的 值替换为新提供的 .keyvaluekeyvalue

参数
  • keystr) – 要添加到存储中的键。

  • valuestr) – 与要添加到存储区关联的值。key

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
torch.distributed.Store。getself torch._C._distributed_c10d.存储arg0 str 字节

检索与存储中给定的值关联的值。如果不是 存在于 store 中,该函数将等待 ,这是定义的 初始化 store 时,在引发异常之前。keykeytimeout

参数

keystr) – 该函数将返回与此键关联的值。

返回

与 if 关联的值在存储中。keykey

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
torch.distributed.Store。addself torch._C._distributed_c10d.存储arg0 strarg1 int int

对给定的 add 的第一次调用会创建一个关联的计数器 with 在 store 中,初始化为 .后续调用以添加 以相同的增量将计数器替换为指定的 . 使用已具有 在 store 中设置 will result 在异常中。keykeyamountkeyamountadd()set()

参数
  • keystr) – 存储中其计数器将递增的键。

  • amountint) – 计数器将递增的数量。

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
torch.distributed.Store。compare_set自我 torch._C._distributed_c10d.存储arg0strarg1strarg2str 字节

根据提供的 和 在 和 插入之前执行比较。 仅当 for the store 中已存在 或 is 空字符串时,才会设置。keyexpected_valuedesired_valuedesired_valueexpected_valuekeyexpected_value

参数
  • keystr) – 要在存储中检查的密钥。

  • expected_valuestr) – 与插入前要检查的值相关联。key

  • desired_valuestr) – 与要添加到存储区关联的值。key

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("key", "first_value")
>>> store.compare_set("key", "first_value", "second_value")
>>> # Should return "second_value"
>>> store.get("key")
torch.distributed.Store。wait*args**kwargs)

overloaded 函数。

  1. wait(self: torch._C._distributed_c10d.存储,arg0: list[str]) -> 无

等待将每个密钥添加到存储中。如果不是所有键都是 set before the (set during store initialization) 之前设置),则会引发异常。keystimeoutwait

参数

keyslist) – 在存储中设置它们之前要等待的键列表。

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 30 seconds
>>> store.wait(["bad_key"])
  1. wait(self: torch._C._distributed_c10d.存储, arg0: list[str], arg1: datetime.timedelta) -> 无

等待将每个 key in 添加到存储中,并引发异常 如果提供的 .keystimeout

参数
  • keyslist) – 在存储中设置它们之前要等待的键列表。

  • timeouttimedelta) – 在引发异常之前等待添加键的时间。

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"], timedelta(seconds=10))
torch.distributed.Store。num_keys自我 torch._C._distributed_c10d.存储 int

返回 store 中设置的键数。请注意,此数字通常会 比 和 添加的键数大 1,因为一个键用于协调所有 使用 store 的工作人员。set()add()

警告

与 一起使用时,将返回写入基础文件的键数。如果 store 被销毁,并且使用相同的文件创建了另一个 store,则将保留原始键。num_keys

返回

存储中存在的键数。

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # This should return 2
>>> store.num_keys()
torch.distributed.Store。delete_key自我 torch._C._distributed_c10d.存储arg0 str bool

从存储中删除 关联的键值对。如果成功删除了键,则返回 true,如果未成功删除,则返回 falsekey

警告

该 API 仅受 支持。使用此 API 替换为 将导致异常。delete_key

参数

keystr) – 要从存储中删除的密钥

返回

如果已删除,则为 True,否则为 Falsekey

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, HashStore can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key")
>>> # This should return true
>>> store.delete_key("first_key")
>>> # This should return false
>>> store.delete_key("bad_key")
torch.distributed.Store。set_timeout自我 torch._C._distributed_c10d.存储arg0 datetime.timedelta None

设置 store 的默认超时。此超时在初始化期间以及 in 和 中使用。wait()get()

参数

timeouttimedelta) – 要在 store 中设置的超时。

例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set_timeout(timedelta(seconds=10))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"])

默认情况下,集合对 default 组(也称为 world)进行操作,并且 要求所有进程都进入分布式函数调用。但是,某些工作负载可能会受益 来自更细粒度的通信。这就是分布式组的用武之地 进入游戏。函数可以是 用于创建新组,其中包含所有进程的任意子集。它返回 一个不透明的组句柄,可以作为参数提供给所有集合 (集合是分布式函数,用于在某些众所周知的编程模式中交换信息)。group

torch.distributed 中。new_groupranks=Nonetimeout=Nonebackend=Nonepg_options=Noneuse_local_synchronization=Falsegroup_desc=None[来源]

创建新的分布式组。

此功能要求主组中的所有进程(即所有 作为分布式作业一部分的进程)输入此功能,甚至 如果他们不打算成为该组的成员。此外,组 应在所有进程中以相同的顺序创建。

警告

安全并发使用: 当在后端使用多个进程组时,用户 必须确保 Collective 的全局执行顺序一致 行列。NCCL

如果进程中的多个线程发出集合,则显式 同步是确保顺序一致所必需的。

当使用 torch.distributed 通信 API 的异步变体时, 返回一个 work 对象,并且 communication kernel 为 排入单独的 CUDA 流中,允许通信重叠 和计算。在一个进程上发出一个或多个异步操作后 组,则必须在使用另一个进程组之前通过调用 work.wait() 与其他 CUDA 流同步。

有关更多详细信息,请参阅同时使用多个 NCCL 通讯器

参数
  • rankslist[int]) – 组成员的排名列表。如果 ,将为 设置为 All ranks。默认值为 。NoneNone

  • timeouttimedelta可选) – 有关详细信息和默认值,请参阅 init_process_group

  • backendstrBackend可选) – 要使用的后端。根据 构建时配置,有效值为 和 。 默认情况下,使用与全局组相同的后端。此字段 应该以小写字符串的形式给出(例如 ),它可以 也可以通过属性(例如 )进行访问 。如果传入,则后端 将使用与默认进程组对应的进程组。默认值为 。gloonccl"gloo"Backend.GLOONoneNone

  • pg_optionsProcessGroupOptions可选) – 进程组选项 指定在 特定流程组的构建。即,对于后端,可以指定 进程组可以选择高优先级的 CUDA 流。对于配置 nccl 的其他可用选项, 查看 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tncclis_high_priority_stream

  • use_local_synchronizationbooloptional) – 执行组本地 barrier 的 barrier 创建过程组。这是不同的 因为非成员等级不需要调用 API 并且不需要 加入屏障。

  • group_descstroptional) – 用于描述进程组的字符串。

返回

分布式组的句柄,可以提供给集体调用或 GroupMember.NON_GROUP_MEMBER如果排名不是 的一部分。ranks

注意:use_local_synchronization 不适用于 MPI。

注虽然 use_local_synchronization=True 的 URL 越大,越大 集群和小型进程组,必须小心,因为它会改变集群行为 因为非成员等级不会加入组 Barrier()。

注意 use_local_synchronization=True 在每个等级创建 多个重叠的流程组。为避免这种情况,请确保所有等级都遵循 相同的全局创建顺序。

torch.distributed 中。get_group_rankgroupglobal_rank[来源]

将全局排名转换为组排名。

global_rank必须是 part of 的一部分,否则会引发 RuntimeError。group

参数
  • groupProcessGroup) – 用于查找相对排名的 ProcessGroup。

  • global_rankint) – 要查询的全局排名。

返回

的组排名 relative toglobal_rankgroup

返回类型

int

注意:在默认进程组上调用此函数将返回 identity

torch.distributed 中。get_global_rankgroupgroup_rank[来源]

将组排名转换为全局排名。

group_rank必须是 group 的一部分,否则会引发 RuntimeError。

参数
  • groupProcessGroup) – 要从中查找全局排名的 ProcessGroup。

  • group_rankint) – 要查询的组排名。

返回

相对于 的全局排名group_rankgroup

返回类型

int

注意:在默认进程组上调用此函数将返回 identity

torch.distributed 中。get_process_group_ranks[来源]

获取与 关联的所有排名。group

参数

groupProcessGroup) – 要从中获取所有排名的 ProcessGroup。

返回

按组排名排序的全局排名列表。

返回类型

列表[int]

设备网格

DeviceMesh 是管理进程组(或 NCCL 通信器)的更高级别抽象。 它允许用户轻松创建节点间和节点内流程组,而无需担心 如何为不同的子流程组正确设置排名,并帮助管理这些子流程组 分布式进程组。函数可以是 用于创建新的 DeviceMesh,其网格形状描述设备拓扑。

torch.distributed.device_mesh 类DeviceMeshdevice_typemesh*mesh_dim_names=_init_backend=True[来源]

DeviceMesh 表示设备网格,其中设备的布局可以是 表示为 n 维数组,以及 n 维的每个值 array 是默认进程组排名的全局 ID。

DeviceMesh 可用于描述整个集群中设备的布局, 并用作集群内设备列表之间通信的代理。

DeviceMesh 可用作上下文管理器。

注意

DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序 正在集群中的所有进程/列上运行。因此,用户需要确保 mesh 数组(描述器件的布局)在所有 ranks 中都应该相同。 不一致的网格将导致静默挂起。

参数
  • device_typestr) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。

  • meshndarray) – 描述布局的多维数组或整数张量 的设备,其中 ID 是默认进程组的全局 ID。

返回

表示设备布局的对象。

返回类型

设备网格

以下程序以 SPMD 方式在每个进程/排名上运行。在此示例中,我们有 2 个 主机,每个主机具有 4 个 GPU。 网格的第一个维度上的减少将减少 across 列 (0, 4), ..和 (3, 7),在第二维度上的缩减 的网格减少 (0, 1, 2, 3) 和 (4, 5, 6, 7)。

例::
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])

点对点通信

torch.distributed 中。sendtensordstgroup=Nonetag=0[来源]

同步发送 Tensor。

警告

tagNCCL 后端不支持。

参数
  • tensorTensor) - 要发送的 Tensor。

  • dstint) – 全局进程组上的目标排名 (无论参数如何)。 目标排名不应与当前进程的排名相同。group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • tagintoptional) – 将 send 与远程接收匹配的标签

torch.distributed 中。recvtensorsrc=Nonegroup=Nonetag=0[来源]

同步接收 Tensor。

警告

tagNCCL 后端不支持。

参数
  • - tensorTensor) - 要填充接收到的数据的张量。

  • srcintoptional) – 全局进程组上的源排名 (无论参数如何)。 如果未指定,将从任何进程接收。group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • tagintoptional) – 用于将 recv 与远程发送匹配的标签

返回

发件人排名 -1(如果不是组的一部分)

返回类型

int

并在使用时返回分布式请求对象。通常,此对象的类型未指定 因为它们永远不应该手动创建,但保证它们支持两种方法:

  • is_completed()- 如果操作已完成,则返回 True

  • wait()- 将阻止进程,直到操作完成。 保证在返回后返回 True。is_completed()

torch.distributed 中。isendtensordstgroup=Nonetag=0[来源]

异步发送张量。

警告

在请求完成之前进行修改会导致 undefined 行为。tensor

警告

tagNCCL 后端不支持。

参数
  • tensorTensor) - 要发送的 Tensor。

  • dstint) – 全局进程组上的目标排名(与参数无关)group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • tagintoptional) – 将 send 与远程接收匹配的标签

返回

分布式请求对象。 无,如果不是组的一部分

返回类型

可选[工作]

torch.distributed 中。irecvtensorsrc=Nonegroup=Nonetag=0[来源]

异步接收 Tensor。

警告

tagNCCL 后端不支持。

参数
  • - tensorTensor) - 要填充接收到的数据的张量。

  • srcintoptional) – 全局进程组上的源排名 (无论参数如何)。 如果未指定,将从任何进程接收。group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • tagintoptional) – 用于将 recv 与远程发送匹配的标签

返回

分布式请求对象。 无,如果不是组的一部分

返回类型

可选[工作]

torch.distributed 中。send_object_listobject_listdstgroup=Nonedevice=None[来源]

同步发送可 picklable 对象。object_list

类似,但可以传入 Python 对象。 请注意,中的所有对象都必须是可 picklable 的,才能 送。object_list

参数
  • object_listList[Any]) – 要发送的输入对象的列表。 每个对象都必须是可腌制的。接收方必须提供同等大小的列表。

  • dstint) – 要发送到的目标排名。 目标排名基于全局进程组(无论参数如何)object_listgroup

  • group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

  • device ( , 可选 ) – 如果不是 None,则对象为 序列化并转换为 Tensors,这些张量在 send 之前移动到 。默认值为 。torch.devicedeviceNone

返回

None.

注意

对于基于 NCCL 的进程组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。torch.cuda.current_device()torch.cuda.set_device()

警告

uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed 中。recv_object_listobject_listsrc=Nonegroup=Nonedevice=None[来源]

同步接收 picklable 对象。object_list

类似于 ,但可以接收 Python 对象。

参数
  • object_listList[Any]) – 要接收的对象列表。 必须提供与所发送列表的大小相等的大小列表。

  • srcintoptional) – 接收的源排名。 源排名基于全局进程组(无论参数如何) 如果设置为 None,将从任何等级接收。默认值为 。object_listgroupNone

  • group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

  • device (, optional) – 如果不是 None,则在此设备上接收。 默认值为 。torch.deviceNone

返回

发件人排名。如果 rank 不属于该组,则为 -1。如果 rank 是组的一部分,则将包含从 rank 发送的对象。object_listsrc

注意

对于基于 NCCL 的进程组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。torch.cuda.current_device()torch.cuda.set_device()

警告

uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed 中。batch_isend_irecvp2p_op_list[来源]

异步发送或接收一批张量并返回请求列表。

处理 中的每一个操作,并返回相应的 请求。目前支持 NCCL、Gloo 和 UCC 后端。p2p_op_list

参数

p2p_op_list – 点对点操作的列表(每个运算符的类型为 )。isend/irecv 在列表中的顺序 很重要,并且它需要与 远程端。torch.distributed.P2POp

返回

通过调用相应的 op 在 op_list 中。

例子

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size)
>>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size)
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

注意,当此 API 与 NCCL PG 后端一起使用时,用户必须设置 当前 GPU 设备具有 torch.cuda.set_device,否则将 导致意外的挂起问题。

此外,如果此 API 是 传递给 中的第一个集体调用,则所有等级都必须参与 此 API 调用;否则,行为为 undefined。如果此 API 调用是 不是批处理 P2P 操作中的第一次集体调用 只允许涉及 的 rank 的子集。groupdist.P2POpgroupgroupgroup

torch.distributed 中。P2POpoptensorpeergroup=Nonetag=0[来源]

用于为 构建点对点操作的类。batch_isend_irecv

此类构建 P2P 操作的类型、通信缓冲区、对等等级、 Process Group 和 tag.此类的实例将被传递给以进行点对点通信。batch_isend_irecv

参数
  • opCallable) – 用于向对等进程发送数据或从对等进程接收数据的函数。 的类型为 或 。optorch.distributed.isendtorch.distributed.irecv

  • tensorTensor) - 要发送或接收的张量。

  • peerint) – 目标或源排名。

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • tagintoptional) – 将 send 与 recv 匹配的标签。

同步和异步集合操作

每个集合运算函数都支持以下两种运算, 根据传递到 Collective 的标志的设置:async_op

Synchronous operation - 默认模式,设置为 时。 当函数返回时,保证 执行集体操作。对于 CUDA 操作,不能保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集合体,任何 利用集体调用输出的进一步函数调用将按预期运行。对于 CUDA 集合, 利用同一 CUDA 流上的输出的函数调用将按预期运行。用户必须注意 在不同流下运行场景下的同步。有关 CUDA 语义的详细信息,例如 stream 同步,请参阅 CUDA 语义。 请参阅以下脚本,查看 CPU 和 CUDA 操作的这些语义差异的示例。async_opFalse

异步操作 - 当设置为 True 时。集合操作函数 返回 Distributed Request 对象。通常,您不需要手动创建它,并且它 保证支持两种方法:async_op

  • is_completed()- 对于 CPU 集合,则返回已完成。对于 CUDA 操作, 如果操作已成功排入 CUDA 流,并且输出可以在 default 流,无需进一步同步。TrueTrue

  • wait()- 对于 CPU 集合,将阻止进程,直到操作完成。在这种情况下 的 CUDA 集合中,将阻塞,直到操作成功排入 CUDA 流中,并且 output 可以在 default 流上使用,而无需进一步同步。

  • get_future()- 返回 object。支持 NCCL,也支持 GLOO 上的大多数操作 和 MPI,但点对点操作除外。 注意:随着我们继续采用 Futures 和合并 API,调用可能会变得多余。torch._C.Futureget_future()

以下代码可以作为使用分布式集合时 CUDA 操作语义的参考。 它显示了在不同的 CUDA 流上使用集体输出时需要同步的明确需求:

# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)

集合函数

torch.distributed 中。broadcasttensorsrcgroup=Noneasync_op=False[来源]

将 Tensor 广播到整个 Group。

tensor在所有进程中必须具有相同数量的元素 参与集体。

参数
  • tensorTensor) - 如果是当前 process 和 tensor 来保存接收到的数据。src

  • srcint) – 全局进程组上的源排名(与参数无关)。group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

torch.distributed 中。broadcast_object_listobject_listsrc=0group=Nonedevice=None[来源]

将可腌制对象广播到整个组中。object_list

类似,但可以传入 Python 对象。 请注意,中的所有对象都必须是可 picklable 的,才能 播出。object_list

参数
  • object_listList[Any]) – 要广播的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 广播,但每个排名必须提供大小相等的列表。src

  • srcint) – 要从中广播的源排名。 源排名基于全局进程组(无论参数如何)object_listgroup

  • group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

  • device ( , 可选 ) – 如果不是 None,则对象为 serialized 并转换为 Tensors,这些张量被移动到 Before broadcasting 中。默认值为 。torch.devicedeviceNone

返回

None.如果 rank 是组的一部分,则将包含 从 rank 广播对象。object_listsrc

注意

对于基于 NCCL 的进程组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。torch.cuda.current_device()torch.cuda.set_device()

注意

请注意,此 API 与 collective 略有不同,因为它不提供句柄,因此 将是一个阻塞调用。async_op

警告

uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     objects = [None, None, None]
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed 中。all_reducetensorop=<RedOpType.SUM: 0>group=Noneasync_op=False[来源]

减少所有机器上的张量数据,使所有机器都能获得最终结果。

After the call 在所有进程中都将按位相同。tensor

支持复杂张量。

参数
  • - tensorTensor) - 集合的输入和输出。函数 就地操作。

  • op可选) – 枚举中的值之一。指定用于元素级缩减的操作。torch.distributed.ReduceOp

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

例子

>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed 中。reducetensordstop=<RedOpType.SUM: 0>group=Noneasync_op=False[来源]

减少所有计算机的张量数据。

只有具有 rank 的进程才会收到最终结果。dst

参数
  • - tensorTensor) - 集合的输入和输出。函数 就地操作。

  • dstint) – 全局进程组上的目标排名(与参数无关)group

  • op可选) – 枚举中的值之一。指定用于元素级缩减的操作。torch.distributed.ReduceOp

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

torch.distributed 中。all_gathertensor_list张量=async_op=False[来源]

从列表中的整个组中收集张量。

支持复杂且大小不均匀的张量。

参数
  • tensor_listlist[Tensor]) - 输出列表。它应包含 正确大小的张量,用于 Collective 的输出。 支持大小不均匀的张量。

  • tensorTensor) - 要从当前进程广播的张量。

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

例子

>>> # All tensors below are of torch.int64 dtype.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> # We have 2 process groups, 2 ranks.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
torch.distributed 中。all_gather_into_tensoroutput_tensorinput_tensorgroup=Noneasync_op=False[来源]

从所有列收集张量,并将它们放在单个输出张量中。

此函数要求每个进程上的所有张量大小相同。

参数
  • output_tensorTensor) – 输出张量以容纳张量元素 来自各个等级。它的大小必须正确,才能具有 以下表格: (i) 沿主 Tensor 的所有输入张量的串联 尺寸;有关“串联”的定义,请参阅; (ii) 沿主维度的所有输入张量的堆栈; 有关 “堆栈” 的定义,请参阅。 以下示例可能更好地解释支持的输出形式。torch.cat()torch.stack()

  • input_tensorTensor) – 要从当前排名中收集的张量。 与 API 不同,此 API 在所有排名中必须具有相同的大小。all_gather

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

例子

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # Output in concatenation form
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # Output in stack form
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed 中。all_gather_objectobject_listobjgroup=None[来源]

将整个组中的可腌制对象收集到一个列表中。

类似,但可以传入 Python 对象。 请注意,对象必须是可腌制的才能被收集。

参数
  • object_listlist[Any]) – 输出列表。它的大小应该正确地调整为 size 的组,并将包含输出。

  • objAny) – 要从当前进程广播的可选取 Python 对象。

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

返回

没有。如果调用秩是此组的一部分,则 collective 将填充到 input 中。如果 调用 rank 不属于组,传入的 未修改。object_listobject_list

注意

请注意,此 API 与 collective 略有不同,因为它不提供句柄,因此 将是一个阻塞调用。async_op

注意

对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。torch.cuda.current_device()torch.cuda.set_device()

警告

uses 模块,即 已知不安全。可以构造恶意的 pickle 数据 它将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed 中。gather张量gather_list=dst=0=async_op=False[来源]

在单个进程中收集张量列表。

此函数要求每个进程上的所有张量大小相同。

参数
  • - tensorTensor) - 输入张量。

  • gather_listlist[Tensor]optional) – 适当的列表, 用于收集的数据的相同大小的张量 (默认值为 None,必须在目标排名中指定)

  • dstintoptional) – 全局进程组上的目标排名 (无论参数如何)。(默认值为 0)group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

torch.distributed 中。gather_objectobjobject_gather_list=Nonedst=0group=None[来源]

在单个进程中从整个组中收集可腌制对象。

类似,但可以传入 Python 对象。请注意, object 必须是可腌制的才能被收集。

参数
  • - objAny) - 输入对象。必须是可腌制的。

  • object_gather_listlist[Any]) – 输出列表。在等级上,它 应正确调整为此的组的大小 collective 并包含输出。必须位于非 DST 上 行列。(默认值为dstNoneNone)

  • dstintoptional) – 全局进程组上的目标排名 (无论参数如何)。(默认值为 0)group

  • group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

返回

没有。在排名上,将包含 output 的 Collective 的 output 的 Portfoliodstobject_gather_list

注意

请注意,此 API 与 gather collective 略有不同 因为它不提供async_op句柄,因此会阻塞 叫。

注意

对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。torch.cuda.current_device()torch.cuda.set_device()

警告

uses 模块,即 已知不安全。可以构造恶意的 pickle 数据 它将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
...     gather_objects[dist.get_rank()],
...     output if dist.get_rank() == 0 else None,
...     dst=0
... )
>>> # On rank 0
>>> output
['foo', 12, {1: 2}]
torch.distributed 中。scatter张量scatter_list=src=0group=async_op=False[来源]

将张量列表分散到组中的所有进程。

每个进程将只接收一个张量并将其数据存储在参数中。tensor

支持复杂张量。

参数
  • - tensorTensor) - 输出张量。

  • scatter_listlist[Tensor]) – 要分散的张量列表(默认为 None,必须在源排名中指定)

  • srcint) – 全局进程组上的源排名(与参数无关)。 默认值为 0group

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

注意

请注意,scatter_list 中的所有 Tensor 必须具有相同的大小。

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> t_ones = torch.ones(tensor_size)
>>> t_fives = torch.ones(tensor_size) * 5
>>> output_tensor = torch.zeros(tensor_size)
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     # Only tensors, all of which must be the same size.
>>>     scatter_list = [t_ones, t_fives]
>>> else:
>>>     scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i gets scatter_list[i]. For example, on rank 1:
>>> output_tensor
tensor([5., 5.])
torch.distributed 中。scatter_object_listscatter_object_output_listscatter_object_input_listsrc=0group=None[来源]

将可腌制对象分散到整个组中。scatter_object_input_list

类似,但可以传入 Python 对象。上 每个 rank,scattered 对象将存储为 的第一个元素。请注意,中的所有对象都必须是可腅制的,以便进行分散。scatter_object_output_listscatter_object_input_list

参数
  • scatter_object_output_listList[Any]) – 其第一个 元素将存储分散到此 rank 的对象。

  • scatter_object_input_listList[Any]) – 要散布的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 是 scattered,并且参数可以是非 src 秩。srcNone

  • srcint) – 要从中分散的源排名。 源排名基于全局进程组(无论参数如何)。scatter_object_input_listgroup

  • group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。None

返回

None.如果 rank 是组的一部分,则将其第一个元素设置为此 rank 的分散对象。scatter_object_output_list

注意

请注意,此 API 与 Scatter Collective 略有不同 因为它不提供句柄,因此将是一个 blocking 调用。async_op

警告

uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。pickle

警告

不支持使用 GPU 张量进行调用 并且效率低下,因为它会产生 GPU > CPU 传输,因为张量将是 腌。请考虑改用

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     # Can be any list on non-src ranks, elements are not used.
>>>     objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # Rank i gets objects[i]. For example, on rank 2:
>>> output_list
[{1: 2}]
torch.distributed 中。reduce_scatteroutputinput_listop=<RedOpType.SUM: 0>group=Noneasync_op=False[来源]

Reduce,然后将张量列表分散到组中的所有进程。

参数
  • outputTensor) - 输出张量。

  • input_listlist[Tensor]) - 要减少和分散的张量列表。

  • op可选) – 枚举中的值之一。指定用于元素级缩减的操作。torch.distributed.ReduceOp

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算。

返回

异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。

torch.distributed 中。reduce_scatter_tensoroutputinputop=<RedOpType.SUM: 0>group=Noneasync_op=False[来源]

减少张量,然后将张量分散到组中的所有 rank。

参数
  • outputTensor) - 输出张量。它应该具有相同的大小 行列。

  • inputTensor) - 要减少和分散的输入张量。它的大小 应为 output Tensor Size 乘以 World size。输入张量 可以具有以下形状之一: (i) 沿主 Quantum 的 output Tensor 的串联 维度或 (ii) 沿主要维度的输出张量堆栈。 有关“串联”的定义,请参阅。 有关 “堆栈” 的定义,请参阅。torch.cat()torch.stack()

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算。

返回

异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。

例子

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # Input in concatenation form
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # Rank 0
tensor([0, 1, 2, 3], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # Input in stack form
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
        [2, 3]], device='cuda:0') # Rank 0
tensor([[0, 1],
        [2, 3]], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed 中。all_to_all_singleoutputoutput, inputoutput_split_sizes=input_split_sizes=group=Noneasync_op=False[来源]

拆分输入张量,然后将拆分列表分散到一个组中的所有进程。

稍后,从组中的所有进程中连接接收到的张量 并作为单个输出张量返回。

支持复杂张量。

参数
  • outputTensor) - 收集的串联输出张量。

  • inputTensor) - 要分散的输入张量。

  • output_split_sizes – (list[Int], optional):暗淡 0 的输出分割大小 如果指定 None 或为空,则张量的 dim 0 必须除以 同样由 .outputworld_size

  • input_split_sizes – (list[Int], 可选):输入 dim 0 的拆分大小 如果指定 None 或为空,则张量的 dim 0 必须除以 同样由 .inputworld_size

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算。

返回

异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。

警告

all_to_all_single 是实验性的,可能会发生变化。

例子

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
tensor([4, 5, 6, 7])     # Rank 1
tensor([8, 9, 10, 11])   # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12])    # Rank 0
tensor([1, 5, 9, 13])    # Rank 1
tensor([2, 6, 10, 14])   # Rank 2
tensor([3, 7, 11, 15])   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list  = list(output.chunk(world_size))
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0,  1, 10, 11, 12, 20, 21, 30, 31])                     # Rank 0
tensor([ 2,  3, 13, 14, 22, 32, 33])                             # Rank 1
tensor([ 4, 15, 16, 23, 34, 35])                                 # Rank 2
tensor([ 5, 17, 18, 24, 36])                                     # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input
tensor([1+1j, 2+2j, 3+3j, 4+4j])                                # Rank 0
tensor([5+5j, 6+6j, 7+7j, 8+8j])                                # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j])                          # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j])                        # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j])                              # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j])                            # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j])                            # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j])                            # Rank 3
torch.distributed 中。all_to_alloutput_tensor_listinput_tensor_listgroup=Noneasync_op=False[来源]

将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。

支持复杂张量。

参数
  • output_tensor_listlist[Tensor]) – 要收集的张量列表 每个等级。

  • input_tensor_listlist[Tensor]) - 每个秩要分散一个的张量列表。

  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算。

返回

异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。

警告

all_to_all 是实验性的,可能会发生变化。

例子

>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])]     # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])]     # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])]   # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])]    # Rank 0
[tensor([1]), tensor([5]), tensor([9]), tensor([13])]    # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])]   # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])]   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = input
>>> gather_list  = output
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> input = list(input.split(input_splits))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])]                   # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])]                 # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])]         # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])]   # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])]           # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])]              # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])]                  # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])]            # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])]            # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])]      # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])]    # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])]          # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])]        # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])]        # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])]        # Rank 3
torch.distributed 中。barriergroup=Noneasync_op=Falsedevice_ids=None[来源]

同步所有进程。

这个 collective 会阻塞进程,直到整个 group 进入这个函数, 如果 async_op 为 False,或者如果在 wait() 上调用异步工作句柄。

参数
  • groupProcessGroup可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。

  • async_opbooloptional) – 此运算是否应为异步运算

  • device_ids[int]optional) – 设备/GPU ID 列表。

返回

异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组

注意

ProcessGroupNCCL 现在依赖于流同步,而不是 device synchronization 来阻止 CPU。因此,请不要假设 barrier() 会执行设备同步。

torch.distributed 中。monitored_barriergroup=Nonetimeout=Nonewait_all_ranks=False[来源]

同步进程类似于 ,但请考虑可配置的超时。torch.distributed.barrier

它能够报告在提供的超时时间内未通过此屏障的排名。 具体来说,对于非零排名,将阻止,直到从排名 0 处理 send/recv。 排名 0 将阻止,直到处理完来自其他排名的所有发送 /recv,并将报告 未能及时响应的 ranks 的失败。请注意,如果一个等级未达到 monitored_barrier(例如由于挂起),所有其他排名都将在 monitored_barrier 中失败。

此 collective 将阻止组中的所有进程/排名,直到 整个 group 成功退出函数,使其可用于调试 和同步。但是,它可能会对性能产生影响,并且只应 用于调试或需要完全同步点的方案 在主机端。出于调试目的,可以插入此屏障 在应用程序的集体调用之前,检查是否有任何秩为 desynchronized。

注意

请注意,此 collective 仅支持 GLOO 后端。

参数
  • groupProcessGroup可选) – 要处理的流程组。如果 ,将使用默认进程组。None

  • timeoutdatetime.timedelta可选) – monitored_barrier的超时。 如果 ,将使用默认的进程组超时。None

  • wait_all_ranksbooloptional) – 是收集所有失败的排名还是 不。默认情况下,这是 and on rank 0 将抛出它遇到的第一个失败的等级以失败 快。通过设置 will 收集所有失败的排名并引发包含信息的错误 关于所有失败的等级。Falsemonitored_barrierwait_all_ranks=Truemonitored_barrier

返回

None.

例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>>     dist.monitored_barrier() # Raises exception indicating that
>>> # rank 1 did not call into monitored_barrier.
>>> # Example with wait_all_ranks=True
>>> if dist.get_rank() == 0:
>>>     dist.monitored_barrier(wait_all_ranks=True) # Raises exception
>>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
>>> # monitored_barrier.
torch.distributed 中。工作

Work 对象表示 中挂起的异步操作的句柄 PyTorch 的分布式包。它由非阻塞的集合操作返回, 例如 dist.all_reduce(tensor, async_op=True)。

torch.distributed 中。ReduceOp (减少操作)

可用归约运算的类似枚举的类:、、、、 和 。SUMPRODUCTMINMAXBANDBORBXORPREMUL_SUM

BAND、 和 减少 在以下情况下不可用 使用后端。BORBXORNCCL

AVG将值除以世界大小,然后在各个等级之间求和。 仅适用于后端, 并且仅适用于 NCCL 版本 2.10 或更高版本。AVGNCCL

PREMUL_SUM在归约之前将输入局部乘以给定的标量。 仅适用于后端, 并且仅适用于 NCCL 版本 2.11 或更高版本。用户应该 用。PREMUL_SUMNCCLtorch.distributed._make_nccl_premul_sum

此外,复杂张量不支持 和 。MAXMINPRODUCT

此类的值可以作为属性访问,例如 . 它们用于指定归约集合的策略,例如 .ReduceOp.SUM

此类不支持 property。__members__

torch.distributed 中。reduce_op

已弃用的类似 enum 的类 reduction 操作:、、 和 。SUMPRODUCTMINMAX

建议改用。

分析 Collective 通信

请注意,您可以使用 (推荐,仅在 1.8.1 之后可用) 或分析此处提到的集体通信和点对点通信 API。支持所有开箱即用的后端 (、 、 ),并且集体通信使用将在分析输出/跟踪中按预期呈现。分析代码与任何常规 torch 操作符相同:torch.profilertorch.autograd.profilerglooncclmpi

import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)

请参阅 Profiler 文档,以全面了解 Profiler 功能。

多 GPU 集合函数

警告

多 GPU 函数(代表每个 CPU 线程多个 GPU)是 荒废的。截至今天,PyTorch Distributed 的首选编程模型 是每个线程一个设备,如本文档中的 API 所示。如果 您是后端开发人员,希望每个线程支持多个设备, 请联系 PyTorch Distributed 的维护者。

第三方后端

除了内置的 GLOO/MPI/NCCL 后端外,PyTorch 分布式还支持 第三方后端。 有关如何通过 C++ Extension 开发第三方后端的参考, 请参考 教程 - 自定义 C++ 和 CUDA 扩展 和 。第三方的能力 backend 由它们自己的 implementations 决定。test/cpp_extensions/cpp_c10d_extension.cpp

新后端派生自并注册后端 name 和实例化接口。c10d::ProcessGroup

当手动导入此后端并使用相应的后端名称调用时,软件包将在 新的后端。torch.distributed

警告

对第三方后端的支持是实验性的,可能会发生变化。

Launch 实用程序

torch.distributed 包还在 torch.distributed.launch 中提供了启动实用程序。此帮助程序实用程序可用于启动 每个节点有多个进程,用于分布式训练。

模块。torch.distributed.launch

torch.distributed.launch是一个产生多个分布式 每个训练节点上的训练过程。

警告

此模块将被弃用,取而代之的是 torchrun

该实用程序可用于单节点分布式训练,其中 1 个 或 每个节点将生成更多进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练, 每个分布式进程都将在单个 GPU 上运行。这可以实现 大幅提升单节点训练性能。它也可以用于 多节点分布式训练,在每个节点上生成多个进程 还可以很好地提高多节点分布式训练性能。 这对于具有多个 Infiniband 的系统尤其有益 具有直接 GPU 支持的接口,因为它们都可用于 聚合通信带宽。

在单节点分布式训练或多节点分布式两种情况下 training 中,此实用程序将为每个节点启动给定数量的进程 ().如果用于 GPU 训练,则此数字需要更小 或等于当前系统上的 GPU 数量 (), 并且每个进程都将在从 GPU 0 到 GPU (nproc_per_node - 1) 的 GPU ( - 1)。--nproc-per-nodenproc_per_node

如何使用此模块:

  1. 单节点多进程分布式训练

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
  1. 多节点多进程分布式训练:(例如两个节点)

节点 1:(IP:192.168.1.1,具有空闲端口:1234)

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

节点 2:

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  1. 要查找此模块提供的可选参数:

python -m torch.distributed.launch --help

重要通知:

1. 本实用程序和多进程分布式(单节点或 多节点)GPU 训练目前只能使用 NCCL 分布式后端。因此,NCCL 后端是推荐的后端 用于 GPU 训练。

2. 在您的训练程序中,您必须解析命令行参数:,该参数将由本模块提供。 如果您的训练程序使用 GPU,则应确保仅 运行在 LOCAL_PROCESS_RANK 的 GPU 设备上。这可以通过以下方式完成:--local-rank=LOCAL_PROCESS_RANK

解析 local_rank 参数

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

使用 either 将设备设置为本地排名

>>> torch.cuda.set_device(args.local_rank)  # before your code runs

>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...

在 2.0.0 版本发生变更: 启动器会将参数传递给您的脚本。 从 PyTorch 2.0.0 开始,虚线优先于 以前使用的 under划线 。--local-rank=<rank>--local-rank--local_rank

为了实现向后兼容性,用户可能需要同时处理 cases 的参数解析代码中。这意味着在参数解析器中同时包含 and。如果只有 ,启动器将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码, 包括应该就足够了。"--local-rank""--local_rank""--local_rank""--local-rank"

3. 在您的训练程序中,您应该调用以下函数 在开始时启动分布式后端。强烈建议 那。其他 init 方法(例如 )可能有效, 但是是本模块正式支持的那个。init_method=env://tcp://env://

>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')

4. 在您的训练计划中,您可以使用常规分布式函数 或使用 module。如果您的 training program 使用 GPU 进行训练,并且您希望使用 module, 以下是配置方法。

>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)

请确保将参数设置为唯一的 GPU 设备 ID 您的代码将在其上运行。这通常是 过程。换句话说,需要是 , 并且需要 be 才能使用它 效用device_idsdevice_ids[args.local_rank]output_deviceargs.local_rank

5. 另一种通过环境变量传递给子进程的方法。当您使用 .您必须调整上面的 subprocess 示例以替换为 ;启动器 不会通过指定此标志。local_rankLOCAL_RANK--use-env=Trueargs.local_rankos.environ['LOCAL_RANK']--local-rank

警告

local_rank不是全局唯一的:它仅每个进程唯一 在计算机上。因此,不要用它来决定是否应该这样做,例如, 写入网络文件系统。请参阅 https://github.com/pytorch/pytorch/issues/12042 的示例 如果你没有正确地做到这一点,事情会怎么出错。

生成实用程序

Multiprocessing 包 - torch.multiprocessing 包还在 .此辅助函数 可用于生成多个进程。它的工作原理是将 函数,并生成 N 个进程来运行它。这 也可用于多进程分布式训练。spawn

有关如何使用它的参考,请参考 PyTorch 示例 - ImageNet 实现

请注意,此功能需要 Python 3.4 或更高版本。

调试应用程序torch.distributed

调试分布式应用程序可能具有挑战性,因为难以理解挂起、崩溃或跨等级的行为不一致。 提供 一套工具,可帮助以自助方式调试训练应用程序:torch.distributed

Python 断点

在分布式环境中使用 python 的调试器非常方便,但由于它不是开箱即用的,很多人根本不使用它。 PyTorch 提供了围绕 pdb 的自定义包装器,可简化流程。

torch.distributed.breakpoint 使此过程变得简单。在内部,它以两种方式自定义 pdb 的断点行为,但其他方面的行为与普通 pdb 相同。 1. 仅在一个等级 (由用户指定) 上附加调试器。 2. 通过使用 torch.distributed.barrier() 确保所有其他 rank 停止,一旦调试的 rank 发出 continue 3.从子进程重新路由 stdin,使其连接到您的终端。

要使用它,只需在所有 ranks上发出 torch.distributed.breakpoint(rank),在每种情况下对 rank 使用相同的值。

监控屏障

从 v1.10 开始,作为 which 的替代方案存在,其中包含有关哪个等级可能有问题的有用信息 崩溃时,即并非所有 rank 都在提供的 timeout 内调用 in实现主机端 barrier using / communication 原语在类似于确认的过程中,允许排名 0 报告哪些排名未能确认 时间的障碍。例如,考虑以下函数,其中 rank 1 无法调用 into (实际上这可能是由于 添加到应用程序 bug 或挂起在上一个集合中):sendrecv

import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())

排名 0 时生成以下错误消息,允许用户确定哪些排名可能有问题并进一步调查:

RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG

使用 ,环境变量可用于触发其他有用的日志记录和集体同步检查,以确保所有排名 已适当同步。 可以设置为 (default)、 或 取决于调试级别 必填。请注意,最详细的选项可能会影响应用程序性能,因此只能在调试问题时使用。TORCH_CPP_LOG_LEVEL=INFOTORCH_DISTRIBUTED_DEBUGTORCH_DISTRIBUTED_DEBUGOFFINFODETAILDETAIL

设置将导致在初始化用于训练的模型时产生额外的调试日志记录,并且还会记录选定迭代次数的运行时性能统计信息。这些运行时统计信息 包括前进时间、后退时间、梯度通信时间等数据。例如,给定以下应用程序:TORCH_DISTRIBUTED_DEBUG=INFOTORCH_DISTRIBUTED_DEBUG=DETAIL

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)

    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    inp = torch.randn(10, 10).cuda()
    print("train")

    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())

初始化时呈现以下日志:

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

以下日志在运行时(设置时)呈现:TORCH_DISTRIBUTED_DEBUG=DETAIL

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674

此外,还增强了由于模型中未使用的参数而导致的崩溃日志记录。目前,如果正向传递中存在可能未使用的参数,则必须将其传递到初始化中,并且从 v1.10 开始,所有模型输出都是必需的 用于损失计算,因为不支持 backwards pass 中未使用的参数。这些限制尤其具有挑战性,尤其是对于较大的 models,因此,当崩溃出现错误时,将记录所有未使用的参数的完全限定名称。例如,在上面的应用程序中, 如果我们修改为改为 计算为 ,则在向后传递中不会接收梯度,并且 因此会导致失败。在崩溃时,用户会传递有关未使用的参数的信息,这对于大型模型来说可能很难手动查找:TORCH_DISTRIBUTED_DEBUG=INFOfind_unused_parameters=Truelossloss = output[1]TwoLinLayerNet.aDDP

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0

设置将在用户发出的每个集体调用上触发额外的一致性和同步检查 直接或间接(例如 DDP)。这是通过创建一个包装器进程组来完成的,该包装器进程组包装 和 API 返回的所有进程组。因此,这些 API 将返回一个包装器进程组,该组可以像常规进程一样使用 组,但在将集合体分派到底层进程组之前执行一致性检查。目前,这些检查包括 、 这可确保所有等级完成其出色的集体呼叫并报告卡住的等级。接下来,通过以下方式检查 Collective 本身的一致性 确保所有集合函数都匹配并以一致的张量形状调用。如果不是这种情况,则在 应用程序崩溃,而不是挂起或无信息性的错误消息。例如,请考虑以下函数,该函数将不匹配的输入形状转换为 TORCH_DISTRIBUTED_DEBUG=DETAILallreduce

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())

对于后端,此类应用程序可能会导致挂起,在重要情况下,这可能很难找到根本原因。如果用户启用并重新运行应用程序,则以下错误消息将揭示根本原因:NCCLTORCH_DISTRIBUTED_DEBUG=DETAIL

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]

注意

为了在运行时对调试级别进行精细控制,也可以使用函数 , , 和 。torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()

此外, TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以便在检测到集体不同步时记录整个调用堆栈。这些 集体反同步检查将适用于所有使用集体调用的应用程序,这些调用由通过 API 创建的进程组提供支持。c10d

伐木

除了通过 和 提供显式调试支持外,底层 C++ 库还会输出 log 消息。这些消息有助于了解分布式训练作业的执行状态,并排查网络连接故障等问题。这 以下 Matrix 显示了如何通过 和 环境变量的组合来调整日志级别。TORCH_DISTRIBUTED_DEBUGtorch.distributedTORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG

TORCH_CPP_LOG_LEVEL

TORCH_DISTRIBUTED_DEBUG

有效日志级别

ERROR

忽视

错误

WARNING

忽视

警告

INFO

忽视

信息

INFO

INFO

调试

INFO

DETAIL

Trace (又名全部)

分布式组件会引发从 RuntimeError 派生的自定义 Exception 类型:

  • torch.distributed.DistError:这是所有分布式异常的基本类型。

  • torch.distributed.DistBackendError:当发生特定于后端的错误时,会引发此异常。例如,如果 使用 NCCL 后端,并且用户尝试使用 NCCL 库不可用的 GPU。

  • torch.distributed.DistNetworkError:联网时抛出此异常 库遇到错误(例如:对等方重置连接)

  • torch.distributed.DistStoreError:当 Store 遇到 错误(例如:TCPStore 超时)

torch.distributed 中。DistError

分布式库中发生错误时引发异常

torch.distributed 中。DistBackendError

分布式中发生后端错误时引发异常

torch.distributed 中。DistNetwork错误

分布式中发生网络错误时引发异常

torch.distributed 中。DistStoreError

分布式存储中发生错误时引发异常

如果您正在运行单节点训练,则以交互方式断点脚本可能会很方便。我们提供了一种方便地断点单个秩的方法:

torch.distributed 中。breakpointrank=0skip=0[来源]

设置断点,但仅限于单个等级。所有其他等级将等待您 在继续之前完成断点。

参数
  • rankint) – 要突破的等级。违约:0

  • skipint) – 跳过对此断点的第一次调用。违约:。skip0

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源