使用分布式RPC框架入门¶
创建时间:2020年1月01日 | 最后更新时间:2023年1月09日 | 最后验证时间:2024年11月05日
作者: 李申
注意
查看和编辑此教程在 github。
Prerequisites:
本教程使用两个简单的示例来演示如何使用 torch.distributed.rpc 包构建分布式训练。该包最初作为实验性功能在 PyTorch v1.4 中引入。两个示例的源代码可以在 PyTorch 示例 中找到。
之前的教程, 分布式数据并行入门 和 使用 PyTorch 编写分布式应用程序, 描述了 DistributedDataParallel 它支持一种特定的训练范式,其中模型在多个进程中复制,每个进程处理输入数据的一部分。 有时,您可能会遇到需要不同训练范式的场景。例如:
在强化学习中,从环境中获取训练数据可能会相对昂贵,而模型本身可能相当小。在这种情况下,生成多个并行运行的观察者并共享一个智能体可能是有用的。在这种情况下,智能体负责本地的训练,但应用程序仍然需要库来在观察者和训练器之间发送和接收数据。
您的模型可能太大,无法适应单个机器上的GPU,因此需要一个库来帮助将模型拆分到多个机器上。或者您可能正在实现一个参数服务器训练框架,其中模型参数和训练器位于不同的机器上。
The torch.distributed.rpc package can help with the above scenarios. In case 1, RPC and RRef allow sending data from one worker to another while easily referencing remote data objects. In case 2, distributed autograd and distributed optimizer make executing backward pass and optimizer step as if it is local training. In the next two sections, we will demonstrate APIs of torch.distributed.rpc using a reinforcement learning example and a language model example. Please note, this tutorial does not aim at building the most accurate or efficient models to solve given problems, instead, the main goal here is to show how to use the torch.distributed.rpc package to build distributed training applications.
使用RPC和RRef进行分布式强化学习¶
此部分描述了使用RPC构建一个玩具分布式强化学习模型的步骤,以解决来自OpenAI Gym的CartPole-v1问题。
策略代码大部分借鉴了现有的单线程
示例
,如下所示。我们将跳过Policy设计的细节,并专注于RPC的用法。
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
我们准备展示观察者。在这个例子中,每个观察者都会创建自己的环境,并等待代理的指令来运行一个回合。在每个回合中,一个观察者最多循环 n_steps 次迭代,在每次迭代中,它使用RPC将环境状态传递给代理并获取一个动作。然后它将该动作应用于环境,并从环境中获得奖励和下一个状态。之后,观察者使用另一个RPC向代理报告奖励。请注意,这显然不是最高效的观察者实现方式。例如,一个简单的优化可以是将当前状态和上一次奖励打包到一次RPC中以减少通信开销。然而,我们的目标是演示RPC API,而不是为CartPole构建最佳求解器。因此,在这个例子中,我们保持逻辑简单,并且将这两个步骤明确分开。
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, type=int, metavar='W',
help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref):
state, ep_reward = self.env.reset(), 0
for _ in range(10000):
# send the state to the agent to get an action
action = agent_rref.rpc_sync().select_action(self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
agent_rref.rpc_sync().report_reward(self.id, reward)
# finishes after the number of self.env._max_episode_steps
if done:
break
代理的代码稍微复杂一些,我们将它分成多个部分。在这个例子中,代理既是训练器又是主控,它向多个分布式观察者发送命令来运行事件,并且它也会在每个事件后记录所有的动作和奖励,这些将在训练阶段使用。下面的代码展示了Agent构造函数,其中大多数行都在初始化各种组件。最后的循环在其他工作节点上远程初始化观察者,并在本地持有对这些观察者的RRefs引用。代理将使用这些观察者的RRefs后来发送命令。应用程序不需要担心RRefs的生命周期。每个RRef的所有者维护一个引用计数映射来跟踪其生命周期,并保证只要还有任何对该RRef的活跃用户,远程数据对象就不会被删除。请参阅RRef设计文档以获取更多细节。
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下来,代理向观察者暴露了两个API,用于选择动作和报告奖励。这些函数仅在代理本地运行,但将通过RPC由观察者触发。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
让我们在代理上添加一个 run_episode 函数,该函数会通知所有观察者执行一个回合。在这个函数中,它首先创建一个列表来收集来自异步RPC的未来结果,然后遍历所有观察者 RRefs 以发起异步RPC。在这些RPC中,代理还会将自己的 RRef 传递给观察者,以便观察者也可以调用代理上的函数。如上所示,每个观察者都会向代理发起RPC,这些RPC是嵌套的RPC。每个回合结束后,saved_log_probs 和 rewards 将包含记录的动作概率和奖励。
class Agent:
...
def run_episode(self):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
ob_rref.rpc_sync().run_episode,
args=(self.agent_rref,)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最后,在一个情节结束后,代理需要训练模型,这在下面的finish_episode函数中实现。此函数中没有RPC调用,且大部分内容借鉴自单线程示例。因此,我们跳过描述其内容。
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
使用 Policy、Observer 和 Agent 类,我们准备启动
多个进程来进行分布式训练。在此示例中,所有进程运行相同的 run_worker 函数,并通过 rank 来
区分各自的角色。rank 0 始终是代理,其他 rank 是观察者。代理通过反复调用 run_episode 和
finish_episode 直到运行奖励超过环境指定的奖励阈值。所有观察者被动等待代理的指令。代码由
rpc.init_rpc 和
rpc.shutdown
包裹,分别用于初始化和终止 RPC 实例。更多细节请参阅 API 页面。
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
print(f"This will run until reward threshold of {agent.reward_threshold}"
" is reached. Ctrl+C to exit.")
for i_episode in count(1):
agent.run_episode()
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
f"{agent.running_reward:.2f}")
if agent.running_reward > agent.reward_threshold:
print(f"Solved! Running reward is now {agent.running_reward}!")
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是使用 world_size=2 训练时的一些示例输出。
This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在这个示例中,我们将展示如何使用RPC作为通信方式在工作者之间传递数据,以及如何使用RRef来引用远程对象。确实,你可以直接在ProcessGroup
send和recvAPIs或使用其他通信/RPC库之上构建整个结构。然而,
通过使用torch.distributed.rpc,你可以获得原生支持和
底层持续优化的性能。
接下来,我们将展示如何结合RPC和RRef与分布式自动求导和分布式优化器来进行分布式模型并行训练。
使用分布式自动微分和分布式优化器的分布式RNN¶
在本节中,我们使用一个RNN模型来展示如何通过RPC API构建分布式模型并行训练。这个示例RNN模型非常小,可以轻松地放入单个GPU中,但我们仍然将其层分配到两个不同的工作进程中,以说明这一理念。开发者可以应用类似的技巧,将更大规模的模型分布到多个设备和机器上。
RNN模型设计借鉴自PyTorch中的词语言模型
示例
仓库,该仓库包含三个主要组件:一个嵌入表、一个
LSTM 层和一个解码器。下面的代码将嵌入表和解码器封装为子模块,以便可以将它们的构造函数传递给RPC
API。在 EmbeddingTable 子模块中,我们有意将
Embedding 层放在GPU上以覆盖用例。在v1.4版本中,RPC总是在目标工作节点上创建CPU张量参数或返回值。如果函数需要GPU张量,则需要显式地将其移动到适当的设备。
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
通过上述子模块,我们现在可以使用RPC将它们组合起来创建一个RNN模型。在下面的代码中 ps 表示一个参数服务器,它托管嵌入表和解码器的参数。构造函数使用 remote
API 在参数服务器上创建一个 EmbeddingTable 对象和一个 Decoder 对象,并在本地创建 LSTM 子模块。在前向传递过程中,训练器使用 EmbeddingTable RRef 来查找远程子模块,并通过RPC将输入数据传递给 EmbeddingTable,并获取查找结果。然后,它将嵌入通过本地 LSTM 层运行,最后使用另一个RPC将输出发送到 Decoder 子模块。一般来说,要实现分布式模型并行训练,开发者可以将模型划分为子模块,调用RPC在远程创建子模块实例,并在需要时使用 RRef 来查找它们。如你可以在下面的代码中看到,它看起来非常类似于单机模型并行训练。主要区别是用RPC函数替换 Tensor.to(device)。
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的RRef列表,该列表将被分布式优化器使用。在本地训练中,应用程序可以调用Module.parameters()来获取所有参数张量的引用,并将其传递给本地优化器以进行后续更新。然而,在分布式训练场景中,相同的API无法正常工作,因为某些参数位于远程机器上。因此,与本地优化器不同,分布式优化器接受一个RRefs列表,每个模型参数对应一个RRef,无论是本地还是远程模型参数。这个辅助函数非常简单,只需调用Module.parameters(),并在每个参数上创建一个本地RRef。
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然后,由于 RNNModel 包含三个子模块,我们需要调用
_parameter_rrefs 三次,并将其封装到另一个辅助函数中。
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
现在,我们已经准备好实现训练循环。在初始化模型参数后,我们创建 RNNModel 和 DistributedOptimizer。分布式优化器将接收一个参数列表 RRefs,找到所有唯一的拥有者工作节点,并使用给定的参数(即 SGD 在这种情况下,你也可以使用其他本地优化器)在每个拥有者工作节点上创建指定的本地优化器(即 lr=0.05)。
在训练循环中,它首先创建一个分布式自动求导上下文,这将帮助分布式自动求导引擎找到梯度和涉及的RPC发送/接收函数。分布式自动求导引擎的设计细节可以在其设计说明中找到。
然后,它像处理本地模型一样启动前向传播,并运行分布式反向传播。对于分布式反向传播,你只需要指定一个根节点列表,在这个例子中,它是损失Tensor。分布式自动求导引擎会自动遍历分布式图并正确写入梯度。接下来,它在分布式优化器上运行step函数,该函数会联系所有相关的本地优化器来更新模型参数。与本地训练相比,一个较小的区别是你不需要运行zero_grad(),因为每个自动求导上下文都有专门的空间来存储梯度,并且由于我们在每次迭代中创建一个上下文,不同迭代中的梯度不会累积到同一组Tensors中。
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
for data, target in get_next_batch():
# create distributed autograd context
with dist_autograd.context() as context_id:
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward(context_id, [loss])
# run distributed optimizer
opt.step(context_id)
# not necessary to zero grads since they are
# accumulated into the distributed autograd context
# which is reset every iteration.
print("Training epoch {}".format(epoch))
最后,让我们添加一些胶水代码来启动参数服务器和训练进程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)