目录

分布式 RPC 框架

分布式 RPC 框架为多机模型提供了机制 通过一组原语进行训练以允许远程通信,以及一个 更高级别的 API,可自动区分拆分为多个的模型 机器。

警告

RPC 包中的 API 是稳定的。有多个正在进行的工作项 提高性能和错误处理,这将在未来版本中提供。

警告

CUDA 支持是在 PyTorch 1.9 中引入的,目前仍是一项测试版功能。 并非 RPC 包的所有功能都与 CUDA 支持兼容,并且 因此不鼓励使用它们。这些不支持的功能包括:RRefs、 JIT 兼容性、dist autograd 和 dist 优化器以及分析。这些 缺点将在将来的版本中得到解决。

注意

请参考 PyTorch 分布式概述,简要介绍与分布式训练相关的所有功能。

基本

分布式 RPC 框架使远程运行函数变得容易,支持 引用远程对象而不复制实际数据,并提供 autograd 和 optimizer API,以透明方式向后运行和更新参数 跨 RPC 边界。这些功能可以分为四组 API。

  1. 远程过程调用 (RPC) 支持在指定的 目标 worker 并取回返回值 或创建对返回值的引用。有三个主要的 RPC API: (同步)、 (异步)和 (异步,并返回引用 添加到远程返回值)。如果用户代码无法 继续而不返回值。否则,请使用异步 API 获取 一个 future,并在 访客。当 要求是远程创建一些东西,但永远不需要将其获取到 调用方。想象一下驱动程序进程正在设置参数的情况 服务器和教练。驱动程序可以在 parameter server 的 URL,然后将对嵌入表的引用与 trainer 的 Interface,但它本身永远不会在本地使用 embedding 表。在这种情况下, 它们不再适用,因为它们 always 暗示返回值将返回给调用方 立即或将来。

  2. 远程引用 (RRef) 充当指向本地 或 remote 对象。它可以与其他 worker 共享并进行引用计数 将透明地处理。每个 RRef 只有一个 owner 和 object 仅存在于该所有者身上。持有 RRef 的非所有者工作线程可以获取 对象。这在以下情况下很有用 worker 需要访问一些 data 对象,但其自身也不是创建者 (的调用方)或 对象。正如我们将在下面讨论的那样,分布式优化器就是一个例子 此类用例。

  3. 分布式 Autograd 将 参与 forward pass 的 worker,并自动联系他们 在向后传递期间计算梯度。如果满足以下条件,这将特别有用 传导时,前向通道需要跨越多台机器,例如, 分布式模型并行训练、参数服务器训练等。跟 此功能,用户代码不再需要担心如何发送渐变 跨 RPC 边界以及本地 autograd 引擎应按什么顺序 启动,这可能会变得非常复杂,其中有嵌套和 forward pass 中相互依赖的 RPC 调用。

  4. Distributed Optimizer 的构造函数接受一个(例如、、 等)和一个参数 RRef 列表,在每个不同的 RRef 拥有者上创建一个实例,并且 在运行 时相应地更新参数。当您拥有 分布的向前和向后传递、参数和渐变将是 分散在多个 worker 中,因此它需要在每个 worker 上都有一个优化器 的涉事工人。Distributed Optimizer 将所有这些本地 optimizer 合并为一个,并提供简洁的构造函数和 API。step()step()

RPC

在使用 RPC 和分布式 autograd 原语之前,初始化必须采用 地方。要初始化 RPC 框架,我们需要使用 that will initialize RPC 框架、RRef 框架和分布式 autograd。

torch.distributed.rpc 的init_rpcnamebackend=Nonerank=-1world_size=Nonerpc_backend_options=None[来源]

初始化 RPC 基元,例如本地 RPC 代理 和分布式 autograd,它立即使当前的 进程准备好发送和接收 RPC。

参数
  • namestr) – 此节点的全局唯一名称。(例如, ) 名称只能包含数字、字母、下划线、冒号、 和/或短划线,并且必须短于 128 个字符。Trainer3ParameterServer2MasterWorker1

  • backendBackendType可选) – RPC 后端的类型 实现。支持的值为 (默认值)。 有关更多信息,请参阅后端BackendType.TENSORPIPE

  • rankint) – 此节点的全局唯一 ID/rank。

  • world_sizeint) – 组中的工作线程数量。

  • rpc_backend_optionsRpcBackendOptions可选) – 选项 传递给 RpcAgent 构造函数。它必须是特定于代理的 子类 和 包含特定于代理的初始化配置。由 default,则对于所有代理,它将默认超时设置为 60 秒,并使用底层进程执行 Rendezvous 使用 、 初始化的组 意味着 environment variables 和 需要正确设置。有关更多信息,请参见后端并查找选项 可用。init_method = "env://"MASTER_ADDRMASTER_PORT

