目录

模型并行

注意

torch.distributed.pipelining 当前处于测试阶段并正在开发中。API 可能会发生变化。它是从 PiPPy 项目迁移而来的。

为什么使用管道并行?

管道并行是深度学习的一种基本并行方式。 它允许模型的执行被分区,使得多个 微批次可以并发地执行模型代码的不同部分。 管道并行可以是一种有效的技术用于:

  • 大规模训练

  • 带宽受限的集群

  • 大模型推理。

上述场景有一个共同点,即每个设备上的计算无法掩盖传统并行性的通信开销,例如 FSDP 的权重全汇聚操作。

什么是 torch.distributed.pipelining?

虽然流水线技术在扩展方面很有前景,但由于它除了需要划分模型权重外,还需要划分模型的执行,因此往往难以实现。 执行的划分通常需要对您的模型进行侵入性的代码更改。复杂性的另一个方面来自于在分布式环境中调度微批次,并且需要考虑数据流依赖性

pipelining 包提供了一个工具包,可以自动完成这些事情,从而轻松实现管道并行计算 在 通用 模型上。

它由两部分组成:一个 分割前端 和一个 分布式运行时。 分割前端直接使用您的模型代码,将其拆分为“模型分区”,并捕获数据流关系。分布式运行时并行地在不同设备上执行各个管道阶段,处理诸如微批次拆分、调度、通信和梯度传播等任务。

总体而言,pipelining包提供了以下功能:

  • 根据简单规范对模型代码进行拆分。

  • 为流水线调度提供丰富的支持,包括 GPipe、1F1B、 交错 1F1B 和循环 BFS,并提供编写 自定义调度的基础设施。

  • 对跨主机管道并行性的一流支持,因为管道并行性通常用于较慢的互连网络中。

  • 与其他PyTorch并行技术(如数据并行DDP、FSDP或张量并行)的组合性。 TorchTitan项目展示了Llama模型上的“3D并行”应用。

步骤 1:构建 PipelineStage 以执行

在我们可以使用PipelineSchedule之前,我们需要创建PipelineStage对象来封装该阶段中运行的部分模型。PipelineStage负责分配通信缓冲区并创建发送/接收操作以与其对等方通信。它管理中间缓冲区,例如尚未被消费的前向传播输出,并提供用于运行阶段模型反向传播的工具。

一个 PipelineStage 需要知道阶段模型的输入和输出形状,以便它可以正确分配通信缓冲区。这些形状必须是静态的,例如在运行时,形状不能从一步变化到另一步。如果运行时形状与预期形状不匹配,则会引发一个 PipeliningShapeError 类错误。在与其他并行技术组合或应用混合精度时,必须考虑这些技术,以便 PipelineStage 在运行时知道阶段模块输出的正确形状(和数据类型)。

用户可以直接通过传递一个PipelineStage实例来构建,该实例代表模型中应在舞台上运行的部分。这可能需要对原始模型代码进行更改。请参阅nn.Module中的示例选项 1:手动拆分模型

或者,分割前端可以使用图分区技术将您的模型自动分割成一系列nn.Module。此技术要求模型可以通过torch.Export进行跟踪。生成的nn.Module与其他并行技术的组合性尚处于试验阶段,可能需要一些变通方法。如果用户无法轻松更改模型代码,使用此前端可能更具吸引力。有关更多信息,请参阅选项 2:自动分割模型

步骤 2:使用 PipelineSchedule 执行

我们现在可以将 PipelineStage 连接到管道调度,并使用输入数据运行调度。这是一个GPipe示例:

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

注意,上述代码需要在每个工作者上启动,因此我们使用一个启动服务来启动多个进程:

torchrun --nproc_per_node=2 example.py

模型拆分选项

选项 1:手动拆分模型

要直接构建一个 PipelineStage,用户需要提供一个单独的 nn.Module 实例,该实例拥有相关的 nn.Parametersnn.Buffers,并定义一个执行该阶段相关操作的 forward() 方法。例如,Torchtitan 中定义的 Transformer 类的一个简化版本展示了构建一个易于分区的模型的模式。

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers witout affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以这种方式定义的模型可以通过以下步骤在每个阶段进行轻松配置:首先使用元设备初始化整个模型(以避免内存不足错误),删除该阶段不需要的层,然后创建一个封装模型的PipelineStage。例如:

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )

