管道并行度¶
管道并行最初是在 Gpipe 论文中引入的,并且是一个高效的 技术在多个 GPU 上训练大型模型。
警告
管道并行度是实验性的,可能会发生变化。
使用多个 GPU 的模型并行性¶
通常,对于不适合单个 GPU 的大型模型,模型并行性 用于将模型的某些部分放置在不同的 GPU 上。 虽然,如果这对顺序模型来说是天真地完成的,那么训练过程 由于 GPU 一次只有一个 GPU 处于活动状态,因此 GPU 处于利用率不足状态 如下图所示:
流水线执行¶
为了缓解这个问题,管道并行性将输入小批量拆分为 多个微批处理和管道执行这些微批处理 多个 GPU。下图对此进行了概述:
PyTorch 中的 Pipe API¶
- 
class (module, chunks=1, checkpoint='except_last', deferred_batch_norm=False)[来源]torch.distributed.pipeline.sync.Pipe¶
- 将任意 - nn.Sequential模块 使用同步管道并行性进行训练。如果模块需要 大量内存,并且不适合单个 GPU,因此管道并行性是一个 用于培训的有用技术。- 该实现基于 torchgpipe 论文。 - Pipe 将管道并行性与 checkpoint 相结合以减少峰值 训练所需的内存,同时最大限度地减少设备利用率不足。 - 您应该将所有模块放在适当的设备上并包装它们 转换为 - nn.Sequential模块定义 所需的执行顺序。如果模块不包含任何 parameters/buffers,则假设此模块应在 CPU 上执行 和模块的相应输入张量在 CPU 之前移动到 CPU 执行。此行为可以被包装器覆盖,包装器可用于显式指定模块的设备 应该继续运行。- WithDevice- 参数
- 模块 ( - nn.Sequential) —— 要使用流水线并行化的顺序模块。每个模块 必须将其所有参数都放在一个 装置。序列中的每个模块都必须是 nn.模块 或- nn.Sequential(要组合多个 sequential modules 在单个设备上)
- chunks (int) – 微批次的数量 (默认: - 1)
- checkpoint (str) – 何时启用检查点,是 、 或 (default: ) 之一。 完全禁用检查点,为除最后一个微批次之外的所有微批次启用检查点 并为所有微批处理启用 checkpointing。 - 'always'- 'except_last'- 'never'- 'except_last'- 'never'- 'except_last'- 'always'
- deferred_batch_norm (bool) – 是否使用延迟移动统计数据(默认值: - BatchNorm- False).如果设置为- True,我们跟踪 多个微批处理来更新每个 mini-batch 的 MICRO-BATCH 中。
 
- 提高
- TypeError – 该模块不是 - nn.Sequential.
- ValueError – 无效的参数 
 
 - 例::
- 跨 GPU 0 和 1 的两个 FC 层的管道。 - >>> # Need to initialize RPC framework first. >>> os.environ['MASTER_ADDR'] = 'localhost' >>> os.environ['MASTER_PORT'] = '29500' >>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) >>> >>> # Build pipe. >>> fc1 = nn.Linear(16, 8).cuda(0) >>> fc2 = nn.Linear(8, 4).cuda(1) >>> model = nn.Sequential(fc1, fc2) >>> model = Pipe(model, chunks=8) >>> input = torch.rand(16, 16).cuda(0) >>> output_rref = model(input) 
 - 注意 - 您可以将 - Pipe模型与- torch.nn.parallel.DistributedDataParallel仅当 checkpoint 参数为- Pipe是。- 'never'- 注意 - Pipe目前只支持节点内流水线,但 将来将扩展以支持节点间流水线。 forward 函数返回一个- RRef为了允许将来的节点间流水线,其中输出 可能位于远程主机上。对于节点内 pipelining,您可以使用它来检索 output 在本地。- local_value()- 警告 - Pipe是实验性的,可能会发生变化。- 