以下 API 允许用户远程执行函数以及创建 引用 (RRef) 到远程数据对象。在这些 API 中,当将 a 作为参数或返回值传递时,目标 worker 将尝试 创建具有相同 meta 的 A(即 shape、stride 等)。我们 故意禁止传输 CUDA 张量,因为如果 Source 和 Destination Worker 上的 device 列表不匹配。在这种情况下, 应用程序始终可以将输入张量显式移动到调用方的 CPU 并将其移动到被调用方上所需的设备(如有必要)。TensorTensor

警告

RPC 中的 TorchScript 支持是一项原型功能,可能会发生更改。因为 v1.5.0,支持以 RPC 目标函数,这将有助于提高被调用方的并行度 side 执行 TorchScript 函数不需要 GIL。torch.distributed.rpc

torch.distributed.rpc 的rpc_synctofuncargs=Nonekwargs=Nonetimeout=-1.0[来源]

进行阻塞 RPC 调用以在 worker 上运行函数。RPC 消息的发送和接收与 Python 代码的执行并行。这 method 是线程安全的。functo

参数
  • tostr or WorkerInfo or int) – 目标 worker 的名称/rank/。WorkerInfo

  • funcCallable) – 一个可调用的函数,比如 Python callables,内置 运算符(例如 )和 annotated TorchScript 函数。

  • argstuple) – 调用的参数元组。func

  • kwargsdict) – 是调用的关键字参数字典。func

  • timeoutfloatoptional) – 用于此 RPC 的超时(以秒为单位)。如果 RPC 未以 time,则表示它已 timed out 将引发。值为 0 表示无限超时,即超时 error 永远不会引发。如果未提供,则 初始化期间设置的默认值 或 与 是 使用。_set_rpc_timeout

返回

返回使用 和 运行的结果。funcargskwargs

例::

确保 和 设置正确 在两个工人身上。有关更多详细信息,请参阅 API。例如MASTER_ADDRMASTER_PORT

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc 的rpc_asynctofuncargs=Nonekwargs=Nonetimeout=-1.0[来源]

进行非阻塞 RPC 调用以在 worker 上运行函数。RPC 消息的发送和接收与 Python 代码的执行并行。这 method 是线程安全的。此方法将立即返回一个可以等待的 a。functo

参数
  • tostr or WorkerInfo or int) – 目标 worker 的名称/rank/。WorkerInfo

  • funcCallable) – 一个可调用的函数,比如 Python callables,内置 运算符(例如 )和 annotated TorchScript 函数。

  • argstuple) – 调用的参数元组。func

  • kwargsdict) – 是调用的关键字参数字典。func

  • timeoutfloatoptional) – 用于此 RPC 的超时(以秒为单位)。如果 RPC 未以 time,则表示它已 timed out 将引发。值为 0 表示无限超时,即超时 error 永远不会引发。如果未提供,则 初始化期间设置的默认值 或 与 是 使用。_set_rpc_timeout

返回

返回一个可以等待的对象 上。完成后,可以从对象中检索 on 和 的返回值。funcargskwargs

警告

使用 GPU 张量作为参数或返回值 is not 支持,因为我们不支持通过网络发送 GPU 张量。你 在将 GPU 张量用作 CPU 之前,需要将其显式复制到 CPU 参数或返回值 .funcfunc

警告

API 不会复制参数张量的存储,直到 通过网络发送它们,这可以通过不同的线程来完成 取决于 RPC 后端类型。调用方应确保 这些张量的内容保持不变,直到返回的 Zhangs 完成。rpc_async

例::

确保 和 设置正确 在两个工人身上。有关更多详细信息,请参阅 API。例如MASTER_ADDRMASTER_PORT

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc 的remotetofuncargs=Nonekwargs=Nonetimeout=-1.0[来源]

