目录

管道并行度

注意

torch.distributed.pipelining当前处于 Alpha 状态及以下 发展。API 可能会更改。它是从 PiPPy 项目迁移而来的。

为什么选择 Pipeline Parallel?

Pipeline Parallelism 是深度学习的原始并行性之一。 它允许对模型的执行进行分区,以便多个微批处理可以同时执行模型代码的不同部分。 管道并行是一种有效的技术:

  • 大规模培训

  • 带宽受限的集群

  • 大型模型推理

上述方案有一个共同点,即每个设备的计算无法 隐藏了约定俗成的平行度的通信,例如权重 FSDP 的所有集合。

什么?torch.distributed.pipelining

虽然 pipelining 有望实现扩展,但通常难以实现,因为 除了模型权重之外,它还需要对模型的执行进行分区。 执行分区通常需要对 型。复杂性的另一个方面是在 分布式环境,考虑了数据流依赖性

该软件包提供了一个工具包,可以自动完成上述操作,从而可以轻松实现管道并行性 在一般型号上。pipelining

它由两部分组成:拆分前端分布式运行时。 拆分前端按原样获取您的模型代码,将其拆分为“模型 partitions“,并捕获数据流关系。分布式运行时 在不同设备上并行执行 pipeline 阶段,处理事情 就像微批量拆分、调度、通信和梯度传播一样, 等。

总体而言,该软件包提供以下功能:pipelining

  • 基于简单规范的模型代码拆分。

  • 对 pipeline schedules 的丰富支持,包括 GPipe、1F1B、 交错 1F1B 和循环 BFS,并提供用于写入的基础设施 自定义时间表。

  • 对跨主机管道并行性的一流支持,因为这是 PP 通常使用(在较慢的互连上)。

  • 与其他 PyTorch 并行技术(如数据并行)的可组合性 (DDP、FSDP) 或张量并行。TorchTitan 项目演示了“3D 并行” Llama 模型上的应用程序。

第 1 步:构建PipelineStage

在使用 之前,我们需要创建对象来包装在该阶段中运行的模型部分。负责分配通信缓冲区和 创建 Send/Recv Ops 以与其对等体通信。它管理中间 buffers 中,例如,对于尚未被消耗的 forward 的输出,以及它 提供用于向后运行 Stage 模型的实用程序。PipelineSchedulePipelineStagePipelineStage

A 需要知道舞台的输入和输出形状 模型,以便它可以正确分配通信缓冲区。形状必须 是静态的,例如,在运行时,形状不能逐步更改。如果运行时形状与 预期形状。与其他平行法作曲或应用混合时 精度,则必须考虑这些技术,以便知道 stage 模块输出的正确形状(和 dtype) 运行。PipelineStagePipeliningShapeErrorPipelineStage

用户可以通过传入一个来直接构造一个实例,方法是传入一个表示模型应该在 阶段。这可能需要更改原始模型代码。查看示例 在 选项 1:手动分割模型 中。PipelineStagenn.Module

或者,拆分前端可以使用图形分区来拆分 模型自动转换为一系列。此技术需要 模型可通过 进行跟踪。结果与其他并行技术的可组合性是实验性的,可能需要 一些解决方法。如果用户 无法轻松更改模型代码。有关详细信息,请参阅选项 2:自动拆分模型 信息。nn.Moduletorch.Exportnn.Module

第 2 步:用于执行PipelineSchedule

现在,我们可以将 附加到管道计划,并运行 schedule with input data(使用输入数据进行调度)。下面是一个 GPipe 示例:PipelineStage

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

请注意,需要为每个 worker 启动上述代码,因此我们使用 启动器服务启动多个进程:

torchrun --nproc_per_node=2 example.py

分割模型的选项

选项 1:手动拆分模型

要直接构造一个 ,用户负责提供 拥有相关 and 的单个实例,并定义执行操作的方法 与该阶段相关。例如,Transformer 的精简版本 class 显示了构建一个易于分区的 型。PipelineStagenn.Modulenn.Parametersnn.Buffersforward()

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers without affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以这种方式定义的模型可以很容易地按阶段进行配置,方法是: 首先 初始化整个模型(使用 meta-device 避免 OOM 错误),删除 不需要的层,然后创建一个 PipelineStage,该层将 模型。例如:

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )

