将 Distributed DataParallel 与 Distributed RPC 框架相结合¶
创建时间: 2020年7月28日 |上次更新时间:2023 年 6 月 6 日 |上次验证时间:未验证
作者: Pritam Damania 和 Yi Wang
注意
在 github 中查看和编辑本教程。
本教程使用一个简单的示例来演示如何组合 DistributedDataParallel (DDP) 与分布式 RPC 框架结合使用,将分布式数据并行与分布式模型并行相结合,以 训练一个简单的模型。该示例的源代码可以在这里找到。
前面的教程 Distributed Data Parallel 入门 和 Distributed RPC Framework 入门 描述了如何执行分布式数据并行和分布式模型 parallel training 分别。虽然,有几种训练范式 您可能希望将这两种技术组合在一起。例如:
如果我们有一个模型,其中包含一个稀疏部分(大型嵌入表)和一个密集的 部分(FC 层)中,我们可能希望将嵌入表放在一个参数 服务器,并使用 DistributedDataParallel 在多个 trainer 之间复制 FC 层。 分布式 RPC 框架可用于在 Parameter Server 上执行嵌入查找。
启用 PipeDream 论文中所述的混合并行性。 我们可以使用分布式 RPC 框架将模型的阶段流水线化到多个 worker 之间,并复制每个 worker 阶段(如果需要)使用 DistributedDataParallel。
在本教程中,我们将介绍上述情况 1。我们总共有 4 个 worker 的配置如下:
1 个 Master,负责创建 embedding table (nn.EmbeddingBag) 的 BetaBag 实例。主节点还驱动 两个 trainer 上的 training loop。
1 个 Parameter Server,它基本上将嵌入表保存在内存中,并且 响应来自 Master 和 Trainer 的 RPC。
2 个 Trainer,用于存储 FC 层 (nn.Linear),该 本身使用 DistributedDataParallel 进行。 培训师还负责执行向前传球、向后传球 pass 和 optimizer 步骤。
整个训练过程执行如下:
主服务器创建一个 RemoteModule,该 RemoteModule 在 Parameter Server 上保存一个嵌入表。
然后,主站启动训练器上的训练循环,并将 remote 模块。
训练师创建一个首先执行嵌入查找 使用 master 提供的 remote 模块,然后执行 FC 层,该层包裹在 DDP 中。
HybridModel
trainer 执行模型的前向传递,并使用 loss 来 使用 Distributed Autograd 执行向后传递。
作为向后传递的一部分,将计算 FC 层的梯度 首先,并通过 DDP 中的 allreduce 同步到所有训练器。
接下来,Distributed Autograd 将梯度传播到参数服务器 其中,嵌入表的梯度将更新。
最后,使用 Distributed Optimizer 更新所有参数。
注意力
如果您将 DDP 和 RPC 结合使用,则应始终使用 Distributed Autograd 进行向后传递。
现在,让我们详细介绍每个部分。首先,我们需要设置所有 工人。我们创建 4 个流程,以便 等级 0 和 1 是我们的训练师,等级 2 是大师,等级 3 是 Parameter Server 的 Parameter Server 中。
我们使用 TCP init_method在所有 4 个 worker 上初始化 RPC 框架。
RPC 初始化完成后,主站会创建一个远程模块,该模块使用 RemoteModule 在 Parameter Server 上保存 EmbeddingBag 层。
然后,Master 遍历每个 trainer 并通过以下方式启动训练循环
使用 rpc_async 呼叫每个培训师。
最后,master 等待所有训练完成,然后再退出。_run_trainer
培训师首先使用 world_size=2 初始化 DDP
(适用于两名培训师)使用 init_process_group。
接下来,他们使用 TCP init_method初始化 RPC 框架。请注意,
RPC 初始化和 ProcessGroup 初始化的端口不同。
这是为了避免两个框架的初始化之间的端口冲突。
初始化完成后,trainer 只需等待来自主站的 RPC。ProcessGroup
_run_trainer
参数服务器只是初始化 RPC 框架,并等待 RPC 培训师和师父。
def run_worker(rank, world_size):
r"""
A wrapper function that initializes RPC, calls the function, and shuts down
RPC.
"""
# We need to use different port numbers in TCP init_method for init_rpc and
# init_process_group to avoid port conflicts.
rpc_backend_options = TensorPipeRpcBackendOptions()
rpc_backend_options.init_method = "tcp://localhost:29501"
# Rank 2 is master, 3 is ps and 0 and 1 are trainers.
if rank == 2:
rpc.init_rpc(
"master",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
remote_emb_module = RemoteModule(
"ps",
torch.nn.EmbeddingBag,
args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
kwargs={"mode": "sum"},
)
# Run the training loop on trainers.
futs = []
for trainer_rank in [0, 1]:
trainer_name = "trainer{}".format(trainer_rank)
fut = rpc.rpc_async(
trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
)
futs.append(fut)
# Wait for all training to finish.
for fut in futs:
fut.wait()
elif rank <= 1:
# Initialize process group for Distributed DataParallel on trainers.
dist.init_process_group(
backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
)
# Initialize RPC.
trainer_name = "trainer{}".format(rank)
rpc.init_rpc(
trainer_name,
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# Trainer just waits for RPCs from master.
else:
rpc.init_rpc(
"ps",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__ == "__main__":
# 2 trainers, 1 parameter server, 1 master.
world_size = 4
mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)
在我们讨论 Trainer 的细节之前,让我们先介绍一下
教练使用。如下所述,使用
remote 模块,该模块在参数服务器上保存嵌入表 () 和用于 DDP。模型的初始化包装了一个 nn. DDP 中的线性层,用于在所有 trainer 之间复制和同步此层。HybridModel
HybridModel
remote_emb_module
device
模型的 forward 方法非常简单。它执行
使用 RemoteModule 的 embedding lookup on the parameter server,并将其输出传递到 FC 层。forward
class HybridModel(torch.nn.Module):
r"""
The model consists of a sparse part and a dense part.
1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
This remote model can get a Remote Reference to the embedding table on the parameter server.
"""
def __init__(self, remote_emb_module, device):
super(HybridModel, self).__init__()
self.remote_emb_module = remote_emb_module
self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
self.device = device
def forward(self, indices, offsets):
emb_lookup = self.remote_emb_module.forward(indices, offsets)
return self.fc(emb_lookup.cuda(self.device))
接下来,让我们看看 Trainer 上的设置。Trainer 首先使用一个 remote 模块创建上述内容,该模块将嵌入表保存在
parameter server 及其自己的 rank 的 Rank 的 Rank 的 Rank 的 S THybridModel
现在,我们需要检索一个 RRef 列表,其中包含我们想要的所有参数
喜欢使用 DistributedOptimizer 进行优化。
要从 Parameter Server 检索嵌入表的参数,
我们可以调用 RemoteModule 的 remote_parameters
它基本上遍历了 embedding 表的所有参数,并返回
RRef 列表。trainer 通过 RPC 在参数服务器上调用该方法
接收所需参数的 RRef 列表。由于
DistributedOptimizer 总是将 RRef 列表提供给需要
进行优化,我们甚至需要为我们的
FC 层。这是通过步行完成的,为
每个参数并将其附加到从 .
请注意,我们不能使用 ,
因为它会递归调用 ,
不支持。model.fc.parameters()
remote_parameters()
model.parameters()
model.remote_emb_module.parameters()
RemoteModule
最后,我们使用所有 RRef 创建 DistributedOptimizer 并定义一个 CrossEntropyLoss 函数。
def _run_trainer(remote_emb_module, rank):
r"""
Each trainer runs a forward pass which involves an embedding lookup on the
parameter server and running nn.Linear locally. During the backward pass,
DDP is responsible for aggregating the gradients for the dense part
(nn.Linear) and distributed autograd ensures gradients updates are
propagated to the parameter server.
"""
# Setup the model.
model = HybridModel(remote_emb_module, rank)
# Retrieve all model parameters as rrefs for DistributedOptimizer.
# Retrieve parameters for embedding table.
model_parameter_rrefs = model.remote_emb_module.remote_parameters()
# model.fc.parameters() only includes local parameters.
# NOTE: Cannot call model.parameters() here,
# because this will call remote_emb_module.parameters(),
# which supports remote_parameters() but not parameters().
for param in model.fc.parameters():
model_parameter_rrefs.append(RRef(param))
# Setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model_parameter_rrefs,
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
现在,我们准备介绍在每个 trainer 上运行的主训练循环。 只是一个用于生成随机输入的辅助函数,而
训练目标。我们针对多个 epoch 运行训练循环,并为每个 epoch 运行
批:get_next_batch
为 Distributed Autograd 设置 Distributed Autograd 上下文。
运行模型的正向传递并检索其输出。
使用 loss 函数根据我们的输出和目标计算损失。
使用 Distributed Autograd 以使用 loss 执行分布式向后传递。
最后,运行 Distributed Optimizer 步骤以优化所有参数。
def get_next_batch(rank):
for _ in range(10):
num_indices = random.randint(20, 50)
indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
# Generate offsets.
offsets = []
start = 0
batch_size = 0
while start < num_indices:
offsets.append(start)
start += random.randint(1, 10)
batch_size += 1
offsets_tensor = torch.LongTensor(offsets)
target = torch.LongTensor(batch_size).random_(8).cuda(rank)
yield indices, offsets_tensor, target
# Train for 100 epochs
for epoch in range(100):
# create distributed autograd context
for indices, offsets, target in get_next_batch(rank):
with dist_autograd.context() as context_id:
output = model(indices, offsets)
loss = criterion(output, target)
# Run distributed backward pass
dist_autograd.backward(context_id, [loss])
# Tun distributed optimizer
opt.step(context_id)
# Not necessary to zero grads as each iteration creates a different
# distributed autograd context which hosts different grads
print("Training done for epoch {}".format(epoch))
可以在此处找到整个示例的源代码。