进行远程调用以在 worker 上运行并立即返回 to 结果值。 Worker 将是返回的 的 owner ,并且调用 worker 的 是 一个用户。所有者管理其 的全局引用计数,并且所有者仅在全局存在时被销毁 没有活生生的参考资料。functoRReftoRRefremoteRRefRRef

参数
  • tostr or WorkerInfo or int) – 目标 worker 的名称/rank/。WorkerInfo

  • funcCallable) – 一个可调用的函数,比如 Python callables,内置 运算符(例如 )和 annotated TorchScript 函数。

  • argstuple) – 调用的参数元组。func

  • kwargsdict) – 是调用的关键字参数字典。func

  • timeoutfloat, optional) (超时,浮点数,可选) – 此远程调用的超时时间(以秒为单位)。如果 在 worker 上创建 this 未成功处理 this worker 在此超时时间内,则下次 尝试使用 RRef(例如 ),将引发超时 指示此失败。值为 0 表示 无限超时,即超时错误将 永远不要被抚养。如果未提供,则默认的 值 set during initialization 或 with 。RReftoto_here()_set_rpc_timeout

返回

结果的用户实例 价值。使用阻止 API 在本地检索结果值。RReftorch.distributed.rpc.RRef.to_here()

警告

API 不会复制参数张量的存储,直到 通过网络发送它们,这可以通过不同的线程来完成 取决于 RPC 后端类型。调用方应确保 这些张量的内容保持不变,直到返回的 RRef 为 由所有者确认,可以使用 API 进行检查。remotetorch.distributed.rpc.RRef.confirmed_by_owner()

警告

API 超时等错误在 尽力而为。这意味着,当远程调用失败时(例如出现超时错误),我们会尽最大努力 错误处理方法。这意味着错误会被处理和设置 在生成的 RRef 上异步。如果 RRef 尚未 由应用程序在此处理之前使用(例如 或 fork 调用),则将来使用的 Will 会适当地引发 错误。但是,用户应用程序可能会在处理错误之前使用 。在这种情况下,错误可能不会 由于尚未处理而提高。remoteremoteto_hereRRefRRef

例:

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc 的get_worker_infoworker_name=[来源]

get 给定的 worker 名称。 使用此函数可避免将 expensive 字符串。

参数

worker_namestr) – 工作程序的字符串名称。如果 ,则返回 当前 worker 的 ID。(默认NoneNone)

返回

实例 当前工作线程 (如果为 )。worker_nameworker_nameNone

torch.distributed.rpc 的shutdowngraceful=Truetimeout=0[来源]

关闭 RPC 代理,然后销毁 RPC 代理。这 阻止本地代理接受未完成的请求,并关闭 通过终止所有 RPC 线程来关闭 RPC 框架。如果 这将阻塞,直到所有本地和远程 RPC 进程都到达此方法 并等待所有未完成的工作完成。否则,如果 ,则为本地关闭,并且不会等待其他 RPC 进程来访问此方法。graceful=Truegraceful=False

警告

对于 返回的对象,不应 在 之后调用 。future.wait()shutdown()

参数

gracefulbool) – 是否执行正常关闭。如果为 True,则 这将 1) 等待,直到没有待处理的系统 消息并删除它们;2) 块 直到所有本地和远程 RPC 进程都已到达 此方法并等待所有未完成的工作 完成。UserRRefs

例::

确保 和 设置正确 在两个工人身上。有关更多详细信息,请参阅 API。例如MASTER_ADDRMASTER_PORT

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
torch.distributed.rpc 中。工人信息

一种在系统中封装 worker 信息的结构。 包含工作人员的名称和 ID。此类并非旨在 直接构造,而是可以检索实例 through 和 result 可以传递给函数以避免在 每次调用。

住宿 ID

用于标识工作程序的全局唯一 ID。

属性名称

worker 的名称。

RPC 包还提供了装饰器,允许应用程序指定 在被调用方端应该如何处理给定的函数。

torch.distributed.rpc.functions 的async_executionfn[来源]

函数的装饰器,指示函数的返回值 保证是一个对象,并且 this 函数可以在 RPC 被调用方上异步运行。更具体地说, callee 提取 wrapped 返回的 函数,并将后续处理步骤安装为该 .已安装的回调将读取值 从 When completion 并发送 值返回为 RPC 响应。这也意味着 return 仅存在于被调用方端,并且永远不会 通过 RPC 发送。当包装函数的 () 执行需要暂停和恢复,例如,由于包含或等待其他信号。fn

