目录

分布式 RPC 框架入门

创建时间: 2020年1月1日 |上次更新时间: 2023 年 1 月 9 日 |上次验证: Nov 05, 2024

作者沈丽

注意

编辑github 中查看和编辑本教程。

先决条件:

本教程使用两个简单的示例来演示如何构建分布式 使用 torch.distributed.rpc 包进行训练,该包最初在 PyTorch v1.4 中作为实验性功能引入。 这两个示例的源代码可以在 PyTorch 示例中找到。

之前的教程 分布式数据并行入门使用 PyTorch 编写分布式应用程序 描述了 DistributedDataParallel,它支持特定的训练范例,其中模型在 多个进程,每个进程处理输入数据的拆分。 有时,您可能会遇到需要不同培训的场景 范式。例如:

  1. 在强化学习中,获取可能相对昂贵 来自环境的训练数据,而模型本身可能非常小。在 在这种情况下,生成并行运行的多个观察者可能很有用 并共享单个代理。在这种情况下,代理负责培训 本地,但应用程序仍然需要库来发送和接收 观察者和 Trainer 之间的数据。

  2. 您的模型可能太大,无法放入单台计算机上的 GPU 中,因此 需要一个库来帮助将模型拆分到多台机器上。或者您 可能正在实现 Parameter Server 训练框架,其中模型参数和训练器位于不同的 机器。

torch.distributed.rpc 包 可以帮助解决上述情况。在情况 1 中,RPCRRef 允许发送数据 从一个工作程序到另一个工作程序,同时轻松引用远程数据对象。在 案例 2 ,分布式 Autograd分布式优化器使执行 backward pass 和 optimizer 步骤就像本地训练一样。在 在接下来的两节中,我们将使用 强化学习示例和语言模型示例。请注意,此 教程的目标不是构建最准确或最有效的模型 解决给定的问题,相反,这里的主要目标是展示如何使用 torch.distributed.rpc 包来 构建分布式训练应用程序。

使用 RPC 和 RRef 的分布式强化学习

本节介绍构建玩具分布式强化学习的步骤 模型使用 RPC 解决来自 OpenAI Gym 的 CartPole-v1。 策略代码主要借鉴了现有的单线程示例,如下所示。我们将跳过设计的细节,专注于 RPC 用法。Policy

import torch.nn as nn
import torch.nn.functional as F

class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

我们准备介绍观察者。在此示例中,每个观察者都会创建其 自己的环境,并等待代理的命令运行剧集。在每个 事件中,一个观察者在大多数迭代中循环,并且在每个迭代中 迭代中,它使用 RPC 将其环境状态传递给代理,并获取一个 action back 操作。然后,它将该操作应用于其环境,并获得奖励 以及 environment 中的 next state。之后,观察者使用另一个 RPC 向 Agent 报告奖励。同样,请注意,这是 显然不是最有效的 observer 实现。例如,一个 简单的优化可以是将当前状态和最后奖励打包在一个 RPC 中,以 减少通信开销。但是,目标是演示 RPC API 而不是为 CartPole 构建最佳求解器。那么,让我们保持逻辑 simple 和此示例中的 explicit 两个步骤。n_steps

import argparse
import gym
import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument('--world_size', default=2, type=int, metavar='W',
                    help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
                    help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed  for reproducibility')
args = parser.parse_args()

class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)

    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset(), 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)

            # finishes after the number of self.env._max_episode_steps
            if done:
                break

agent 的代码稍微复杂一些,我们会将其分解为多个 件。在此示例中,代理既是 trainer 又是 master。 这样它就会向多个分布式观察者发送命令来运行剧集, 它还在本地记录所有操作和奖励,这些操作和奖励将在 每集后的训练阶段。下面的代码显示了大多数行初始化各种组件的构造函数。位于 end 在其他 worker 上远程初始化 observers,并保持 那些当地的观察员。代理稍后将使用这些观察者来 send 命令。应用程序无需担心 . 每个 Bean 的所有者都维护一个引用计数映射来跟踪其 生命周期,并保证只要 该 .有关详细信息,请参阅设计文档AgentRRefsRRefsRRefsRRefRRefRRef

import gym
import numpy as np

import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical

class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

接下来,代理向观察者公开两个 API,用于选择操作,以及 报告 奖励。这些函数仅在代理上本地运行,但会 由观察者通过 RPC 触发。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

让我们在 agent 上添加一个函数,告诉所有观察者 执行剧集。在此函数中,它首先创建一个要收集的列表 futures 的 Futures,然后遍历所有 observer 到 制作异步 RPC。在这些 RPC 中,代理还会传递 自身传递给观察者,以便观察者可以将代理上的函数作为 井。如上所示,每个观察者都会将 RPC 返回给代理,这些 RPC 是 嵌套的 RPC。每集结束后,和 将 包含录制的操作 probs 和 rewards。run_episodeRRefsRRefsaved_log_probsrewards

class Agent:
    ...
    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

最后,在一次发作后,代理需要训练模型,该模型 在下面的函数中实现。中没有 RPC 这个函数,它主要是从单线程示例中借来的。 因此,我们跳过描述其内容。finish_episode

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])

      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward

      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []

      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

有了 , , 和 类,我们就可以开始了 多个进程来执行分布式训练。在此示例中,所有 进程运行相同的函数,它们使用 Rank 来 区分他们的角色。等级 0 始终是代理,所有其他等级都是 观察员。代理通过反复调用担任 master ,直到运行奖励超过奖励阈值 由环境指定。所有观察者被动等待命令 从代理。代码由 rpc.init_rpcrpc.shutdown 包装, 它分别初始化和终止 RPC 实例。更多详情如下 在 API 页面中可用。PolicyObserverAgentrun_workerrun_episodefinish_episode

