使用分布式 RPC 框架实现 Parameter Server¶
创建时间: Apr 06, 2020 |上次更新时间:2024 年 5 月 7 日 |上次验证时间:未验证
作者: Rohan Varma
注意
在 github 中查看和编辑本教程。
先决条件:
本教程将介绍使用 PyTorch 的分布式 RPC 框架实现参数服务器的简单示例。Parameter Server 框架是一种范例,其中一组服务器存储参数(例如大型嵌入表),并且多个 trainer 查询 Parameter 服务器以检索最新的参数。这些 trainer 可以在本地运行训练循环,并偶尔与参数服务器同步以获取最新参数。有关 Parameter Server 方法的更多信息,请查看此论文。
使用分布式 RPC 框架,我们将构建一个示例,其中多个训练器使用 RPC 与同一参数服务器通信,并使用 RRef 访问远程参数服务器实例上的状态。每个 Trainer 都将通过使用分布式 autograd 在多个节点之间拼接 autograd 图,以分布式方式启动其专用的向后传递。
注意:本教程介绍了分布式 RPC 框架的使用,该框架可用于将模型拆分到多台计算机上,或用于实现参数服务器训练策略,在该策略中,网络训练器获取托管在不同计算机上的参数。相反,如果您希望在多个 GPU 之间复制模型,请参阅 Distributed Data Parallel 教程。还有另一个 RPC 教程,涵盖强化学习和 RNN 用例。
让我们从熟悉的开始:导入我们所需的模块并定义一个简单的 ConvNet,它将在 MNIST 数据集上进行训练。以下网络主要采用 pytorch/examples 存储库中定义的网络。
import argparse
import os
import time
from threading import Lock
import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms
# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
def __init__(self, num_gpus=0):
super(Net, self).__init__()
print(f"Using {num_gpus} GPUs to train")
self.num_gpus = num_gpus
device = torch.device(
"cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
print(f"Putting first 2 convs on {str(device)}")
# Put conv layers on the first cuda device, or CPU if no cuda device
self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
# Put rest of the network on the 2nd cuda device, if there is one
if "cuda" in str(device) and num_gpus > 1:
device = torch.device("cuda:1")
print(f"Putting rest of layers on {str(device)}")
self.dropout1 = nn.Dropout2d(0.25).to(device)
self.dropout2 = nn.Dropout2d(0.5).to(device)
self.fc1 = nn.Linear(9216, 128).to(device)
self.fc2 = nn.Linear(128, 10).to(device)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
# Move tensor to next device if necessary
next_device = next(self.fc1.parameters()).device
x = x.to(next_device)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
接下来,让我们定义一些对脚本的其余部分有用的 helper 函数。下面使用 rpc_sync 和 RRef 来定义一个函数,该函数在位于远程节点上的对象上调用给定方法。下面,我们对远程对象的句柄由参数给出,并在其拥有节点上运行它: 。在调用方节点上,我们通过使用 同步运行此命令,这意味着我们将阻塞,直到收到响应。rref
rref.owner()
rpc_sync
# --------- Helper Methods --------------------
# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods. method could be any matching function, including
# class methods.
def call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef and passes along the given argument.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
现在,我们准备好定义我们的 Parameter Server。我们将子类化并保存一个句柄到上面定义的网络。我们还将保存一个 input 设备,该设备将是我们的 input 在调用模型之前传输到的设备。nn.Module
# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
model = Net(num_gpus=num_gpus)
self.model = model
self.input_device = torch.device(
"cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")
接下来,我们将定义我们的 forward pass。请注意,无论模型输出的设备如何,我们都会将输出移动到 CPU,因为分布式 RPC 框架目前仅支持通过 RPC 发送 CPU 张量。由于调用方/被调用方可能使用不同的设备 (CPU/GPU) 开启,我们特意禁用了通过 RPC 发送 CUDA 张量的功能,但在未来的版本中可能会支持此功能。
class ParameterServer(nn.Module):
...
def forward(self, inp):
inp = inp.to(self.input_device)
out = self.model(inp)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
out = out.to("cpu")
return out
接下来,我们将定义一些用于训练和验证目的的其他函数。第一个 , 将获取分布式 Autograd 上下文 ID 并调用 API 以检索分布式 autograd 计算的梯度。更多信息可以在 分布式 autograd 文档中找到。请注意,我们还会遍历生成的字典并将每个张量转换为 CPU 张量,因为框架目前仅支持通过 RPC 发送张量。接下来,将遍历我们的模型参数并将它们包装为(本地)RRef。此方法将由 trainer 节点通过 RPC 调用,并返回要优化的参数列表。这是 Distributed Optimizer 的 Importing 所必需的,它需要它必须优化的所有参数作为 s 列表。get_dist_gradients
dist_autograd.get_gradients
get_param_rrefs
RRef
# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
grads = dist_autograd.get_gradients(cid)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
cpu_grads = {}
for k, v in grads.items():
k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
cpu_grads[k_cpu] = v_cpu
return cpu_grads
# Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes paramters remotely.
def get_param_rrefs(self):
param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
return param_rrefs
最后,我们将创建方法来初始化我们的 Parameter Server。请注意,所有进程中只有一个参数服务器实例,并且所有训练程序都将与同一参数服务器通信并更新同一存储模型。如 所示,服务器本身不执行任何独立作;它等待来自 trainer 的请求(尚未定义),并通过运行 requested 函数来响应这些请求。run_parameter_server
# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()
def get_parameter_server(num_gpus=0):
"""
Returns a singleton parameter server to all trainer processes
"""
global param_server
# Ensure that we get only one handle to the ParameterServer.
with global_lock:
if not param_server:
# construct it once
param_server = ParameterServer(num_gpus=num_gpus)
return param_server
def run_parameter_server(rank, world_size):
# The parameter server just acts as a host for the model and responds to
# requests from trainers.
# rpc.shutdown() will wait for all workers to complete by default, which
# in this case means that the parameter server will wait for all trainers
# to complete, and then exit.
print("PS master initializing RPC")
rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
print("RPC initialized! Running parameter server...")
rpc.shutdown()
print("RPC shutdown on parameter server.")
请注意,上述不会立即关闭 Parameter Server。相反,它将等待所有 worker(在本例中为 trainer)也调用 .这为我们提供了保证,在所有 trainers(尚未定义)完成其训练过程之前,参数服务器不会离线。rpc.shutdown()
rpc.shutdown()
接下来,我们将定义我们的类。这也将是 的子类,我们的方法将使用 API 来获取对参数服务器的 RRef 或远程引用。请注意,这里我们没有将 Parameter Server 复制到本地进程,而是可以将其视为指向位于单独进程上的 Parameter Server 的分布式共享指针。TrainerNet
nn.Module
__init__
rpc.remote
self.param_server_rref
# --------- Trainers --------------------
# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
self.num_gpus = num_gpus
self.param_server_rref = rpc.remote(
"parameter_server", get_parameter_server, args=(num_gpus,))
接下来,我们将定义一个名为 .为了激发对此方法的需求,值得阅读有关 DistributedOptimizer 的文档,特别是 API 签名。必须向优化器传递一个与要优化的远程参数相对应的 s 列表,因此这里我们获取必要的 s。由于给定与之交互的唯一远程工作程序是 ,因此我们只需在 上调用 a 即可。我们使用我们在类中定义的方法。该方法将返回一个 s 列表,指向需要优化的参数。请注意,在这种情况下,our 没有定义自己的参数;如果是这样,我们也需要将每个参数包装在 a 中,并将其包含在我们的 输入中。get_global_param_rrefs
RRef
RRef
TrainerNet
ParameterServer
remote_method
ParameterServer
get_param_rrefs
ParameterServer
RRef
TrainerNet
RRef
DistributedOptimizer
class TrainerNet(nn.Module):
...
def get_global_param_rrefs(self):
remote_params = remote_method(
ParameterServer.get_param_rrefs,
self.param_server_rref)
return remote_params
现在,我们准备好定义我们的方法,该方法将调用(同步)RPC 来运行在 .请注意,我们将 ,它是 的远程句柄 ,用于我们的 RPC 调用。此调用将向运行我们的节点发送 RPC,调用该通道,并返回与模型输出对应的结果。forward
ParameterServer
self.param_server_rref
ParameterServer
ParameterServer
forward
Tensor
class TrainerNet(nn.Module):
...
def forward(self, x):
model_output = remote_method(
ParameterServer.forward, self.param_server_rref, x)
return model_output
完全定义好我们的 trainer 后,现在是时候编写我们的神经网络训练循环了,它将创建我们的网络和优化器,通过网络运行一些输入并计算损失。训练循环看起来很像本地训练程序,由于我们的网络性质分布在机器之间,因此会进行一些修改。
下面,我们初始化并构建一个 .请注意,如上所述,我们必须传入我们想要优化的所有全局(跨参与分布式训练的所有节点)参数。此外,我们传入要使用的本地优化器,在本例中为 SGD。请注意,我们可以像创建本地优化器一样配置底层优化器算法 - 所有参数都将正确转发。例如,我们传入一个自定义学习率,该学习率将用作所有本地优化器的学习率。TrainerNet
DistributedOptimizer
optimizer.SGD
def run_training_loop(rank, num_gpus, train_loader, test_loader):
# Runs the typical nueral network forward + backward + optimizer step, but
# in a distributed fashion.
net = TrainerNet(num_gpus=num_gpus)
# Build DistributedOptimizer.
param_rrefs = net.get_global_param_rrefs()
opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
接下来,我们定义我们的主要训练循环。我们遍历 PyTorch 的 DataLoader 给出的可迭代对象。在编写典型的 forward/backward/optimizer 循环之前,我们首先将 logic 包装在 Distributed Autograd 上下文中。请注意,这需要记录在模型的 forward pass 中调用的 RPC,以便可以构建一个适当的图形,其中包括 backward pass 中所有参与的分布式 worker。分布式 autograd 上下文返回 a 作为标识符,用于累积和优化与特定迭代相对应的梯度。context_id
与调用将启动此本地 worker 的向后传递的 typical 相反,我们调用并传入我们的 context_id 以及 ,这是我们希望向后传递开始的根。此外,我们将此传递给 optimizer 调用,这需要能够查找所有节点中由此特定向后传递计算的相应梯度。loss.backward()
dist_autograd.backward()
loss
context_id
def run_training_loop(rank, num_gpus, train_loader, test_loader):
...
for i, (data, target) in enumerate(train_loader):
with dist_autograd.context() as cid:
model_output = net(data)
target = target.to(model_output.device)
loss = F.nll_loss(model_output, target)
if i % 5 == 0:
print(f"Rank {rank} training batch {i} loss {loss.item()}")
dist_autograd.backward(cid, [loss])
# Ensure that dist autograd ran successfully and gradients were
# returned.
assert remote_method(
ParameterServer.get_dist_gradients,
net.param_server_rref,
cid) != {}
opt.step(cid)
print("Training complete!")
print("Getting accuracy....")
get_accuracy(test_loader, net)
下面简单计算我们模型完成训练后的准确性,很像传统的本地模型。但是,请注意,上面 we pass into this function 是 RPC 的实例,因此 forward pass 以透明的方式调用 RPC。net
TrainerNet
def get_accuracy(test_loader, model):
model.eval()
correct_sum = 0
# Use GPU to evaluate if possible
device = torch.device("cuda:0" if model.num_gpus > 0
and torch.cuda.is_available() else "cpu")
with torch.no_grad():
for i, (data, target) in enumerate(test_loader):
out = model(data, -1)
pred = out.argmax(dim=1, keepdim=True)
pred, target = pred.to(device), target.to(device)
correct = pred.eq(target.view_as(pred)).sum().item()
correct_sum += correct
print(f"Accuracy {correct_sum / len(test_loader.dataset)}")
接下来,类似于我们为负责初始化 RPC 的主循环定义的方式,让我们为 trainer 定义一个类似的循环。区别在于,我们的 trainer 必须运行我们上面定义的训练循环:run_parameter_server
ParameterServer
# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
print(f"Worker rank {rank} initializing RPC")
rpc.init_rpc(
name=f"trainer_{rank}",
rank=rank,
world_size=world_size)
print(f"Worker {rank} done initializing RPC")
run_training_loop(rank, num_gpus, train_loader, test_loader)
rpc.shutdown()
请注意,默认情况下,与 类似,将等待所有 worker(包括 trainer 和 ParameterServers)在此节点退出之前调用 in。这可确保节点正常终止,并且没有节点在另一个节点预期其联机时脱机。run_parameter_server
rpc.shutdown()
rpc.shutdown()
我们现在已经完成了特定于 trainer 和 parameter server 的代码,剩下的工作就是添加代码以启动 trainer 和 parameter servers。首先,我们必须考虑适用于我们的 Parameter Server 和 Trainer 的各种参数。 对应于将参与训练的节点总数,是所有训练器和参数 server 的总和。我们还必须为每个单独的进程传入一个唯一的值,从 0(我们将在其中运行单个参数服务器)到 . 和 是可用于标识 rank 0 进程运行位置的参数,各个节点将使用这些参数来发现彼此。要在本地测试此示例,只需将 and same 传递给生成的所有实例即可。请注意,出于演示目的,此示例仅支持 0-2 个 GPU,但可以扩展模式以使用其他 GPU。world_size
rank
world_size - 1
master_addr
master_port
localhost
master_port
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Parameter-Server RPC based training")
parser.add_argument(
"--world_size",
type=int,
default=4,
help="""Total number of participating processes. Should be the sum of
master node and all training nodes.""")
parser.add_argument(
"--rank",
type=int,
default=None,
help="Global rank of this process. Pass in 0 for master.")
parser.add_argument(
"--num_gpus",
type=int,
default=0,
help="""Number of GPUs to use for training, Currently supports between 0
and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
parser.add_argument(
"--master_addr",
type=str,
default="localhost",
help="""Address of master, will default to localhost if not provided.
Master must be able to accept network traffic on the address + port.""")
parser.add_argument(
"--master_port",
type=str,
default="29500",
help="""Port that master is listening on, will default to 29500 if not
provided. Master must be able to accept network traffic on the host and port.""")
args = parser.parse_args()
assert args.rank is not None, "must provide rank argument."
assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
os.environ['MASTER_ADDR'] = args.master_addr
os.environ["MASTER_PORT"] = args.master_port
现在,我们将根据命令行参数创建一个对应于 parameter server 或 trainer 的进程。如果我们传入的 rank 为 0,我们将创建一个,否则创建一个。请注意,我们用于启动与要执行的函数对应的子进程,并使用 .在初始化训练器的情况下,我们还使用 PyTorch 的数据加载器,以便在 MNIST 数据集上指定训练和测试数据加载器。ParameterServer
TrainerNet
torch.multiprocessing
p.join()
processes = []
world_size = args.world_size
if args.rank == 0:
p = mp.Process(target=run_parameter_server, args=(0, world_size))
p.start()
processes.append(p)
else:
# Get data to train on
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32, shuffle=True,)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(
'../data',
train=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32,
shuffle=True,
)
# start training worker on this node
p = mp.Process(
target=run_worker,
args=(
args.rank,
world_size, args.num_gpus,
train_loader,
test_loader))
p.start()
processes.append(p)
for p in processes:
p.join()
要在本地运行示例,请在单独的终端窗口中为服务器和要生成的每个工作程序运行以下命令 worker:。例如,对于世界大小为 2 的主节点,命令将为 。然后,可以在单独的窗口中使用命令启动 trainer,这将从一个服务器和一个 trainer 开始训练。请注意,本教程假定使用 0 到 2 个 GPU 进行训练,并且可以通过传递到训练脚本来配置此参数。python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK
python rpc_parameter_server.py --world_size=2 --rank=0
python rpc_parameter_server.py --world_size=2 --rank=1
--num_gpus=N
您可以传入命令行参数并指示 master worker 正在侦听的地址和端口,例如,测试 trainer 和 master 节点在不同机器上运行的功能。--master_addr=ADDRESS
--master_port=PORT