分布式RPC框架¶
分布式 RPC 框架通过一组基本操作提供了多机模型训练的机制,以实现远程通信,并提供一个高级 API 来自动区分分布在多个机器上的模型。
警告
RPC 包中的 API 是稳定的。目前正在进行多项工作以提升性能和错误处理,这些改进将在未来的版本中发布。
警告
CUDA 支持在 PyTorch 1.9 中引入,目前仍为 测试版 功能。RPC 包的并非所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。不支持的功能包括:RRefs、JIT 兼容性、dist autograd 和 dist optimizer,以及性能分析。这些问题将在未来的版本中得到解决。
注意
请参考 PyTorch 分布式概述,了解所有与分布式训练相关的功能的简要介绍。
基础¶
分布式 RPC 框架使得远程运行函数变得简单,支持在不复制真实数据的情况下引用远程对象,并提供了 autograd 和优化器 API,以透明地在 RPC 边界之间运行反向传播和更新参数。这些功能可以分为四组 API。
远程过程调用(RPC) 支持在指定的目标工作器上运行函数,并使用给定的参数获取返回值 或创建对返回值的引用。主要有三种 RPC API:
rpc_sync()(同步),rpc_async()(异步),以及remote()(异步并返回对远程返回值的引用)。如果用户代码无法在没有返回值的情况下继续执行,请使用同步 API。否则,使用异步 API 获取一个 future,在需要返回值时再等待这个 future。当需求是远程创建某些内容但永远不需要将其取回调用者时,remote()API 就很有用。想象一下,一个驱动进程正在设置一个参数服务器和一个训练器。驱动可以在参数服务器上创建一个嵌入表,然后将该嵌入表的引用与训练器共享,但自己永远不会在本地使用该嵌入表。在这种情况下,rpc_sync()和rpc_async()就不再合适了,因为它们总是意味着返回值会立即或在未来返回给调用者。远程引用(RRef) 是一个分布式共享指针,指向本地或远程对象。它可以与其他工作进程共享,并且引用计数将被透明地处理。每个 RRef 只有一个所有者,而对象只存在于该所有者上。非所有者的工作进程可以通过显式请求从所有者那里获取对象的副本。当某个工作进程需要访问某些数据对象,但本身既不是创建者(
remote()的调用者)也不是该对象的所有者时,这种情况非常有用。正如我们将在下文讨论的那样,分布式优化器就是这种使用场景的一个例子。分布式自动求导 将正向传播中所有工作器上的本地自动求导引擎连接在一起,并在反向传播过程中自动联系它们以计算梯度。如果正向传播需要跨越多台机器进行(例如,分布式模型并行训练、参数服务器训练等),这一功能特别有用。有了这个特性,用户代码不再需要担心如何跨 RPC 边界发送梯度以及应该按什么顺序启动本地自动求导引擎的问题,这在正向传播中存在嵌套和相互依赖的 RPC 调用时可能会变得相当复杂。
分布式优化器的构造函数需要一个
Optimizer()(例如,SGD(),Adagrad()等)和一个参数RRefs列表,在每个不同的RRef所有者上创建一个Optimizer()实例,并在运行step()时相应地更新参数。当你有 分布式前向和反向传播时,参数和梯度将分布在多个工作节点上,因此需要在每个涉及的工作节点上都有一个优化器。分布式优化器将所有这些本地 优化器封装成一个整体,并提供简洁的构造函数和step()API。
RPC¶
在使用RPC和分布式自动梯度原语之前,必须进行初始化。要初始化RPC框架,我们需要使用
init_rpc(),这将初始化RPC框架、RRef框架和分布式自动梯度。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]¶
初始化 RPC 原语,例如本地 RPC 代理和分布式自动梯度计算,这会立即使当前进程准备好发送和接收 RPC。
- Parameters
name (str) – 此节点的全局唯一名称。(例如,
Trainer3,ParameterServer2,Master,Worker1) 名称只能包含数字、字母、下划线、冒号和/或连字符,并且必须少于 128 个字符。后端 (BackendType, 可选) – RPC 后端实现的类型 支持的值为
BackendType.TENSORPIPE(默认值)。 有关更多信息,请参见 Backends。rank (int) – 该节点的全局唯一ID/排名。
world_size (int) – 组中工作的数量。
rpc_backend_options (RpcBackendOptions, 可选) – 传递给 RpcAgent 构造函数的选项。它必须是
RpcBackendOptions的特定于代理的子类,并包含特定于代理的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用使用init_method = "env://"初始化的基础进程组进行会合,这意味着需要正确设置环境变量MASTER_ADDR和MASTER_PORT。有关更多信息,请参阅 后端 并查找可用的选项。
以下API允许用户远程执行函数以及创建对远程数据对象的引用(RRefs)。在这些API中,当传递一个
Tensor 作为参数或返回值时,目标工作器将尝试创建一个具有相同元数据(即形状、步幅等)的 Tensor。我们故意禁止传输CUDA张量,因为如果源和目标工作器上的设备列表不匹配,可能会导致崩溃。在这种情况下,应用程序始终可以在调用方显式地将输入张量移动到CPU,并在被调用方需要时将其移动到所需的设备上。
警告
RPC中的TorchScript支持是一个原型功能,可能会有所更改。从v1.5.0开始,torch.distributed.rpc 支持将TorchScript函数作为RPC目标函数调用,并且这将有助于提高被调用方的并行性,因为执行TorchScript函数不需要GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
在工作器
to上执行函数func的阻塞式远程过程调用(RPC)。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。- Parameters
to (str 或 WorkerInfo 或 int) – 目标工作进程的名称/等级/
WorkerInfo。func (Callable) – 一个可调用函数,例如 Python 可调用对象、内置操作符(如
add())和带注释的 TorchScript 函数。args (tuple) – 用于
func调用的参数元组。kwargs (dict) – 是一个字典,包含用于
func调用的关键字参数。timeout (float, 可选) – 此RPC的超时时间(以秒为单位)。如果RPC在此时间内未完成,将抛出一个表示已超时的异常。值为0表示无限超时,即永远不会抛出超时错误。如果没有提供,则使用初始化时设置的默认值或通过
_set_rpc_timeout设置的值。
- Returns
返回运行
func与args和kwargs的结果。
- Example::
请确保两个工作节点上的
MASTER_ADDR和MASTER_PORT设置正确 。有关更多详细信息,请参考init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
进行一次非阻塞的RPC调用,在工作器
to上运行函数func。RPC消息与Python代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个Future,可以对其进行等待。- Parameters
to (str 或 WorkerInfo 或 int) – 目标工作进程的名称/等级/
WorkerInfo。func (Callable) – 一个可调用函数,例如 Python 可调用对象、内置操作符(如
add())和带注释的 TorchScript 函数。args (tuple) – 用于
func调用的参数元组。kwargs (dict) – 是一个字典,包含用于
func调用的关键字参数。timeout (float, 可选) – 此RPC的超时时间(以秒为单位)。如果RPC在此时间内未完成,将抛出一个表示已超时的异常。值为0表示无限超时,即永远不会抛出超时错误。如果没有提供,则使用初始化时设置的默认值或通过
_set_rpc_timeout设置的值。
- Returns
返回一个可以等待的
Future对象。完成后,可以从Future对象中检索func和kwargs的返回值。
警告
将GPU张量用作
func的参数或返回值是不支持的,因为我们不支持通过网络传输GPU张量。在将其用作func的参数或返回值之前,您需要显式地将GPU张量复制到CPU上。警告
The
rpc_asyncAPI 在将参数张量通过网络传输之前不会复制它们的存储,这可能由一个不同的线程根据 RPC 后端类型来完成。调用者应确保这些张量的内容在返回的Future完成之前保持不变。- Example::
请确保两个工作节点上的
MASTER_ADDR和MASTER_PORT设置正确 。有关更多详细信息,请参考init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
通过远程调用在工作器
to上运行func并立即返回一个RRef作为结果值。 工作器to将成为返回的RRef的所有者,而调用remote的工作器是 用户。所有者管理其RRef的全局引用计数,并且只有当全局范围内 没有对其的活动引用时,所有者RRef才会被销毁。- Parameters
to (str 或 WorkerInfo 或 int) – 目标工作进程的名称/等级/
WorkerInfo。func (Callable) – 一个可调用函数,例如 Python 可调用对象、内置操作符(如
add())和带注释的 TorchScript 函数。args (tuple) – 用于
func调用的参数元组。kwargs (dict) – 是一个字典,包含用于
func调用的关键字参数。超时时间 (浮点数, 可选) – 此远程调用的超时时间(以秒为单位)。如果在指定时间内未能成功处理此
RRef在工作节点to上的创建,则下次尝试使用 RRef 时(例如to_here()),将抛出超时错误,指示此失败。值为 0 表示无限超时,即永远不会抛出超时错误。如果没有提供,默认值将使用初始化时设置的值或通过_set_rpc_timeout设置的值。
- Returns
用户
RRef实例到结果值。使用阻塞 APItorch.distributed.rpc.RRef.to_here()在本地检索结果值。
警告
remoteAPI 在将参数张量通过网络传输之前不会复制它们的存储,这一步可能由一个不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在所有者确认返回的 RRef 之前保持不变,可以通过使用torch.distributed.rpc.RRef.confirmed_by_owner()API 来检查。警告
诸如
remoteAPI的超时等错误将尽最大努力进行处理。这意味着当由remote发起的远程调用失败(例如出现超时错误)时,我们将以尽最大努力的方式处理错误。这意味着错误将在异步基础上进行处理,并设置在结果RRef上。如果在该处理之前应用程序尚未使用RRef(如to_here或fork调用),那么对RRef的后续使用将适当引发错误。然而,也有可能用户应用程序会在错误被处理之前就使用了RRef。在这种情况下,由于错误尚未被处理,可能不会引发错误。Example:
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]¶
获取给定工作器名称的
WorkerInfo. 使用此WorkerInfo以避免在每次调用时传递 昂贵的字符串。- Parameters
worker_name (str) – 工作者的字符串名称。如果为
None,则返回 当前工作者的 ID。(默认值None)- Returns
WorkerInfo实例用于给定的worker_name或WorkerInfo的 当前工作进程,如果worker_name是None。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]¶
关闭RPC代理,然后销毁RPC代理。这会阻止本地代理接受未完成的请求,并通过终止所有RPC线程来关闭RPC框架。如果
graceful=True,这将阻塞直到所有本地和远程RPC进程达到此方法并等待所有未完成的工作完成。否则,如果graceful=False,这是一个本地关闭,不会等待其他RPC进程到达此方法。警告
对于
Future由rpc_async()返回的对象,future.wait()不应在shutdown()之后调用。- Parameters
graceful (bool) – 是否执行优雅关闭。如果为 True, 这将 1) 等待
UserRRefs没有挂起的系统消息并删除它们;2) 阻塞 直到所有本地和远程 RPC 进程都到达此方法,并等待所有未完成的工作 完成。
- Example::
请确保两个工作节点上的
MASTER_ADDR和MASTER_PORT设置正确 。有关更多详细信息,请参考init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo¶
一个封装系统中工作者信息的结构。 包含工作者的名称和ID。此类不打算直接构造,而是可以通过
get_worker_info()获取实例,并将结果传入诸如rpc_sync(),rpc_async(),remote()等函数以避免每次调用时复制字符串。- property id¶
用于标识工作进程的全局唯一 ID。
- property name¶
工作者的名称。
RPC 包还提供了装饰器,允许应用程序指定在被调用方一侧如何处理给定的函数。
- torch.distributed.rpc.functions.async_execution(fn)[source]¶
一个函数的装饰器,表示该函数的返回值保证是一个
Future对象,并且此函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方会提取由包装函数返回的Future,并将后续处理步骤作为回调安装到该Future上。安装的回调将在Future完成时读取其值,并将该值作为 RPC 响应发送回去。这也意味着返回的Future仅存在于被调用方一侧,不会通过 RPC 发送。当包装函数(fn)的执行需要暂停和恢复时(例如包含rpc_async()或等待其他信号),这个装饰器非常有用。注意
为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给RPC API。如果RPC检测到此装饰器安装的属性,它就知道该函数返回一个
Future对象,并会相应处理。然而,这并不意味着在定义函数时此装饰器必须是最外层的一个。例如,当与@staticmethod或@classmethod结合使用时,@rpc.functions.async_execution需要作为内部装饰器,以便目标函数被识别为静态或类函数。由于访问静态或类方法时保留了由@rpc.functions.async_execution安装的属性,因此该目标函数仍然可以异步执行。- Example::
返回的
Future对象可以来自rpc_async(),then(), 或Future构造函数。下面的例子展示了直接使用由Future返回的then()。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
与 TorchScript 装饰器结合使用时,此装饰器必须位于最外层。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
当与静态方法或类方法结合使用时,此装饰器必须位于内部。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此装饰器也适用于RRef辅助函数,即 .
torch.distributed.rpc.RRef.rpc_sync(),torch.distributed.rpc.RRef.rpc_async(), 和torch.distributed.rpc.RRef.remote()。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
后端¶
RPC 模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在
init_rpc() 函数中指定,通过传递
BackendType 枚举的某个值。无论使用哪种后端,其余的 RPC API 都不会改变。每个后端还定义了
RpcBackendOptions 类的一个子类,该类的一个实例也可以传递给
init_rpc() 来配置后端的行为。
- class torch.distributed.rpc.BackendType(value)¶
一个可用后端的枚举类。
PyTorch 自带一个内置的
BackendType.TENSORPIPE后端。 可以使用register_backend()函数注册其他后端。
- class torch.distributed.rpc.RpcBackendOptions¶
一个抽象结构,封装传递到RPC后端的选项。可以将此类的一个实例传入
init_rpc()以使用特定配置初始化RPC,例如RPC超时和要使用的init_method。- property init_method¶
指定如何初始化进程组的URL。 默认值为
env://
- property rpc_timeout¶
一个浮点数,表示用于所有 RPC 的超时时间。如果某个 RPC 在此时间段内未完成,它将通过引发异常来表明已超时。
TensorPipe 后端¶
TensorPipe代理是默认的,它利用了TensorPipe库,该库提供了一种原生的点对点通信原语,特别适用于机器学习,从根本上解决了Gloo的一些局限性。与Gloo相比,它具有异步的优势,允许大量传输同时进行,每个传输以自己的速度进行,而不会相互阻塞。它仅在需要时打开节点之间的管道,并且当一个节点失败时,只有其相关的管道会被关闭,而其他管道将继续正常工作。此外,它能够支持多种不同的传输方式(当然包括TCP,还包括共享内存、NVLink、InfiniBand等),并且可以自动检测它们的可用性并协商每条管道的最佳传输方式。
TensorPipe 后端在 PyTorch v1.6 中引入,并正在积极开发中。目前,它仅支持 CPU 张量,GPU 支持即将推出。它提供基于 TCP 的传输方式,与 Gloo 类似。此外,它能够自动将大型张量分块并在多个套接字和线程上进行多路复用,以实现非常高的带宽。代理将能够自行选择最佳的传输方式,无需任何人工干预。
Example:
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]¶
The backend options for
TensorPipeAgent, derived fromRpcBackendOptions.- Parameters
num_worker_threads (int, optional) – 用于执行请求的线程池中的线程数
TensorPipeAgent(默认值: 16)。rpc_timeout (float, optional) – 默认的RPC请求超时时间(以秒为单位,默认为60秒)。如果RPC在此时间内未完成,将抛出一个异常。调用者可以在必要时通过
rpc_sync()和rpc_async()覆盖此超时时间。init_method (str, optional) – 用于初始化分布式存储的URL, 该存储用于会合。它接受与
init_process_group()的相同参数接受的任何值 (默认:env://)。device_maps (Dict[str, Dict], optional) – 从此工作器到被调用者的设备分配映射。键是被调用者的工作器名称,值是将此工作器的设备映射到被调用者工作器设备的字典 (
Dictofint,str, 或torch.device)。 (default:None)设备 (List[int, str, 或
torch.device], 可选) – RPC代理使用的所有本地CUDA设备。默认情况下,它将被初始化为来自其自身的device_maps和来自对等方的相应设备device_maps的所有本地设备。在处理CUDA RPC请求时,代理会为该List中的所有设备正确同步CUDA流。
- property device_maps¶
设备映射位置。
- property devices¶
本地代理使用的所有设备。
- property init_method¶
指定如何初始化进程组的URL。 默认值为
env://
- property num_worker_threads¶
由
TensorPipeAgent使用的线程池中的线程数,用于执行请求。
- property rpc_timeout¶
一个浮点数,表示用于所有 RPC 的超时时间。如果某个 RPC 在此时间段内未完成,它将通过引发异常来表明已超时。
- set_device_map(to, device_map)[source]¶
设置每个RPC调用者和被调用者对之间的设备映射。该函数可以多次调用,以逐步添加设备放置配置。
- Parameters
to (str) – 被调用者的名称。
device_map (Dict of int, str or torch.device) – 从该工作器到被调用者的设备放置映射。此映射必须是可逆的。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
- set_devices(devices)[source]¶
设置 TensorPipe RPC 代理使用的本地设备。在处理 CUDA RPC 请求时,TensorPipe RPC 代理将为该
List中的所有设备正确同步 CUDA 流。- Parameters
设备 (List of int, str or torch.device) – 由 TensorPipe RPC 代理使用的本地设备。
注意
RPC 框架不会自动重试任何
rpc_sync()、
rpc_async() 和
remote() 调用。原因是 RPC 框架无法判断某个操作是否是幂等的,以及重试是否安全。因此,处理失败并根据需要重试的责任在于应用程序。RPC 通信基于 TCP,因此由于网络故障或间歇性网络连接问题可能会发生故障。在这种情况下,应用程序需要适当重试,并采用合理的退避策略,以避免因频繁重试而使网络过载。
RRef¶
警告
在使用 CUDA 张量时,目前不支持 RRefs
An RRef (Remote REFerence) 是对某个类型 T
(例如 Tensor)在远程工作器上的值的引用。此句柄可确保所引用的远程
值在所有者上保持活动状态,但并不意味着该值将来会传输到本地工作器。RRefs 可用于
多机训练中,通过持有对其他工作器上存在的 nn.Modules 的引用,并在训练期间调用适当的函数来检索或修改其
参数。有关更多详细信息,请参阅 Remote Reference Protocol。
- class torch.distributed.rpc.PyRRef(RRef)¶
一个封装了对远程工作器上某个类型值的引用的类。这个句柄将保持远程值在工作器上的存活状态。当1) 应用程序代码和本地 RRef 上下文中都没有对该
UserRRef的引用时,或者 2) 应用程序调用了优雅关闭时,UserRRef将被删除。调用已删除的 RRef 上的方法会导致未定义行为。RRef 实现仅提供最佳努力的错误检测,应用程序不应在调用rpc.shutdown()后使用UserRRefs。警告
RRefs 只能通过 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下(例如 Python pickle, torch
save()/load(), JITsave()/load(), 等等)对 RRefs 进行序列化和反序列化将导致错误。- Parameters
value (对象) – 由这个 RRef 包装的值。
type_hint (Type, 可选) – 应该传递给
TorchScript编译器的 Python 类型,作为value的类型提示。
- Example::
以下示例为简便起见,跳过了 RPC 初始化和关闭代码。有关详细信息,请参阅 RPC 文档。
使用 rpc.remote 创建一个 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
从本地对象创建 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
与其他工作进程共享 RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None¶
Runs the backward pass using the RRef as the root of the backward pass. If
dist_autograd_ctx_idis provided, we perform a distributed backward pass using the provided ctx_id starting from the owner of the RRef. In this case,get_gradients()should be used to retrieve the gradients. Ifdist_autograd_ctx_idisNone, it is assumed that this is a local autograd graph and we only perform a local backward pass. In the local case, the node calling this API has to be the owner of the RRef. The value of the RRef is expected to be a scalar Tensor.- Parameters
- Example::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
返回此
RRef是否已被所有者确认。OwnerRRef始终返回true,而UserRRef仅在所有者知道此UserRRef时才返回true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool¶
返回当前节点是否为此
RRef的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object¶
如果当前节点是所有者,则返回对本地值的引用。否则,抛出异常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo¶
返回拥有此
RRef的节点的工作信息。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str¶
返回拥有此
RRef的节点的工作器名称。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
创建一个辅助代理,以便轻松启动一个
remote,使用 RRef的所有者作为目标,在此RRef引用的对象上运行函数。 更具体地说,rref.remote().func_name(*args, **kwargs)与 以下内容相同:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- Parameters
timeout (float, 可选) – 超时时间(以秒为单位)。如果在指定时间内未能成功创建此
RRef,则下次尝试使用该 RRef(例如调用to_here)时将抛出超时异常。若未提供,则使用默认的 RPC 超时设置。有关RRef的具体超时语义,请参阅rpc.remote()。
- Example::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
创建一个辅助代理,以便轻松启动一个
rpc_async,使用 RRef 的所有者作为运行函数的目标,以操作此 RRef 引用的对象。更具体地说,rref.rpc_async().func_name(*args, **kwargs)与以下内容相同:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- Parameters
超时 (float, 可选) –
rref.rpc_async()的超时时间。 如果调用在此时间内未完成,则会引发异常。如果未提供此参数, 则使用默认的RPC超时时间。
- Example::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
创建一个辅助代理,以便轻松启动一个
rpc_sync,使用 RRef 的所有者作为运行函数的目标,以操作此 RRef 引用的对象。更具体地说,rref.rpc_sync().func_name(*args, **kwargs)与以下内容相同:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- Parameters
超时 (float, 可选) –
rref.rpc_sync()的超时时间。 如果调用在此时间内未完成,则会引发异常。如果未提供此参数, 则使用默认的RPC超时时间。
- Example::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object¶
从所有者节点将 RRef 的值复制到本地节点并返回的阻塞调用。如果当前节点是所有者,则返回对本地值的引用。
- Parameters
超时 (float, 可选) – 超时时间
to_here. 如果 调用在此时间内未完成,则会引发一个 指示此情况的异常。如果未提供此 参数,则使用默认的RPC超时时间 (60秒)。
关于 RRef 的更多信息
RemoteModule¶
警告
使用 CUDA 张量时,目前不支持 RemoteModule
RemoteModule 是一种在不同进程上远程创建 nn.Module 的简便方法。实际模块位于远程主机上,但本地主机拥有该模块的句柄,并可以像常规 nn.Module 一样调用此模块。
但是,这种调用会引发对远程端的 RPC 调用,并且如果需要的话,可以通过 RemoteModule 支持的额外 API 异步执行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]¶
A RemoteModule instance can only be created after RPC initialization.
It creates a user-specified module on a specified remote node. It behaves like a regular
nn.Moduleexcept that theforwardmethod is executed on the remote node. It takes care of autograd recording to ensure the backward pass propagates gradients back to the corresponding remote module.It generates two methods
forward_asyncandforwardbased on the signature of theforwardmethod ofmodule_cls.forward_asyncruns asynchronously and returns a Future. The arguments offorward_asyncandforwardare the same as theforwardmethod of the module returned by themodule_cls.For example, if
module_clsreturns an instance ofnn.Linear, that hasforwardmethod signature:def forward(input: Tensor) -> Tensor:, the generatedRemoteModulewill have 2 methods with the signatures:def forward(input: Tensor) -> Tensor:def forward_async(input: Tensor) -> Future[Tensor]:- Parameters
remote_device (str) – 目标工作器上我们希望放置此模块的设备。格式应为“<workername>/<device>”,其中设备字段可以解析为torch.device类型。例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。此外,设备字段可以是可选的,默认值为“cpu”。
module_cls (nn.Module) –
用于远程创建模块的类。例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, 可选) – 要传递给
module_cls的参数。kwargs (Dict, 可选) – 要传递给
module_cls的 kwargs。
- Returns
一个远程模块实例,它封装了用户提供的
Module所创建的内容,它具有阻塞式module_cls方法和一个异步的forward方法,该方法返回在远程侧对用户提供的模块执行forward_async调用的未来结果forward。
- Example::
在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,一个结合了 DistributedDataParallel (DDP) 的更实用的例子可以在本 教程 中找到。
- remote_parameters(recurse=True)¶
返回一个
RRef的列表,指向远程模块的参数。这通常可以与
DistributedOptimizer配合使用。
分布式自动求导框架¶
警告
当前使用 CUDA 张量时不支持分布式自动求导。
此模块提供了一个基于RPC的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用程序可以通过RPC发送和接收梯度记录张量。在前向传播过程中,我们会记录梯度记录张量通过RPC发送的时间;在反向传播过程中,我们利用这些信息使用RPC执行分布式反向传播。更多细节请参见分布式自动微分设计。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None¶
启动分布式反向传播,使用提供的根节点。这目前实现了 快速模式算法,该算法假设在同一分布式自动微分上下文中发送的所有RPC消息在反向传播期间都将是自动微分图的一部分。
我们使用提供的根节点来发现自动求导图并计算适当的依赖关系。此方法会阻塞,直到整个自动求导计算完成。
我们在每个节点上将梯度累积到合适的
torch.distributed.autograd.context。要使用的autograd上下文是根据在调用torch.distributed.autograd.backward()时传入的context_id进行查找的。如果没有与给定ID对应的合法autograd上下文,我们会抛出一个错误。你可以使用get_gradients()API 来检索累积的梯度。- Parameters
- Example::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source]¶
上下文对象,用于在使用分布式自动梯度时包装前向和反向传播。在
context_id语句中生成的with需要用来在所有工作进程中唯一标识一次分布式反向传播。每个工作进程存储与此context_id相关的元数据,这是正确执行分布式自动梯度传播所必需的。- Example::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]¶
检索一个从张量到该张量适当梯度的映射, 在提供的上下文中累积与给定
context_id对应的内容, 作为分布式自动求导反向传播的一部分。- Parameters
context_id (int) – 我们应该检索梯度的自动求导上下文 ID。
- Returns
一个映射,其中键是张量,值是与该张量相关联的梯度。
- Example::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
关于 RPC Autograd 的更多信息
分布式优化器¶
请查看 torch.distributed.optim 页面,了解分布式优化器的文档。
设计说明¶
分布式自动梯度设计说明涵盖了基于 RPC 的分布式自动梯度框架的设计,该框架适用于模型并行训练等应用。
RRef 设计说明涵盖了 RRef(远程引用)协议的设计,该协议用于通过框架引用远程工作器上的值。
教程¶
RPC 教程向用户介绍 RPC 框架,提供几个使用 torch.distributed.rpc API 的示例应用程序,并演示如何使用 性能分析器 对基于 RPC 的工作负载进行性能分析。