import os
from itertools import count

import torch.multiprocessing as mp

AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size)
        print(f"This will run until reward threshold of {agent.reward_threshold}"
                " is reached. Ctrl+C to exit.")
        for i_episode in count(1):
            agent.run_episode()
            last_reward = agent.finish_episode()

            if i_episode % args.log_interval == 0:
                print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
                    f"{agent.running_reward:.2f}")
            if agent.running_reward > agent.reward_threshold:
                print(f"Solved! Running reward is now {agent.running_reward}!")
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

以下是使用 world_size=2 进行训练时的一些示例输出。

This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

在此示例中,我们将演示如何使用 RPC 作为通信工具来传递 数据,以及如何使用 RRef 引用远程对象。这是真的 您可以直接在 API 上构建整个结构,或者使用其他通信/RPC 库。然而 通过使用 torch.distributed.rpc,您可以获得原生支持和 在后台持续优化性能。ProcessGroupsendrecv

接下来,我们将展示如何将 RPC 和 RRef 与分布式 autograd 和 分布式优化器来执行分布式模型并行训练。

使用 Distributed Autograd 和 Distributed Optimizer 的分布式 RNN

在本节中,我们使用 RNN 模型来展示如何构建分布式模型 使用 RPC API 进行并行训练。示例 RNN 模型非常小,并且 可以很容易地放入单个 GPU 中,但我们仍然将其层划分为两个 不同的工人来演示这个想法。开发者可以应用类似的 在多个设备上分发更大模型的技术,以及 机器。

RNN 模型设计借鉴了 PyTorch 示例仓库中的 word 语言模型,其中包含三个主要组件:嵌入表、层和解码器。下面的代码包装了 embedding 表和 decoder 转换为子模块,以便它们的构造函数可以传递给 RPC 应用程序接口。在子模块中,我们特意将 GPU 层放在 GPU 上以覆盖用例。在 v1.4 中,RPC 始终创建 目标工作程序上的 CPU 张量参数或返回值。如果函数 获取 GPU 张量,则需要将其显式移动到适当的设备。LSTMEmbeddingTableEmbedding

class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

通过上述子模块,我们现在可以使用 RPC 将它们拼凑在一起,以 创建 RNN 模型。在下面的代码中,表示一个参数服务器 它承载 embedding table 和 decoder 的参数。构造函数 使用远程 API 创建一个对象,并在 parameter server 并在本地创建子模块。在 forward pass 的 Trainer 使用 来查找 remote 子模块中,并将输入数据传递给 using RPC 并获取查找结果。然后,它通过本地层运行 embedding,最后使用另一个 RPC 将输出发送到子模块。一般来说,要实现分布式模型并行 训练时,开发者可以将模型划分为多个子模块,调用 RPC 来创建 sub-module 实例,并在必要时使用 on 来查找它们。 正如您在下面的代码中看到的,它看起来与单机模型非常相似 并行训练。主要区别在于替换为 RPC 函数。psEmbeddingTableDecoderLSTMEmbeddingTableRRefEmbeddingTableLSTMDecoderRRefTensor.to(device)

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

在介绍分布式优化器之前,让我们添加一个辅助函数 生成模型参数的 RRef 列表,该列表将被 分布式优化器。在本地训练中,应用程序可以调用 grab 对所有参数张量的引用,并将其传递 添加到本地优化器以进行后续更新。但是,相同的 API 不会 在分布式训练场景中工作,因为某些参数位于远程 机器。因此,不是采用参数列表, 分布式优化器采用一个列表 ,每个模型一个 parameter 获取本地和远程模型参数。辅助函数是 很简单,只需调用 并创建一个本地 on 每个参数。Module.parameters()TensorsRRefsRRefModule.parameters()RRef

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

然后,由于 包含 三个子模块,我们需要调用 3 次,并将其包装到另一个辅助函数中。RNNModel_parameter_rrefs

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

现在,我们已准备好实现训练循环。初始化模型后 参数,我们创建 和 .这 分布式优化器会取一个 list of parameter ,找到所有 distinct owner worker,并创建给定的本地优化器(即,在本例中为 您也可以使用其他本地优化器)在每个 owner worker 上使用 给定的参数(即 )。RNNModelDistributedOptimizerRRefsSGDlr=0.05

在训练循环中,它首先创建一个分布式 autograd 上下文,该上下文 将帮助分布式 autograd 引擎找到渐变和涉及的 RPC send/recv 函数。分布式 autograd 引擎的设计细节可以 可以在其设计说明中找到。 然后,它开始向前传球,就好像它是本地的一样 model 的 SET 和 SET SET 的 SUB对于分布式向后,您 只需要指定一个 root 列表,这种情况下就是 loss 了。 分布式 autograd 引擎将遍历分布式图 并正确写入渐变。接下来,它在分布式优化器上运行该函数,该优化器将联系所有相关人员 本地优化器来更新模型参数。与本地培训相比,一个 细微的区别是你不需要运行,因为每个 autograd 上下文有专门的空间来存储渐变,当我们创建一个 context 中,来自不同迭代的那些梯度将不会 累积到同一组 。Tensorstepzero_grad()Tensors

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

最后,让我们添加一些 glue 代码来启动 parameter server 和 trainer 过程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源