注意

要启用异步执行,应用程序必须将 function 对象返回给 RPC API。如果检测到 RPC attributes 时,它知道这个函数 返回一个对象,并将相应地处理该对象。 然而,这并不意味着这个装饰器必须在 定义函数。例如,当与 或 组合时,需要是 inner decorator 允许将目标函数识别为 static 或 class 函数。此目标函数仍可异步执行 因为,在访问时,static 或 class 方法会保留属性 安装者 。Future@staticmethod@classmethod@rpc.functions.async_execution@rpc.functions.async_execution

例::

返回的对象可以来自 constructor。下面的示例显示了直接使用 由 返回的 .

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

当与 TorchScript 装饰器结合使用时,此装饰器必须是 最外面的一个。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

当与 static 或 class 方法结合使用时,此装饰器必须是 内一。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

这个装饰器也可以与 RRef 辅助函数一起使用,即 .和。torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

后端

RPC 模块可以利用不同的后端来执行通信 节点之间。可以使用的后端可以在函数中指定,通过传递一定的 枚举。无论什么后端 ,则 RPC API 的其余部分不会更改。每个后端还定义了自己的 子类中,一个 实例也可以传递给 以配置后端的行为。

torch.distributed.rpc 中。BackendType)

可用后端的 enum 类。

PyTorch 附带一个内置后端。 可以使用该函数注册其他 Git。BackendType.TENSORPIPEregister_backend()

torch.distributed.rpc 中。RpcBackendOptions

一个抽象结构,封装传递到 RPC 中的选项 backend 的可以传入此类的实例以初始化 RPC 具有特定的配置,例如 RPC 超时和 to be used。init_method

属性 init_method

指定如何初始化进程组的 URL。 默认值为env://

物业rpc_timeout

一个 float,指示要用于所有 RPC 的。如果 RPC 未在此时间范围内完成,它将 complete 并出现异常,指示它已超时。

TensorPipe 后端

TensorPipe 代理是默认代理,它利用 TensorPipe 库,该库提供原生的 特别适合机器学习的点对点通信原语 从根本上解决了 Gloo 的一些限制。与 Gloo 相比, 它具有异步的优点,允许大量 传输同时发生,每个传输都以自己的速度进行,而不会阻塞 彼此。它只会在需要时打开节点对之间的管道,在 demand 的 Pipe S S 的 T Paypal 节点,当一个节点发生故障时,只有其事件管道会被关闭,而 所有其他 API 将继续正常工作。此外,它还能够支持 多种不同的传输方式(当然还有 TCP 协议,但也包括共享内存、NVLink、 InfiniBand 等)并且可以自动检测他们的可用性并进行协商 用于每个管道的最佳传输。

TensorPipe 后端已在 PyTorch v1.6 中引入,并且正在积极进行 发达。目前,它仅支持 CPU 张量,GPU 支持即将推出 很快。它带有基于 TCP 的传输,就像 Gloo 一样。它还能够 在多个 sockets 上自动对大型 Tensor 进行分块和多路复用,并且 threads 来实现非常高的带宽。代理将能够选择 最好的运输方式,无需干预。

例:

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
torch.distributed.rpc 中。TensorPipeRpcBackendOptions*num_worker_threads=16rpc_timeout=60.0init_method='env://'device_maps=devices=_transports=_channels=[来源]

的后端选项 ,派生自 TensorPipeAgent

参数
  • num_worker_threadsintoptional) – 的 thread-pool 用于执行 requests (默认值:16)。TensorPipeAgent

  • rpc_timeoutfloatoptional) – 默认超时(以秒为单位)。 对于 RPC 请求(默认值:60 秒)。如果 RPC 没有 完成,则表示 so 将 被提高。调用方可以覆盖单个 RPC 在必要时

  • init_methodstroptional) – 用于初始化分布式 用于 Rendezvous 的存储。它采用接受的 的相同参数 (default: )。env://

  • device_mapsDict[strDict]optional) – 设备放置映射 this worker 到被调用方。Key 是被调用工作线程的名称和值 字典 ( of , , or ) 将此 worker 的设备映射到被调用方 worker 的设备。 (默认:Dictintstrtorch.deviceNone)

  • devices (List[int, str, or ], optional) – 所有本地 RPC 代理使用的 CUDA 设备。默认情况下,它将被初始化 到所有本地设备,以及相应的 设备 .处理 CUDA RPC 时 请求,代理将正确同步 CUDA 流 此 中的所有设备。torch.devicedevice_mapsdevice_mapsList