The PipelineStage 需要一个示例参数 input_args,表示该阶段的运行时输入,这将是相当于一个微批量的输入数据。此参数通过阶段模块的前向方法传递,以确定通信所需的输入和输出形状。

当与其他数据并行或模型并行技术结合使用时,如果模型块的输出形状/数据类型将受到影响, 可能也需要output_args

选项 2:自动拆分模型

如果你有一个完整的模型,并且不想花费时间将其修改为一系列“模型分区”,那么pipeline API 将为你提供帮助。这里有一个简要示例:

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果我们打印模型,可以看到多个层级,这使得手动拆分变得困难:

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

让我们看看pipeline API是如何工作的:

from torch.distributed.pipelining import pipeline, SplitPoint

x = torch.LongTensor([1, 2, 4, 5])
pipe = pipeline(
    module=mod,
    num_chunks=1,
    example_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

API pipeline 可以根据给定的 split_spec 来拆分模型,其中 SplitPoint.BEGINNING 表示在 forward 函数中某个子模块执行之前添加一个拆分点,并且类似地, SplitPoint.END 表示在该子模块执行之后添加拆分点。

如果我们 print(pipe),我们可以看到:

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

“模型分区”由子模块表示 (submod_0, submod_1),每个子模块都使用原始模型操作 和层次结构进行重建。此外,还重建了一个“根级别”的 forward 函数, 以捕获这些分区之间的数据流。稍后,管道运行时将以分布式方式重放这种数据流。

The Pipe 对象提供了一种检索“模型分区”的方法:

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

你也可以使用 Pipe 在设备上创建一个分布式阶段运行时:

stage = pipe.build_stage(stage_idx, device, group)

注意

The pipeline 前端使用跟踪器 (torch.export) 来捕获您的模型到单个图中。如果您的模型不能完全图化,您可以使用下面的手动前端。

Hugging Face 示例

在该包最初创建的PiPPy仓库中,我们保留了基于未经修改的Hugging Face模型的示例。 查看examples/huggingface目录。

示例包括:

技术深度解读

Pytorch的pipeline API是如何拆分模型的?

首先,pipeline API通过追踪模型将其转换为有向无环图(DAG)。 它使用torch.export——一个PyTorch 2全图捕获工具来进行模型追踪。

然后,它将一个阶段所需的 操作和参数 组合成一个重构的子模块: submod_0, submod_1, …

不同于传统的子模块访问方法如 Module.children()pipeline API 不仅分割了你模型的模块结构,还分割了你的模型的 前向 函数。

这是因为模型结构如 Module.children() 仅仅在 Module.__init__() 期间捕获信息,而不会捕获任何关于 Module.forward() 的信息。换句话说,Module.children() 缺乏以下与流水线关键方面相关的信息:

  • 子模块的执行顺序在 forward

  • 激活在子模块之间流动

  • 子模块之间是否存在功能运算符(例如,reluadd 运算将不会被 Module.children() 捕获)。

相反,pipeline API 确保了 forward 行为 真正被保留。它还捕获了分区之间的激活流, 帮助分布式运行时在没有人工干预的情况下正确地进行发送/接收调用。

pipeline API 的另一个灵活性在于,拆分点可以在您的模型层次结构中的任意级别。在拆分分区中,与该分区相关的原始模型层次结构将免费为您重建。 因此,指向子模块或参数的完全限定名 (FQN) 仍然有效,并且依赖于 FQN 的服务(例如 FSDP、TP 或检查点)可以几乎不更改代码即可运行您的分区模块。

实现自己的调度

你可以通过扩展以下两个类之一来实现自己的管道调度: - `torch.utils.data.Sampler` - `torch.utils.data.WeightedRandomSampler`

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle 用于每个排名仅分配 一个 阶段的日程。 PipelineScheduleMulti 用于每个排名分配多个阶段的日程。

例如,ScheduleGPipeSchedule1F1BPipelineScheduleSingle 的子类。 而 ScheduleInterleaved1F1BScheduleLoopedBFSPipelineScheduleMulti 的子类。

API 参考

模型拆分APIs

以下这一组 API 可以将您的模型转换为管道表示形式。

class torch.distributed.pipelining.SplitPoint(value)[source]

一个枚举。

torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source]

根据规范拆分一个模块。

参见 Pipe 以获取更多详情。

Parameters
Return type