需要一个示例参数,表示 阶段的运行时输入,这将是一个 Microbatch 的输入 数据。此参数通过 stage 模块的 forward 方法传递给 确定通信所需的输入和输出形状。PipelineStageinput_args

当使用其他 Data 或 Model 并行技术进行组合时,如果模型块的输出 shape/dtype 将为 影响。output_args

选项 2:自动拆分模型

如果您有一个完整的模型,并且不想花时间将其修改为 序列的 “model partitions” 中,API 可以提供帮助。 下面是一个简短的示例:pipeline

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果我们打印模型,我们可以看到多个层次结构,这使得很难手动拆分:

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

让我们看看 API 是如何工作的:pipeline

from torch.distributed.pipelining import pipeline, SplitPoint

# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])

pipe = pipeline(
    module=mod,
    mb_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

API 在给定 的情况下拆分模型,其中 表示在函数中执行某个子模块之前添加一个拆分点,并且 同样,对于 such 之后的 split point。pipelinesplit_specSplitPoint.BEGINNINGforwardSplitPoint.END

如果我们 ,我们可以看到:print(pipe)

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

“模型分区”由子模块 (, ) 表示,每个子模块都使用原始模型操作、权重重建 和层次结构。此外,“根级”函数是 reconstructed 来捕获这些分区之间的数据流。此类数据流 稍后将由 Pipeline Runtime 以分布式方式重放。submod_0submod_1forward

该对象提供了一种检索 “model partitions” 的方法:Pipe

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

返回的是一个 ,您可以使用它创建一个 optimizer、save 或 load checkpoints,或者应用其他并行性。stage_modnn.Module

Pipe还允许您在给定的设备上创建 Distributed Stage Runtime 一个:ProcessGroup

stage = pipe.build_stage(stage_idx, device, group)

或者,如果您想稍后在一段时间后构建暂存运行时 修改 ,您可以使用 API 的功能版本。例如:stage_modbuild_stage

from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel

dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)

注意

前端使用 tracer () 来捕获 模型转换为单个图形。如果你的模型不是全图形的,你可以使用 我们的手动前端如下。pipelinetorch.export

拥抱脸示例

在这个软件包所在的 PiPPy 仓库中 最初创建,我们保留了基于未修改的 Hugging Face 模型的示例。 请参阅 examples/huggingface 目录。

示例包括:

技术深入探讨

API 如何拆分模型?pipeline

首先,API 将我们的模型转换为有向无环图 (DAG) 通过跟踪模型。它使用 PyTorch 2 来跟踪模型 全图捕获工具。pipelinetorch.export

然后,它将阶段所需的操作和参数组合在一起 转换为重构的子模块: , , ...submod_0submod_1

与常规的子模块访问方式不同,如 , API 不仅会切割模型的模块结构,而且 还有模型的 forward 函数。Module.children()pipeline

这是必要的,因为模型结构就像 在 期间捕获信息,并且不会捕获任何 有关 的信息。换句话说,缺乏关于 pipelininig 的以下关键方面的信息:Module.children()Module.__init__()Module.forward()Module.children()

  • 子模块的执行顺序forward

  • 子模块之间的激活流

  • 子模块之间是否有任何函数运算符(例如,或者操作不会被 捕获)。reluaddModule.children()

相反,API 确保行为 真正被保留下来。它还捕获了分区之间的激活流, 帮助分布式运行时在没有人工的情况下进行正确的 Send/Receive 调用 介入。pipelineforward

API 的另一个灵活性是 split points 可以位于 模型层次结构中的任意级别。在拆分分区中,原始模型 与该分区相关的层次结构将免费重建。 结果,指向子模块或参数的完全限定名称 (FQN) 仍然有效,并且依赖于 FQN(例如 FSDP、TP 或 checkpointing)仍然可以与几乎零代码的分区模块一起运行 改变。pipeline

实施您自己的时间表

您可以通过扩展以下两个类之一来实现自己的管道计划:

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle适用于每个等级仅分配一个阶段的计划。 适用于为每个等级分配多个阶段的计划。PipelineScheduleMulti

