分布式管道并行性简介¶
创建时间: Jul 09, 2024 |上次更新时间:2024 年 12 月 12 日 |上次验证: Nov 05, 2024
作者: Howard Huang
注意
在 github 中查看和编辑本教程。
本教程使用 gpt 样式的 transformer 模型来演示实现分布式 使用 torch.distributed.pipelining API 的管道并行性。
如何使用 API
torch.distributed.pipelining
如何将管道并行性应用于 transformer 模型
如何在一组微批次上使用不同的计划
熟悉 PyTorch 中的基本分布式训练
设置¶
我们将对模型的执行进行分区,并在微批处理上安排计算。我们将使用简化版本
的 Transformer 解码器模型。模型架构用于教育目的,并且具有多个 transformer 解码器层,因为我们想演示如何将模型拆分为不同的
块。首先,我们定义模型:torch.distributed.pipelining
import torch
import torch.nn as nn
from dataclasses import dataclass
@dataclass
class ModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = 10000
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
# Using a ModuleDict lets us delete layers witout affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
self.norm = nn.LayerNorm(model_args.dim)
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, h)
h = self.norm(h) if self.norm else h
output = self.output(h).clone() if self.output else h
return output
然后,我们需要在脚本中导入必要的库并初始化分布式训练过程。在本例中,我们定义了一些要使用的全局变量 稍后在脚本中:
import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe
global rank, device, pp_group, stage_index, num_stages
def init_distributed():
global rank, device, pp_group, stage_index, num_stages
rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
dist.init_process_group()
# This group can be a sub-group in the N-D parallel case
pp_group = dist.new_group()
stage_index = rank
num_stages = world_size
您似乎对 、 和 代码 很熟悉,因为它们通常用于
所有分布式程序。特定于管道并行的全局变量包括 which is the process
将用于发送/接收通信的组,在本例中为单个等级
,因此 index 等于 rank,也等于 world_size。rank
world_size
init_process_group()
pp_group
stage_index
num_stages
这用于设置将在管道并行计划中使用的阶段数。例如
对于 ,微批处理在完成之前需要向前 4 次和向后 4 次。框架知道如何在 stages 之间进行通信是必要的。例如,对于第一阶段 (),它将
使用来自 DataLoader 的数据,并且不需要从任何以前的 Peer 节点接收数据来执行其计算。num_stages
num_stages=4
stage_index
stage_index=0
第 1 步:对 Transformer 模型进行分区¶
有两种不同的模型分区方法:
首先是手动模式,在这种模式下,我们可以通过删除 属性。在此示例中,对于两个阶段(2 个等级),模型被切成两半。
def manual_model_split(model) -> PipelineStage:
if stage_index == 0:
# prepare the first stage model
for i in range(4, 8):
del model.layers[str(i)]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
for i in range(4):
del model.layers[str(i)]
model.tok_embeddings = None
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
return stage
正如我们所看到的,第一阶段没有层范数或输出层,它只包括前四个 transformer 模块。
第二阶段没有 input embedding 层,但包括输出层和最后四个 transformer blocks。函数
然后返回当前排名的 。PipelineStage
第二种方法是基于 tracer 的模式,它根据参数自动拆分模型。使用管道规范,我们可以指示在何处拆分模型。在下面的代码块中,
我们在 4th Transformer 解码器层之前进行 split,镜像了上述的 manual split。同样地
我们可以在这次拆分完成后通过 call 来检索 a。split_spec
torch.distributed.pipelining
PipelineStage
build_stage
第 2 步:定义主要执行¶
在 main 函数中,我们将创建一个特定的管道计划,各个阶段应遵循该计划。 支持多个计划,包括支持多个计划,包括单阶段/排名计划和 ,
以及每个等级的多个阶段计划,例如 和 。torch.distributed.pipelining
GPipe
1F1B
Interleaved1F1B
LoopedBFS
if __name__ == "__main__":
init_distributed()
num_microbatches = 4
model_args = ModelArgs()
model = Transformer(model_args)
# Dummy data
x = torch.ones(32, 500, dtype=torch.long)
y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
example_input_microbatch = x.chunk(num_microbatches)[0]
# Option 1: Manual model splitting
stage = manual_model_split(model)
# Option 2: Tracer model splitting
# stage = tracer_model_split(model, example_input_microbatch)
model.to(device)
x = x.to(device)
y = y.to(device)
def tokenwise_loss_fn(outputs, targets):
loss_fn = nn.CrossEntropyLoss()
outputs = outputs.reshape(-1, model_args.vocab_size)
targets = targets.reshape(-1)
return loss_fn(outputs, targets)
schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)
if rank == 0:
schedule.step(x)
elif rank == 1:
losses = []
output = schedule.step(target=y, losses=losses)
print(f"losses: {losses}")
dist.destroy_process_group()
在上面的示例中,我们使用手动方法来拆分模型,但可以取消注释代码以尝试 基于 Tracer 的模型拆分函数。在我们的 Schedule 中,我们需要传入 microbatch 的数量和 用于评估目标的损失函数。
该功能处理整个小批量,并自动将其拆分为基于微批量的
上 通过 之前。然后根据计划类对微批次进行操作。
在上面的示例中,我们使用的是 GPipe,它遵循简单的全向前和全向后计划。输出
从排名 1 返回的模型与模型在单个 GPU 上并与整个批次一起运行相同。同样地
我们可以传入一个容器来存储每个微批次的相应损失。.step()
n_microbatches
losses
第 3 步:启动分布式进程¶
最后,我们准备好运行脚本了。我们将用于创建单个主机、2 进程作业。
我们的脚本已经以 rank 0 的方式编写,该方式执行管道阶段 0 和 rank 1 所需的逻辑
执行管道阶段 1 的逻辑。torchrun
torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py
结论¶
在本教程中,我们学习了如何使用 PyTorch 的 API 实现分布式管道并行性。
我们探索了设置环境、定义 transformer 模型以及对其进行分区以进行分布式训练。
我们讨论了两种模型分区方法,手动和基于跟踪器的方法,并演示了如何在
跨不同阶段的微批次。最后,我们介绍了管道计划的执行和分布式的启动
使用 .torch.distributed.pipelining
torchrun
其他资源¶
我们已经成功地集成到 torchtitan 仓库中。TorchTitan 是一个干净、最小的代码库,用于
使用原生 PyTorch 进行大规模 LLM 训练。对于管道的生产就绪使用
并行性以及与其他分布式技术的组合,请参阅 TorchTitan 的 3D 并行性端到端示例。torch.distributed.pipelining