属性device_maps

设备映射位置。

属性设备

本地代理使用的所有设备。

属性 init_method

指定如何初始化进程组的 URL。 默认值为env://

属性 num_worker_threads

用于执行的线程池中的线程数 请求。TensorPipeAgent

物业rpc_timeout

一个 float,指示要用于所有 RPC 的。如果 RPC 未在此时间范围内完成,它将 complete 并出现异常,指示它已超时。

set_device_maptodevice_map[来源]

设置每个 RPC 调用方和被调用方对之间的设备映射。这 function 可以多次调用以增量方式添加 设备放置配置。

参数
  • tostr) – 被调用方名称。

  • device_mapDict of intstr, 或 torch.device) – 设备放置 从此工作程序到被调用方的映射。此映射必须为 可逆的。

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices设备[来源]

设置 TensorPipe RPC 代理使用的本地设备。处理时 CUDA RPC 请求,TensorPipe RPC 代理会正确同步 此 .List

参数

devices int str torch.device 的列表) – 使用 的本地设备 TensorPipe RPC 代理。

注意

RPC 框架不会自动重试任何 调用 .原因是有 RPC 框架无法确定操作是幂等还是 not 以及重试是否安全。因此,它是应用程序的 负责处理故障并在必要时重试。RPC 通信 基于 TCP,因此可能会因网络故障而发生故障 或间歇性网络连接问题。在这种情况下,应用程序 需要通过合理的回退适当地重试,以确保网络 不会被激进的重试所淹没。

RRef

警告

使用 CUDA 张量时,目前不支持 RRef

(Remote REFerence) 是对远程 worker 上某种类型的值(例如 )的引用。此句柄使引用的远程 值 alive,但并不意味着该值将为 将来转移到本地工作程序。RRefs 可用于 多机训练。存在于 其他 worker,并调用适当的函数来检索或修改其 参数。有关更多信息,请参阅 Remote Reference Protocol 详。RRefTTensor

torch.distributed.rpc 中。PyRRefRRef)

一个类,用于封装对远程 工人。此句柄将使引用的 remote 值在 工人。当 1) 中没有对 A 的引用时,将删除 A 应用程序代码和本地 RRef 上下文中,或者 2) 应用程序已调用正常关闭。在 删除 RRef 会导致未定义的行为。仅限 RRef 实现 提供尽力而为的错误检测,应用程序不应在 之后使用 。UserRRefUserRRefsrpc.shutdown()

警告

RRefs 只能由 RPC 模块进行序列化和反序列化。 在没有 RPC 的情况下序列化和反序列化 RRef(例如 Python) 泡菜, 火把 / , JIT / 等)将 导致错误。

参数
  • - valueobject) - 此 RRef 要包装的值。

  • type_hintTypeoptional) – 应作为 的类型提示传递给编译器的 Python 类型。TorchScriptvalue

例::

以下示例跳过 RPC 初始化和关闭代码 为了简单。有关这些详细信息,请参阅 RPC 文档。

  1. 使用 rpc.remote 创建 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 从本地对象创建 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 与其他 worker 共享 RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backwardself torch._C._distributed_rpc.PyRRefdist_autograd_ctx_idint = -1retain_graph:bool = False 没有

使用 RRef 作为 向后传递。如果提供,则 我们使用提供的 ctx_id从 RRef.在这种情况下,应该是 用于检索梯度。如果是 ,则假定这是一个本地 autograd 图 我们只执行本地向后传递。在本地案例中, 调用该接口的节点必须是 RRef 的 owner。 RRef 的值应为标量 Tensor。dist_autograd_ctx_iddist_autograd_ctx_idNone

参数
  • dist_autograd_ctx_idintoptional) – 分布式 autograd 上下文 ID,我们应该检索其 gradients (默认值: -1)。

  • retain_graphbooloptional) – 如果 ,则图表用于 compute 的 grad 将被释放。请注意,在几乎所有 不需要将此选项设置为 case 的情况,并且 通常可以以更有效的方式解决。 通常,您需要将此项设置为 run backward 多次(默认值:False)。FalseTrueTrue

