目录

使用异步执行实现批处理 RPC 处理

创建时间: 2020年7月28日 |上次更新时间:2024 年 11 月 13 日 |上次验证时间:未验证

作者沈丽

注意

编辑github 中查看和编辑本教程。

先决条件:

本教程演示如何使用 @rpc.functions.async_execution 装饰器,它通过减少阻塞的 RPC 线程和整合对被调用方的 CUDA作。这将共享 与使用 TorchServe 的批量推理相同。

注意

本教程需要 PyTorch v1.6.0 或更高版本。

基本

前面的教程演示了构建分布式训练的步骤 使用 torch.distributed.rpc 的应用程序, 但他们没有详细说明在处理 RPC 请求。从 PyTorch v1.5 开始,每个 RPC 请求将阻塞 callee 执行该请求中的函数,直到该函数返回。 这适用于许多用例,但有一个警告。如果用户函数 块,例如,使用嵌套的 RPC 调用,或信令,例如等待 要取消阻止的不同 RPC 请求,则被调用方上的 RPC 线程必须 空闲等待,直到 IO 完成或发生信令事件。因此, RPC 被调用方可能会使用不必要的线程。造成这种情况的原因 问题是 RPC 将用户函数视为黑盒,并且知道的很少 了解函数中发生的情况。允许用户函数 yield 和 free RPC 线程,需要向 RPC 系统提供更多提示。

从 v1.6.0 开始,PyTorch 通过引入两个新概念来解决这个问题:

使用这两个工具,应用程序代码可以将用户函数分解为 多个较小的函数,将它们作为对象的回调链接在一起,并返回包含最终结果的 the 。在被调用方上 端,在获取对象时,会安装后续的 RPC 响应 preparation 和 communication 作为回调,将被触发 当最终结果准备就绪时。这样,被调用方就不再需要阻塞 一个线程,并等待最终返回值准备就绪。请参阅 @rpc.functions.async_execution 的 API 文档。FutureFutureFuture

除了减少被调用方的空闲线程数外,这些工具还有助于 使批处理 RPC 处理更轻松、更快速。以下两个部分 本教程演示如何构建分布式批量更新参数 使用 @rpc.functions.async_execution 装饰器的服务器和批处理强化学习应用程序。

批量更新 Parameter Server

考虑一个具有一个参数的同步参数服务器训练应用程序 服务器 (PS) 和多个 Trainer。在此应用程序中,PS 持有 参数,并等待所有训练器报告梯度。在每次迭代中, 它会等待,直到收到来自所有训练器的梯度,然后更新所有 参数。下面的代码显示了 PS 类的实现。 该方法使用 和 Trainer 调用。每 invocation 返回一个对象,该对象将填充更新后的 型。大多数训练程序启动的调用只是将梯度累积到字段,立即返回,并在 PS 上生成 RPC 线程。这 最后到达的 Trainer 将触发 Optimizer 步骤并消耗之前的所有 报告的梯度。然后,它将 设置为更新后的模型 这反过来又会通过该对象通知来自其他 trainer 的所有先前请求,并将更新的模型发送给所有 trainer。update_and_fetch_model@rpc.functions.async_executionFuture.gradfuture_modelFuture

import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim

num_classes, batch_update_size = 30, 5

class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g

            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

对于训练器,它们都使用同一组 参数。在每次迭代中,每个 trainer 首先向前运行 和向后传递以在本地生成渐变。然后,每个 trainer 使用 RPC 将其梯度报告给 PS,并取回更新的 parameters 通过同一 RPC 请求的返回值。在教练的 implementation 中,无论目标函数是否标有 都没有区别。这 trainer 只需使用 which 调用 块,直到返回更新的模型。@rpc.functions.async_executionupdate_and_fetch_modelrpc_sync

batch_size, image_w, image_h  = 20, 64, 64

class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

在本教程中,我们跳过启动多个进程的代码,请 有关完整实施,请参阅 examples repo。请注意,可以实现 batch 在没有 @rpc.functions.async_execution 装饰器的情况下进行处理。但是,这需要在 PS 或使用另一轮 RPC 来获取更新的模型,其中后者 会增加代码复杂性和通信开销。

本节使用一个简单的参数服务器训练示例来展示如何 使用 @rpc.functions.async_execution 装饰器实现批处理 RPC 应用程序。在下一节中,我们将重新实现强化学习 前面的分布式 RPC 框架入门教程中的示例使用批处理,并演示其对训练的影响 速度。

批处理 CartPole 求解器

