分布式 RPC 框架入门¶
创建时间: 2020年1月1日 |上次更新时间: 2023 年 1 月 9 日 |上次验证: Nov 05, 2024
作者: 沈丽
注意
在 github 中查看和编辑本教程。
先决条件:
本教程使用两个简单的示例来演示如何构建分布式 使用 torch.distributed.rpc 包进行训练,该包最初在 PyTorch v1.4 中作为实验性功能引入。 这两个示例的源代码可以在 PyTorch 示例中找到。
之前的教程 分布式数据并行入门 和 使用 PyTorch 编写分布式应用程序 描述了 DistributedDataParallel,它支持特定的训练范例,其中模型在 多个进程,每个进程处理输入数据的拆分。 有时,您可能会遇到需要不同培训的场景 范式。例如:
在强化学习中,获取可能相对昂贵 来自环境的训练数据,而模型本身可能非常小。在 在这种情况下,生成并行运行的多个观察者可能很有用 并共享单个代理。在这种情况下,代理负责培训 本地,但应用程序仍然需要库来发送和接收 观察者和 Trainer 之间的数据。
您的模型可能太大,无法放入单台计算机上的 GPU 中,因此 需要一个库来帮助将模型拆分到多台机器上。或者您 可能正在实现 Parameter Server 训练框架,其中模型参数和训练器位于不同的 机器。
torch.distributed.rpc 包 可以帮助解决上述情况。在情况 1 中,RPC 和 RRef 允许发送数据 从一个工作程序到另一个工作程序,同时轻松引用远程数据对象。在 案例 2 ,分布式 Autograd 和分布式优化器使执行 backward pass 和 optimizer 步骤就像本地训练一样。在 在接下来的两节中,我们将使用 强化学习示例和语言模型示例。请注意,此 教程的目标不是构建最准确或最有效的模型 解决给定的问题,相反,这里的主要目标是展示如何使用 torch.distributed.rpc 包来 构建分布式训练应用程序。
使用 RPC 和 RRef 的分布式强化学习¶
本节介绍构建玩具分布式强化学习的步骤
模型使用 RPC 解决来自 OpenAI Gym 的 CartPole-v1。
策略代码主要借鉴了现有的单线程示例,如下所示。我们将跳过设计的细节,专注于 RPC
用法。Policy
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
我们准备介绍观察者。在此示例中,每个观察者都会创建其
自己的环境,并等待代理的命令运行剧集。在每个
事件中,一个观察者在大多数迭代中循环,并且在每个迭代中
迭代中,它使用 RPC 将其环境状态传递给代理,并获取一个
action back 操作。然后,它将该操作应用于其环境,并获得奖励
以及 environment 中的 next state。之后,观察者使用另一个
RPC 向 Agent 报告奖励。同样,请注意,这是
显然不是最有效的 observer 实现。例如,一个
简单的优化可以是将当前状态和最后奖励打包在一个 RPC 中,以
减少通信开销。但是,目标是演示 RPC API
而不是为 CartPole 构建最佳求解器。那么,让我们保持逻辑
simple 和此示例中的 explicit 两个步骤。n_steps
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, type=int, metavar='W',
help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref):
state, ep_reward = self.env.reset(), 0
for _ in range(10000):
# send the state to the agent to get an action
action = agent_rref.rpc_sync().select_action(self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
agent_rref.rpc_sync().report_reward(self.id, reward)
# finishes after the number of self.env._max_episode_steps
if done:
break
agent 的代码稍微复杂一些,我们会将其分解为多个
件。在此示例中,代理既是 trainer 又是 master。
这样它就会向多个分布式观察者发送命令来运行剧集,
它还在本地记录所有操作和奖励,这些操作和奖励将在
每集后的训练阶段。下面的代码显示了大多数行初始化各种组件的构造函数。位于
end 在其他 worker 上远程初始化 observers,并保持
那些当地的观察员。代理稍后将使用这些观察者来
send 命令。应用程序无需担心 .
每个 Bean 的所有者都维护一个引用计数映射来跟踪其
生命周期,并保证只要
该 .有关详细信息,请参阅设计文档。Agent
RRefs
RRefs
RRefs
RRef
RRef
RRef
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下来,代理向观察者公开两个 API,用于选择操作,以及 报告 奖励。这些函数仅在代理上本地运行,但会 由观察者通过 RPC 触发。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
让我们在 agent 上添加一个函数,告诉所有观察者
执行剧集。在此函数中,它首先创建一个要收集的列表
futures 的 Futures,然后遍历所有 observer 到
制作异步 RPC。在这些 RPC 中,代理还会传递
自身传递给观察者,以便观察者可以将代理上的函数作为
井。如上所示,每个观察者都会将 RPC 返回给代理,这些 RPC 是
嵌套的 RPC。每集结束后,和 将
包含录制的操作 probs 和 rewards。run_episode
RRefs
RRef
saved_log_probs
rewards
class Agent:
...
def run_episode(self):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
ob_rref.rpc_sync().run_episode,
args=(self.agent_rref,)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最后,在一次发作后,代理需要训练模型,该模型
在下面的函数中实现。中没有 RPC
这个函数,它主要是从单线程示例中借来的。
因此,我们跳过描述其内容。finish_episode
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
有了 , , 和 类,我们就可以开始了
多个进程来执行分布式训练。在此示例中,所有
进程运行相同的函数,它们使用 Rank 来
区分他们的角色。等级 0 始终是代理,所有其他等级都是
观察员。代理通过反复调用担任 master ,直到运行奖励超过奖励阈值
由环境指定。所有观察者被动等待命令
从代理。代码由 rpc.init_rpc 和 rpc.shutdown 包装,
它分别初始化和终止 RPC 实例。更多详情如下
在 API 页面中可用。Policy
Observer
Agent
run_worker
run_episode
finish_episode
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
print(f"This will run until reward threshold of {agent.reward_threshold}"
" is reached. Ctrl+C to exit.")
for i_episode in count(1):
agent.run_episode()
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
f"{agent.running_reward:.2f}")
if agent.running_reward > agent.reward_threshold:
print(f"Solved! Running reward is now {agent.running_reward}!")
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是使用 world_size=2 进行训练时的一些示例输出。
This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在此示例中,我们将演示如何使用 RPC 作为通信工具来传递
数据,以及如何使用 RRef 引用远程对象。这是真的
您可以直接在 API 上构建整个结构,或者使用其他通信/RPC 库。然而
通过使用 torch.distributed.rpc,您可以获得原生支持和
在后台持续优化性能。ProcessGroup
send
recv
接下来,我们将展示如何将 RPC 和 RRef 与分布式 autograd 和 分布式优化器来执行分布式模型并行训练。
使用 Distributed Autograd 和 Distributed Optimizer 的分布式 RNN¶
在本节中,我们使用 RNN 模型来展示如何构建分布式模型 使用 RPC API 进行并行训练。示例 RNN 模型非常小,并且 可以很容易地放入单个 GPU 中,但我们仍然将其层划分为两个 不同的工人来演示这个想法。开发者可以应用类似的 在多个设备上分发更大模型的技术,以及 机器。
RNN 模型设计借鉴了 PyTorch 示例仓库中的 word 语言模型,其中包含三个主要组件:嵌入表、层和解码器。下面的代码包装了 embedding 表和
decoder 转换为子模块,以便它们的构造函数可以传递给 RPC
应用程序接口。在子模块中,我们特意将 GPU 层放在 GPU 上以覆盖用例。在 v1.4 中,RPC 始终创建
目标工作程序上的 CPU 张量参数或返回值。如果函数
获取 GPU 张量,则需要将其显式移动到适当的设备。LSTM
EmbeddingTable
Embedding
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
通过上述子模块,我们现在可以使用 RPC 将它们拼凑在一起,以
创建 RNN 模型。在下面的代码中,表示一个参数服务器
它承载 embedding table 和 decoder 的参数。构造函数
使用远程 API 创建一个对象,并在
parameter server 并在本地创建子模块。在
forward pass 的 Trainer 使用 来查找
remote 子模块中,并将输入数据传递给 using RPC
并获取查找结果。然后,它通过本地层运行 embedding,最后使用另一个 RPC 将输出发送到子模块。一般来说,要实现分布式模型并行
训练时,开发者可以将模型划分为多个子模块,调用 RPC 来创建
sub-module 实例,并在必要时使用 on 来查找它们。
正如您在下面的代码中看到的,它看起来与单机模型非常相似
并行训练。主要区别在于替换为
RPC 函数。ps
EmbeddingTable
Decoder
LSTM
EmbeddingTable
RRef
EmbeddingTable
LSTM
Decoder
RRef
Tensor.to(device)
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介绍分布式优化器之前,让我们添加一个辅助函数
生成模型参数的 RRef 列表,该列表将被
分布式优化器。在本地训练中,应用程序可以调用 grab 对所有参数张量的引用,并将其传递
添加到本地优化器以进行后续更新。但是,相同的 API 不会
在分布式训练场景中工作,因为某些参数位于远程
机器。因此,不是采用参数列表,
分布式优化器采用一个列表 ,每个模型一个
parameter 获取本地和远程模型参数。辅助函数是
很简单,只需调用 并创建一个本地 on
每个参数。Module.parameters()
Tensors
RRefs
RRef
Module.parameters()
RRef
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然后,由于 包含 三个子模块,我们需要调用 3 次,并将其包装到另一个辅助函数中。RNNModel
_parameter_rrefs
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
现在,我们已准备好实现训练循环。初始化模型后
参数,我们创建 和 .这
分布式优化器会取一个 list of parameter ,找到所有 distinct
owner worker,并创建给定的本地优化器(即,在本例中为
您也可以使用其他本地优化器)在每个 owner worker 上使用
给定的参数(即 )。RNNModel
DistributedOptimizer
RRefs
SGD
lr=0.05
在训练循环中,它首先创建一个分布式 autograd 上下文,该上下文
将帮助分布式 autograd 引擎找到渐变和涉及的 RPC
send/recv 函数。分布式 autograd 引擎的设计细节可以
可以在其设计说明中找到。
然后,它开始向前传球,就好像它是本地的一样
model 的 SET 和 SET SET 的 SUB对于分布式向后,您
只需要指定一个 root 列表,这种情况下就是 loss 了。
分布式 autograd 引擎将遍历分布式图
并正确写入渐变。接下来,它在分布式优化器上运行该函数,该优化器将联系所有相关人员
本地优化器来更新模型参数。与本地培训相比,一个
细微的区别是你不需要运行,因为每个
autograd 上下文有专门的空间来存储渐变,当我们创建一个
context 中,来自不同迭代的那些梯度将不会
累积到同一组 。Tensor
step
zero_grad()
Tensors
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
for data, target in get_next_batch():
# create distributed autograd context
with dist_autograd.context() as context_id:
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward(context_id, [loss])
# run distributed optimizer
opt.step(context_id)
# not necessary to zero grads since they are
# accumulated into the distributed autograd context
# which is reset every iteration.
print("Training epoch {}".format(epoch))
最后,让我们添加一些 glue 代码来启动 parameter server 和 trainer 过程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)