例::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner自我 torch._C._distributed_rpc.PyRRef bool

返回所有者是否已确认这一点。 always 返回 true,而只返回 当所有者知道此 时,返回 true 。RRefOwnerRRefUserRRefUserRRef

is_owner自我torch._C._distributed_rpc.PyRRef bool

返回当前节点是否是此节点的所有者。RRef

local_value自我torch._C._distributed_rpc.PyRRef 对象

如果当前节点是所有者,则返回对 local 值。否则,将引发异常。

ownerself torch._C._distributed_rpc.PyRRef torch._C._distributed_rpc 的工人信息

返回拥有此 的节点的工作程序信息 。RRef

owner_name自我torch._C._distributed_rpc.PyRRef str

返回拥有此 的节点的工作程序名称 。RRef

remoteself torch._C._distributed_rpc.PyRRef超时float = -1.0对象

创建帮助程序代理以轻松启动使用 作为运行函数的目标的 RRef 的所有者 此 RRef 引用的对象。更具体地说,与 以下内容:remoterref.remote().func_name(*args, **kwargs)

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeoutfloatoptional) – 的超时。如果 此 API 的创建未在超时内成功完成,则 下次尝试使用 RRef 时 (例如 ),将引发超时。如果不是 ,则将使用默认的 RPC 超时。请参阅 有关的特定超时语义。rref.remote()RRefto_hererpc.remote()RRef

例::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async自我 torch._C._distributed_rpc.PyRRef超时float = -1.0对象

创建帮助程序代理以轻松启动 using 作为运行函数的目标的 RRef 的所有者 此 RRef 引用的对象。更具体地说,与 以下内容:rpc_asyncrref.rpc_async().func_name(*args, **kwargs)

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeoutfloatoptional) – 的超时。 如果调用未在此时间范围内完成,则 异常,指示 so 将被引发。如果此参数 ,则将使用默认的 RPC 超时。rref.rpc_async()

例::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync自我 torch._C._distributed_rpc.PyRRef超时float = -1.0对象

创建帮助程序代理以轻松启动 using 作为运行函数的目标的 RRef 的所有者 此 RRef 引用的对象。更具体地说,与 以下内容:rpc_syncrref.rpc_sync().func_name(*args, **kwargs)

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeoutfloatoptional) – 的超时。 如果调用未在此时间范围内完成,则 异常,指示 so 将被引发。如果此参数 ,则将使用默认的 RPC 超时。rref.rpc_sync()

例::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here自我 torch._C._distributed_rpc.PyRRef超时float = -1.0对象

从所有者复制 RRef 值的阻塞调用 添加到本地节点并返回它。如果当前节点是 owner 返回对 local 值的引用。

参数

timeoutfloatoptional) – 的超时。如果 调用未在此时间范围内完成,则 异常,指示 so 将被引发。如果此 参数,则默认 RPC 超时 (60 秒)。to_here

远程模块

警告

使用 CUDA 张量时,当前不支持 RemoteModule

RemoteModule是创建 NN 的一种简单方法。模块远程连接到不同的 过程。实际模块驻留在远程主机上,但本地主机具有 handle 并调用此模块,类似于常规 NN.模块。 但是,该调用会引发对远程端的 RPC 调用,并且可以执行 如果需要,通过 RemoteModule 支持的其他 API 进行异步。

torch.distributed.nn.api.remote_module 类RemoteModule*args**kwargs[来源]

RemoteModule 实例只能在 RPC 初始化后创建。

它会在指定的远程节点上创建用户指定的模块。 它的行为类似于常规,只是该方法为 在远程节点上执行。 它负责 autograd 录制以确保向后传递传播 gradients 返回相应的 remote 模块。nn.Moduleforward

它生成两个方法,并基于 方法 的签名。 异步运行并返回一个 Future。和 的参数与模块的方法相同 由 返回 .forward_asyncforwardforwardmodule_clsforward_asyncforward_asyncforwardforwardmodule_cls