本节使用 OpenAI Gym 的 CartPole-v1 作为 一个示例,用于显示批处理 RPC 对性能的影响。请注意 因为目标是演示 @rpc.functions.async_execution 的用法,而不是构建最好的 CartPole 求解器或解决大多数不同的 RL 问题,我们使用非常简单的政策和奖励计算策略,以及 重点介绍多观察者单代理批处理 RPC 实现。我们使用 与上一个教程类似的模型,如下所示。比较 与前面的教程不同,区别在于它的构造函数采用 额外的参数,用于控制参数,因为使用批处理时,函数中的参数包含来自多个观察者的状态,因此维度需要 以正确更改。其他一切都保持不变。PolicybatchdimF.softmaxxforward

import argparse
import torch.nn as nn
import torch.nn.functional as F

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

的构造函数也会相应地进行调整。它还需要一个参数,该参数控制它用于选择哪个函数 行动。在批处理模式下,它调用函数,该函数将很快出现,并且此函数将使用 @rpc.functions.async_execution 进行装饰。ObserverbatchAgentselect_action_batchAgent

import gym
import torch.distributed.rpc as rpc

class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

与前面的教程分布式 RPC 框架入门相比, 观察者的行为略有不同。而不是在环境 已停止,则它始终在每个 episode 中运行迭代。当 environment 返回时,观察者只需重置环境并重新开始 再。通过这种设计,代理将从 每个观察者,因此可以将它们打包到一个固定大小的张量中。在每一个 步骤中,使用 RPC 将其状态发送到 并获取 作通过 return 值。在每个 episode 的末尾,它会返回 奖励所有步骤到 。请注意,此函数将 由使用 RPC 调用。所以这个函数中的 call 将是嵌套的 RPC 调用。我们可以将这个函数也标记为 ,以避免阻塞 .然而,作为瓶颈 是 而不是 ,那么阻止一个应该是可以的 线程。n_stepsObserverAgentAgentrun_episodeAgentrpc_sync@rpc.functions.async_executionObserverAgentObserverObserver

import torch

class Observer:
    ...

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

的构造函数也接受一个参数,该参数控制 作概率的批处理方式。在批处理模式下,包含一个 张量列表,其中每个张量都包含来自 一步。如果不进行批处理,则 是字典,其中 key 是观察者 ID,值是该 ID 的作概率列表 观察者。Agentbatchsaved_log_probssaved_log_probs

import threading
from torch.distributed.rpc import RRef

class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

非批处理只是运行状态 throw 策略,保存 作 prob,并立即将作返回给观察者。select_acion

from torch.distributions import Categorical

class Agent:
    ...

    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

通过批处理,状态存储在 2D 张量中,使用 observer id 作为行 ID。然后,它通过安装回调来链接一个 函数添加到批处理生成的对象中,该 将填充使用该观察者的 ID 编制索引的特定行。 最后到达的观察者通过策略运行所有批处理状态 拍摄并相应地设置。发生这种情况时,所有 将触发 on 上安装的回调函数,并且 它们的返回值将用于填充 chained 对象 这反过来又通知 准备和传达响应 来自其他观察者的所有先前 RPC 请求。self.statesFutureself.future_actionsFutureself.future_actionsself.future_actionsFutureAgent

class Agent:
    ...

    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

现在,我们来定义如何将不同的 RPC 函数拼接在一起。控制每个情节的执行。它首先用于启动 所有 observers 上的剧集并阻止返回的 futures,这将是 填充观察者奖励。请注意,下面的代码使用 RRef 帮助程序在所有者上启动函数 的 RRef 中。 然后,它将保存的 action probs 和返回的 observer rewards 转换为 expected data 格式,然后启动训练步骤。最后,它将所有 状态并返回当前剧集的奖励。此函数是入口 指向运行一个分集。Agentrpc_asyncob_rref.rpc_async()run_episodeob_rref

class Agent:
    ...

    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

代码的其余部分是正常的进程启动和记录,它们是 类似于其他 RPC 教程。在本教程中,所有观察者都是被动的 等待来自代理的命令。请参阅 examples repo 了解完整实现。

def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()


def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

批处理 RPC 有助于将作推理整合到较少的 CUDA作中, 从而减少摊销开销。上述函数运行 在批处理和非批处理模式下使用不同数量的观察者时,代码相同, 范围从 1 到 10。下图绘制了不同 使用默认参数值的世界大小。结果证实了我们的预期 这种批处理有助于加快训练速度。main

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源