forward(*输入)[来源]¶
- 通过管道处理单个输入小批量,并返回一个 - RRef指向输出。- Pipe是一个相当透明的模块包装器。它没有 修改底层模块的 input and output 签名。但 存在类型限制。输入和输出必须至少包含一个 张肌。此限制也适用于 partition boundaries。- 输入序列作为 .因此,此函数的位置 args 应 匹配管道第一阶段的 positional args。一样 条件适用于管道的一个阶段的输出,该阶段是 input 进行下一阶段。 - *inputs- 输入张量根据用于初始化的参数拆分为多个微批处理 - chunks- Pipe.批量大小 假定为 Tensor 的第一维,如果 batch size 小于 ,微批次的数量等于 批量大小。- chunks- 只有 Tensor 被拆分为多个微批次、非 Tensor 输入 只是在每个微批处理中按原样复制。对于非 Tensor 输出 在管道的最后阶段,它们被聚合为 A 并返回 User。例如,如果您有 2 个微批次 返回整数 5,则用户将收到合并的 [5, 5] 的输出 - List- 所有输入张量都需要与第一个张量位于同一设备上 partition 的管道。 - 如果张量使用包装器包装,则张量 不会跨微批处理进行拆分,而是按原样复制,类似于 非张量。 - NoChunk
 
跳过连接¶
某些模型(如 ResNeXt)不是完全顺序的,并且具有 skip 层之间的连接。天真地作为管道的一部分实现 并行性意味着我们需要通过 多个 GPU,直到我们最终到达 GPU 中跳过的层 连接。为避免此复制开销,我们在下面提供了 API 来存储 并在模型的不同层中弹出 Tensor。
- 
torch.distributed.pipeline.sync.skip.skippable.skippable(stash=(), pop=())[来源]¶
- 用于定义 - nn.Module带 Skip 连接。修饰后的模块称为“可跳过的”。此功能 即使模块没有被- Pipe.- 每个 skip 张量都由其名称管理。在作 skip 张量之前, 可跳过模块必须通过 Stash 和/或 POP 参数静态声明跳过张量的名称。具有预先声明名称的 Skip 张量可以是 由 或 弹出。 - yield stash(name, tensor)- tensor = yield pop(name)- 下面是一个包含三个图层的示例。隐藏了名为 “1to3” 的 skip 张量 和 popped 分别位于第一层和最后一层: - @skippable(stash=['1to3']) class Layer1(nn.Module): def forward(self, input): yield stash('1to3', input) return f1(input) class Layer2(nn.Module): def forward(self, input): return f2(input) @skippable(pop=['1to3']) class Layer3(nn.Module): def forward(self, input): skip_1to3 = yield pop('1to3') return f3(input) + skip_1to3 model = nn.Sequential(Layer1(), Layer2(), Layer3()) - 一个可跳过的模块可以存储或弹出多个跳过张量: - @skippable(stash=['alice', 'bob'], pop=['carol']) class StashStashPop(nn.Module): def forward(self, input): yield stash('alice', f_alice(input)) yield stash('bob', f_bob(input)) carol = yield pop('carol') return input + carol - 每个 skip 张量必须与一对 stash 和 pop 相关联。 - Pipe检查此 限制。您还可以检查 限制者- verify_skippables()没有- Pipe.
- 
class (name, tensor)[来源]torch.distributed.pipeline.sync.skip.skippable.stash¶
- 用于存储 skip 张量的命令。 - def forward(self, input): yield stash('name', input) return f(input) - 参数
- name (str) – skip 张量的名称 
- input (Torch.Tensor 或 None) – 传递给 skip 连接的张量 
 
 
- 
class (name)[来源]torch.distributed.pipeline.sync.skip.skippable.pop¶
- 用于弹出 skip 张量的命令。 - def forward(self, input): skip = yield pop('name') return f(input) + skip - 参数
- name (str) – skip 张量的名称 
- 返回
- 之前由同名下的另一个层存储的 skip 张量 
 
- 
torch.distributed.pipeline.sync.skip.skippable.verify_skippables(module)[来源]¶
- 验证底层的可跳过式模块是否满足完整性要求。 - 每个 skip 张量必须只有一对 stash 和 pop。如果有 是一对或多对不匹配的对子,它将加注 - TypeError使用 详细消息。- 以下是一些失败案例。 - verify_skippables()将报告失败 对于这些情况:- # Layer1 stashes "1to3". # Layer3 pops "1to3". nn.Sequential(Layer1(), Layer2()) # └──── ? nn.Sequential(Layer2(), Layer3()) # ? ────┘ nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3()) # └───────────────────┘ ^^^^^^ nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3()) # ^^^^^^ └───────────────────┘ - 要对多个 skip 张量使用相同的名称,它们必须由 不同的命名空间。看。 - isolate()- 提高
- TypeError – 一对或多对 stash 和 pop 不匹配。 
 
确认¶
管道并行的实现基于 fairscale 的 pipe 实现和 torchgpipe。我们希望 感谢两个团队对引入 Pipeline 的贡献和指导 parallelism 导入到 PyTorch 中。
 