例如,如果返回 的实例 , 具有方法签名: , 生成的将有 2 个带有签名的方法:module_clsnn.Linearforwarddef forward(input: Tensor) -> Tensor:RemoteModule

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
参数
  • remote_devicestr) – 目标工作程序上要放置此模块的设备。 格式应为 “<workername>/<device>”,其中 device 字段可以解析为 torch.device 类型。 例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。 此外,device 字段可以是可选的,默认值为 “cpu”。

  • module_clsnn.模块) –

    Class 来远程创建模块。例如

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • argsSequenceoptional) – 要传递给的 args 。module_cls

  • kwargsDictoptional) – 要传递给 的 kwargs 。module_cls

返回

一个远程模块实例,它包装了由 user-provided 的 API 中,它有一个 blocking 方法和一个 asynchronous 方法,该方法返回调用的 future 在远程端用户提供的模块上。Modulemodule_clsforwardforward_asyncforward

例::

在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,还有一个与 DistributedDataParallel (DDP) 结合使用的更实际的示例 可以在本教程中找到。

get_module_rref()

返回一个指向 remote 模块的 ()。RRefRRef[nn.Module]

返回类型

RRef[模块]

remote_parameters递归=)

返回指向 remote 模块参数的列表。RRef

这通常可以结合使用 与 .

参数

recursebool) – 如果为 True,则返回远程的参数 module 和远程模块的所有子模块。否则 仅返回作为 remote 模块。

返回

的列表 () 添加到 remote 模块的参数中。RRefList[RRef[nn.Parameter]]

返回类型

List[RRef[参数]]

分布式 Autograd 框架

警告

使用 CUDA 张量时,目前不支持分布式 autograd

该模块提供了一个基于 RPC 的分布式 autograd 框架,该框架可以是 用于模型并行训练等应用程序。简而言之,应用程序 可以通过 RPC 发送和接收梯度记录张量。在前向传递中, 我们记录了何时通过 RPC 发送梯度记录张量,以及在 backward pass 我们使用此信息来执行分布式 backward pass 使用 RPC。有关更多详细信息,请参阅 分布式 Autograd 设计

torch.distributed.autograd 的backwardcontext_id introots List[Tensor]retain_graph=False 没有

使用提供的根启动分布式向后传递。这 当前实现了 FAST 模式算法,该算法 假设所有 RPC 消息都在同一分布式 autograd 上下文中发送 在向后传递期间,across workers 将成为 autograd graph 的一部分。

我们使用提供的根来发现 autograd 图并计算 适当的依赖项。此方法会阻塞,直到整个 autograd 计算完成。

我们在每个节点上适当地累积梯度。autograd 在调用时,根据传入的 that 查找要使用的上下文。如果没有有效的 autograd 上下文,我们会抛出一个错误。您可以 使用 API 检索累积的梯度。context_id

参数
  • context_idint) – 我们应该检索其梯度的 autograd 上下文 ID。

  • rootslist) – 表示 autograd 根的张量 计算。所有张量都应该是标量。

  • retain_graphbooloptional) – 如果为 False,则为用于计算 grad 的图形 将被释放。请注意,在几乎所有情况下,将此 选项设置为 True 不是必需的,通常可以解决 以更有效的方式。通常,您需要设置此 设置为 True 可多次向后运行。

例::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
torch.distributed.autograd 中。背景[来源]

Context 对象来包装前向和后向传递 分布式 autograd 的语句中生成的用于唯一标识分布式向后传递 在所有 worker 上。每个 worker 都存储与 this 关联的元数据,这是正确执行分布式 autograd 通行证。context_idwithcontext_id

例::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd 的get_gradientscontext_id: int Dict[张量 张量]

从 Tensor 检索到该 Tensor 的适当梯度的映射 在提供的上下文中累积,作为分布式 autograd 向后传递的一部分。context_id

参数

context_idint) – 我们应该检索其 梯度。

返回

一个映射,其中 key 是 Tensor,值是关联的梯度 对于该 Tensor。

例::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分布式优化器

有关分布式优化器的文档,请参阅 torch.distributed.optim 页面。

设计说明

分布式 autograd 设计说明涵盖了基于 RPC 的分布式 autograd 框架的设计,该框架对于模型并行训练等应用程序非常有用。

RRef 设计说明涵盖了 RRef (Remote REREFERENCE) 协议的设计,该协议用于引用框架在远程工作人员上的值。

教程

RPC 教程向用户介绍了 RPC 框架,提供了几个示例应用程序 使用 torch.distributed.rpc API,并演示如何 以使用 Profiler 分析基于 RPC 的工作负载。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源