分布式流水线并行简介¶
创建时间:2024年7月9日 | 最后更新时间:2024年12月12日 | 最后验证时间:2024年11月5日
作者: Howard Huang
注意
查看和编辑此教程在 github。
本教程使用一种类似gpt的变压器模型来演示使用 torch.distributed.pipelining APIs 实现分布式流水线并行。
如何使用
torch.distributed.pipeliningAPI如何将管道并行性应用于变压器模型
如何在一组微批次上使用不同的调度方案
熟悉PyTorch中的基本分布式训练
设置¶
使用 torch.distributed.pipelining,我们将对模型的执行进行划分,并在微批次上安排计算。我们将使用一个简化的变压器解码器模型版本。该模型架构仅用于教学目的,并包含多个变压器解码器层,因为我们希望展示如何将模型拆分为不同的部分。首先,让我们定义模型:
import torch
import torch.nn as nn
from dataclasses import dataclass
@dataclass
class ModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = 10000
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
# Using a ModuleDict lets us delete layers witout affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
self.norm = nn.LayerNorm(model_args.dim)
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, h)
h = self.norm(h) if self.norm else h
output = self.output(h).clone() if self.output else h
return output
然后,我们需要在脚本中导入必要的库并初始化分布式训练过程。在这种情况下,我们定义了一些全局变量以供脚本后续使用:
import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe
global rank, device, pp_group, stage_index, num_stages
def init_distributed():
global rank, device, pp_group, stage_index, num_stages
rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
dist.init_process_group()
# This group can be a sub-group in the N-D parallel case
pp_group = dist.new_group()
stage_index = rank
num_stages = world_size
The rank, world_size, and init_process_group() code should seem familiar to you as those are commonly used in
all distributed programs. The globals specific to pipeline parallelism include pp_group which is the process
group that will be used for send/recv communications, stage_index which, in this example, is a single rank
per stage so the index is equivalent to the rank, and num_stages which is equivalent to world_size.
The num_stages 用于设置在流水线并行调度中将使用的阶段数量。例如,
对于 num_stages=4,一个微批次需要经历 4 次前向和 4 次后向才能完成。stage_index
是框架知道如何在阶段之间进行通信所必需的。例如,对于第一个阶段 (stage_index=0),它将
使用数据加载器中的数据,并且不需要从任何先前的对等节点接收数据来进行计算。
步骤 1: 分割 Transformer 模型¶
有两种不同的方式来分割模型:
第一种是手动模式,在这种模式下,我们可以通过删除模型属性的部分内容,手动创建两个模型实例。在本例中,对于两个阶段(2个rank),模型被切成两半。
def manual_model_split(model) -> PipelineStage:
if stage_index == 0:
# prepare the first stage model
for i in range(4, 8):
del model.layers[str(i)]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
for i in range(4):
del model.layers[str(i)]
model.tok_embeddings = None
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
return stage
如我们所见,第一阶段不包含层归一化层或输出层,仅包括前四个Transformer块。
第二阶段不包含输入嵌入层,但包含输出层和最后四个Transformer块。然后函数
返回 PipelineStage 作为当前排名。
第二种方法是基于跟踪器的模式,该模式根据split_spec参数自动拆分模型。使用管道规范,我们可以指示
torch.distributed.pipelining 在何处拆分模型。在以下代码块中,
我们在第4个变压器解码器层之前进行拆分,这与上述手动拆分类似。
同样,在完成拆分后,我们可以通过调用 build_stage 来获取 PipelineStage。
步骤 2: 定义主执行¶
在主函数中,我们将创建一个特定的流水线调度,各个阶段应遵循该调度。 torch.distributed.pipelining
支持多种调度,包括每个rank单阶段调度 GPipe 和 1F1B,
以及每个rank多阶段调度,例如 Interleaved1F1B 和 LoopedBFS。
if __name__ == "__main__":
init_distributed()
num_microbatches = 4
model_args = ModelArgs()
model = Transformer(model_args)
# Dummy data
x = torch.ones(32, 500, dtype=torch.long)
y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
example_input_microbatch = x.chunk(num_microbatches)[0]
# Option 1: Manual model splitting
stage = manual_model_split(model)
# Option 2: Tracer model splitting
# stage = tracer_model_split(model, example_input_microbatch)
model.to(device)
x = x.to(device)
y = y.to(device)
def tokenwise_loss_fn(outputs, targets):
loss_fn = nn.CrossEntropyLoss()
outputs = outputs.reshape(-1, model_args.vocab_size)
targets = targets.reshape(-1)
return loss_fn(outputs, targets)
schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)
if rank == 0:
schedule.step(x)
elif rank == 1:
losses = []
output = schedule.step(target=y, losses=losses)
print(f"losses: {losses}")
dist.destroy_process_group()
在上面的例子中,我们使用的是手动方法来分割模型,但可以取消注释代码以尝试基于追踪器的模型分割函数。在我们的调度中,我们需要传入微批次的数量以及用于评估目标的损失函数。
The .step() 函数处理整个小批量,并根据之前传入的 n_microbatches 自动将其拆分为微小批量。然后根据调度类对这些微小批量进行操作。
在上面的例子中,我们使用了 GPipe,它遵循一个简单的先全部前向传播再全部反向传播的调度方式。排名 1 返回的输出将与模型在单个 GPU 上运行整个批次时的结果相同。同样地,我们可以传入一个 losses 容器来存储每个微小批量对应的损失。
步骤 3: 启动分布式进程¶
最后,我们准备运行脚本。我们将使用 torchrun 来创建一个单主机、2进程的任务。
我们的脚本已经以 rank 0 的方式编写,执行管道阶段 0 所需的逻辑,而 rank 1 执行管道阶段 1 的逻辑。
torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py
结论¶
在这个教程中,我们学习了如何使用 PyTorch 的 torch.distributed.pipelining API 实现分布式流水线并行。
我们探讨了环境设置、定义变压器模型以及为分布式训练进行分区的过程。
我们讨论了两种模型分区方法,手动和基于追踪的方法,并演示了如何在不同阶段上对微批次进行计算调度。最后,我们介绍了如何执行流水线计划以及使用 torchrun 启动分布式进程。
附加资源¶
我们已成功将 torch.distributed.pipelining 集成到 torchtitan 仓库中。TorchTitan 是一个使用原生 PyTorch 进行大规模 LLM 训练的简洁、最小代码库。要了解生产就绪的流水线并行性用法以及与其他分布式技术的组合,请参阅
TorchTitan 的 3D 并行性端到端示例。