Pipe的管道表示。

class torch.distributed.pipelining.Pipe(split_gm, num_stages, has_loss_and_backward, loss_spec)[source]
torch.distributed.pipelining.pipe_split()[source]

pipe_split 是一个特殊的操作符,用于标记模块中阶段之间的边界。它用于将模块划分为不同的阶段。如果你的注释模块以急切模式运行,则它是无效操作。

示例

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

上述示例将分为两个阶段。

微批次工具

class torch.distributed.pipelining.microbatch.TensorChunkSpec(split_dim)[source]

用于指定输入分块的类

torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[source]

给定一个参数序列和关键字参数,根据各自的分块规范将其分成若干个块。

Parameters
Returns

分片参数列表 kwargs_split:分片的关键字参数列表

Return type

args_split

torch.distributed.pipelining.microbatch.merge_chunks(chunks, chunk_spec)[source]

给定一系列片段,根据片段规格将其合并为单个值。

Parameters
  • (列表[任意]) – 块的列表

  • chunk_spec – 分块的规格说明

Returns

合并值

Return type

管道阶段

class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args, output_args=None, group=None)[source]

一个类,表示流水线并行设置中的流水线阶段。通过提供示例输入(和可选的输出)手动创建此类, 与从pipeline()输出的PipelineStage类相反。此类扩展了_PipelineStageBase类,并且可以类似地用于PipelineScheule

Parameters
  • 子模块 (nn.Module) – 由该阶段包装的PyTorch模块。

  • stage_index (int) – 此阶段的ID。

  • 阶段总数 (int) – 阶段的总数量。

  • 设备 (torch.device) – 该阶段所在的设备。

  • 输入参数 (Union[torch.Tensor, Tuple[torch.tensor]], 可选) – 子模块的输入参数。

  • 输出参数 (Union[torch.Tensor, Tuple[torch.tensor]], 可选) – 子模块的输出参数。

  • (dist.ProcessGroup, 可选) – 分布式训练的过程组。如果为None,则使用默认组。

torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source]

给定要由该阶段包装的 stage_module 和管道信息,创建一个管道阶段。

Parameters
  • stage_module (torch.nn.Module) – 需要由该阶段包装的模块

  • stage_index (int) – 管道中此阶段的索引

  • 管道信息 (PipeInfo) – 管道的信息,可以通过pipe.info()检索

  • 设备 (torch.device) – 本阶段使用的设备

  • (可选[dist.ProcessGroup]) – 本阶段使用的进程组

Returns

一个可以与PipelineSchedules一起运行的流水线阶段。

Return type

_PipelineStage

管道调度

class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]

GPipe调度将以填满-排空的方式遍历所有微批量。

class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]

1F1B时间表。 在稳态下,将对微小批次执行一次前向和一次后向操作。

class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]

交错1F1B调度。 详情请参见https://arxiv.org/pdf/2104.04473。 在稳定状态下,对微批次进行一次前向和一次后向操作,并支持每个排名的多个阶段。当多个本地阶段的微批次准备就绪时,交错1F1B优先处理较早的微批次(也称为“深度优先”)。

class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None)[source]

广度优先流水线并行。 详情请见https://arxiv.org/abs/2211.05953。 类似于交错1F1B,循环BFS支持每个排名多个阶段。 不同之处在于,当微批次准备就绪时,循环BFS会优先处理较早的阶段,并一次性运行所有可用的微批次。

class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]

单阶段调度的基础类。 实现step方法。 派生类应实现_step_microbatches

step(*args, target=None, losses=None, **kwargs)[source]

运行管道调度的一个迭代,输入为整个批次。 会自动将输入分成微批次,并根据调度实现处理这些微批次。

args: 模型的位置参数(与非管道情况相同)。 kwargs: 模型的关键字参数(与非管道情况相同)。 target: 损失函数的目标。 losses: 一个用于存储每个微批损失的列表。

class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None)[source]

多阶段调度的基础类。 实现step方法。

step(*args, target=None, losses=None, **kwargs)[source]

运行管道调度的一个迭代,输入为整个批次。 会自动将输入分成微批次,并根据调度实现处理这些微批次。

args: 模型的位置参数(与非管道情况相同)。 kwargs: 模型的关键字参数(与非管道情况相同)。 target: 损失函数的目标。 losses: 一个用于存储每个微批损失的列表。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源