例如,和 是 的子类。 而 , 和 是 的子类。ScheduleGPipeSchedule1F1BPipelineScheduleSingleScheduleFlexibleInterleaved1F1BScheduleInterleaved1F1BScheduleLoopedBFSPipelineScheduleMulti

伐木

您可以使用 [torch._logging](https://pytorch.org/docs/main/logging.html#module-torch._logging) 中的 TORCH_LOGS 环境变量来启用其他日志记录:

  • TORCH_LOGS=+pp 将显示日志记录。DEBUG 消息及其上面的所有级别。

  • TORCH_LOGS=pp 将显示 logging.INFO 消息及以上。

  • TORCH_LOGS=-pp 将显示日志记录。WARNING 消息及以上。

API 参考

模型拆分 API

以下一组 API 将您的模型转换为管道表示形式。

torch.distributed.pipelining 中。SplitPointvalue[来源]

一个枚举。

torch.distributed.pipelining 中。pipelinemodulemb_argsmb_kwargs=Nonesplit_spec=Nonesplit_policy=None[来源]

根据规范拆分模块。

有关详细信息,请参阅 Pipe

参数
  • moduleModule) – 要拆分的模块。

  • mb_argsTuple[Any...]) – 示例位置输入,采用微批处理形式。

  • mb_kwargsOptional[Dict[strAny]]) – 示例关键字输入,采用微批处理形式。(默认值:)

  • split_specOptional[Dict[strSplitPoint]]) – 使用子模块名称作为拆分标记的字典。(默认值:)

  • split_policy (Optional[Callable[[GraphModule]GraphModule]]) – 用于拆分模块的策略。(默认值:)

返回类型

Pipe 的管道表示形式。

torch.distributed.pipelining 中。管道split_gmnum_stageshas_loss_and_backwardloss_spec[来源]
torch.distributed.pipelining 中。pipe_split)[来源]

pipe_split 是一个特殊的运算符,用于标记 阶段。它用于将模块拆分为多个阶段。它是一个 no-op 如果你的 Comments 模块是 Eagerly 运行的。

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

上面的例子将分为两个阶段。

Microbatch 实用程序

torch.distributed.pipelining.microbatch 中。TensorChunkSpecsplit_dim[来源]

用于指定输入分块的类

torch.distributed.pipelining.microbatch 的 Package。split_args_kwargs_into_chunksargskwargschunksargs_chunk_spec=kwargs_chunk_spec=[来源]

给定一个 args 和 kwargs 序列,将它们拆分为多个块 根据它们各自的分块规格。

参数
返回

分片 args 列表 kwargs_split:分片 kwargs 列表

返回类型

args_split

torch.distributed.pipelining.microbatch 的 Package。merge_chunkschunkschunk_spec[来源]

给定一个 chunk 列表,根据 区块规格。

参数
  • chunksList[Any]) – 块列表

  • chunk_spec – 块的分块规范

返回

合并值

返回类型

价值

管道阶段

torch.distributed.pipelining.stage 中。PipelineStage子模块stage_indexnum_stages设备input_argsoutput_args==dw_builder=[来源]

一个类,表示管道并行设置中的管道阶段。 此类是通过提供示例输入(和可选的输出)手动创建的 而不是从 pipeline() 输出的 PipelineStage 类。 此类扩展了 _PipelineStageBase 类,并且可以类似地使用 在 PipelineScheule 中。

参数
  • 子模块nn.module) – 此阶段包装的 PyTorch 模块。

  • stage_indexint) – 此阶段的 ID。

  • num_stagesint) – 阶段的总数。

  • devicetorch.device) – 此阶段所在的设备。

  • input_args联合[Torch.TensorTuple[torch.tensor]], optional) – 子模块的输入参数。

  • output_argsUnion[Torch.TensorTuple[torch.tensor]], optional) – 子模块的输出参数。

  • dist.ProcessGroup可选)– 分布式训练的进程组。如果为 None,则为 default group。

  • dw_builder可选[Callable[[]Callable[[...]]]]) – TODO 清理评论

torch.distributed.pipelining.stage 中。build_stagestage_modulestage_indexpipe_infodevicegroup=None[来源]

创建一个管道阶段,给定此阶段要包装的stage_module 和管道信息。

参数
  • stage_moduletorch.nn.Module) – 此阶段要包装的模块

  • stage_indexint) – 管道中此阶段的索引

  • pipe_infoPipeInfo) – 有关管道的信息,可以通过 pipe.info() 检索

  • devicetorch.device) – 此阶段要使用的设备

  • group可选[dist.ProcessGroup]) – 此阶段要使用的流程组

返回

可以使用 PipelineSchedules 运行的管道阶段。

返回类型

_PipelineStage

管道计划

torch.distributed.pipelining.schedules。ScheduleGPipestagen_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=None[来源]

GPipe 计划。 将以填充-排空的方式遍历所有微批次。

torch.distributed.pipelining.schedules。Schedule1F1Bstagen_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=None[来源]

1F1B 时间表。 将对处于稳定状态的微批次执行一次向前和一次向后操作。

torch.distributed.pipelining.schedules。ScheduleFlexibleInterleaved1F1B阶段n_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=enable_zero_bubble=False[来源]

灵活的交错 1F1B 计划。

此计划与交错的 1F1B 计划最相似。 它的不同之处在于放宽了 num_microbatch % pp_size == 0 的要求。 使用 flex_pp 计划,我们将有 num_rounds = max(1, n_microbatches // pp_group_size) 和 只要 n_microbatches % num_rounds为 0,它就会起作用。举几个例子,支持

  1. pp_group_size = 4,n_microbatches = 10。我们将得到 num_rounds = 2,n_microbatches % 2 为 0。

  2. pp_group_size = 4,n_microbatches = 3。我们将得到 num_rounds = 1,n_microbatches % 1 为 0。

当 enable_zero_bubble 为 True 时,我们将在 https://openreview.net/pdf?id=tuzTN0eIO5 中使用 ZB1P 计划

torch.distributed.pipelining.schedules。ScheduleInterleaved1F1B阶段n_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=[来源]

交错 1F1B 计划。 有关详细信息,请参阅 https://arxiv.org/pdf/2104.04473。 将稳定地对微批次执行一次前进和一次向后操作 状态,并支持每个等级的多个阶段。当微批次准备好时 多个本地阶段,Interleaved 1F1B 优先考虑较早的微批处理 (也称为“深度优先”)。

torch.distributed.pipelining.schedules。ScheduleLoopedBFSstagesn_microbatchesloss_fn=Noneoutput_merge_spec=None[来源]

广度优先管道并行。 有关详细信息,请参阅 https://arxiv.org/abs/2211.05953。 与交错 1F1B 类似,Looped BFS 支持每个等级的多个阶段。 不同的是,当微批次准备好用于多个本地 阶段,Loops BFS 将优先考虑较早的阶段,运行所有可用的 微批次。

torch.distributed.pipelining.schedules。ScheduleInterleavedZeroBubblestagesn_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=[来源]

交错零气泡计划。 有关详细信息,请参阅 https://arxiv.org/pdf/2401.10241。 将对微批次的输入执行一次向前和一次向后操作 状态,并支持每个等级的多个阶段。使用 backward for weights 来填充 管道气泡。

torch.distributed.pipelining.schedules。PipelineScheduleSinglestagen_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=[来源]

单阶段计划的基类。 实现 step 方法。 派生类应实现 _step_microbatches

step*argstarget=Nonelosses=None**kwargs[来源]

使用全批处理输入运行管道计划的一次迭代。 会自动将输入分块成微批次,并通过 根据计划实施进行微批次。

args:模型的位置参数(如非 pipeline 情况)。 kwargs:模型的关键字参数(如非管道情况)。 target:损失函数的目标。 losses:用于存储每个微批次的损失的列表。

torch.distributed.pipelining.schedules。PipelineScheduleMultistagesn_microbatchesloss_fn=args_chunk_spec=kwargs_chunk_spec=output_merge_spec=stage_index_to_group_rank=use_full_backward=[来源]

多阶段计划的基类。 实现 step 方法。

step*argstarget=Nonelosses=None**kwargs[来源]

使用全批处理输入运行管道计划的一次迭代。 会自动将输入分块成微批次,并通过 根据计划实施进行微批次。

args:模型的位置参数(如非 pipeline 情况)。 kwargs:模型的关键字参数(如非管道情况)。 target:损失函数的目标。 losses:用于存储每个微批次的损失的列表。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源