管道并行度¶
管道并行最初是在 Gpipe 论文中引入的,并且是一个高效的 技术在多个 GPU 上训练大型模型。
警告
管道并行度是实验性的,可能会发生变化。
使用多个 GPU 的模型并行性¶
通常,对于不适合单个 GPU 的大型模型,模型并行性 用于将模型的某些部分放置在不同的 GPU 上。 虽然,如果这对顺序模型来说是天真地完成的,那么训练过程 由于 GPU 一次只有一个 GPU 处于活动状态,因此 GPU 处于利用率不足状态 如下图所示:
流水线执行¶
为了缓解这个问题,管道并行性将输入小批量拆分为 多个微批处理和管道执行这些微批处理 多个 GPU。下图对此进行了概述:
PyTorch 中的 Pipe API¶
-
class (module, chunks=1, checkpoint='except_last', deferred_batch_norm=False)[来源]
torch.distributed.pipeline.sync.
Pipe
¶ 包装任意
模块 使用同步管道并行性进行训练。如果模块需要 大量内存,并且不适合单个 GPU,因此管道并行性是一个 用于培训的有用技术。
该实现基于 torchgpipe 论文。
Pipe 将管道并行性与 checkpoint 相结合以减少峰值 训练所需的内存,同时最大限度地减少设备利用率不足。
您应该将所有模块放在适当的设备上并包装它们 导入到一个模块中
,定义 所需的执行顺序。如果模块不包含任何 parameters/buffers,则假设此模块应在 CPU 上执行 和模块的相应输入张量在 CPU 之前移动到 CPU 执行。此行为可以被包装器覆盖,包装器可用于显式指定模块的设备 应该继续运行。
WithDevice
- 参数
module (
) – 要使用流水线并行化的顺序模块。每个模块 必须将其所有参数都放在一个 装置。序列中的每个模块都必须是 nn.模块 或
(组合多个 sequential modules 在单个设备上)
chunks (int) – 微批次的数量 (默认:
1
)checkpoint (str) – 何时启用检查点,是 、 或 (default: ) 之一。 完全禁用检查点,为除最后一个微批次之外的所有微批次启用检查点 并为所有微批处理启用 checkpointing。
'always'
'except_last'
'never'
'except_last'
'never'
'except_last'
'always'
deferred_batch_norm (bool) – 是否使用延迟移动统计数据 (default:
)。如果设置为
,我们将跟踪 多个微批处理来更新每个 mini-batch 的 MICRO-BATCH 中。
BatchNorm
- 提高
TypeError – 模块不是
.
ValueError – 无效的参数
- 例::
跨 GPU 0 和 1 的两个 FC 层的管道。
>>> # Need to initialize RPC framework first. >>> os.environ['MASTER_ADDR'] = 'localhost' >>> os.environ['MASTER_PORT'] = '29500' >>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) >>> >>> # Build pipe. >>> fc1 = nn.Linear(16, 8).cuda(0) >>> fc2 = nn.Linear(8, 4).cuda(1) >>> model = nn.Sequential(fc1, fc2) >>> model = Pipe(model, chunks=8) >>> input = torch.rand(16, 16).cuda(0) >>> output_rref = model(input)
注意
目前只支持节点内流水线,但 将来将扩展以支持节点间流水线。 forward 函数返回一个
以允许将来进行节点间流水线,其中输出 可能位于远程主机上。对于节点内 pipelining,您可以使用它来
检索 output 在本地。
-
forward
(*输入)[来源]¶ 通过管道处理单个输入小批量,并返回指向输出的指针。
是一个相当透明的模块包装器。它没有 修改底层模块的 input and output 签名。但 存在类型限制。输入和输出必须至少包含一个 张肌。此限制也适用于 partition boundaries。
输入序列作为 .因此,此函数的位置 args 应 匹配管道第一阶段的 positional args。一样 条件适用于管道的一个阶段的输出,该阶段是 input 进行下一阶段。
*inputs
输入张量根据用于初始化的参数拆分为多个微批次。
批量大小 假定为 Tensor 的第一维,如果 batch size 小于 ,微批次的数量等于 批量大小。
chunks
chunks
只有 Tensor 被拆分为多个微批次、非 Tensor 输入 只是在每个微批处理中按原样复制。对于非 Tensor 输出 在管道的最后阶段,它们被聚合为 A 并返回 User。例如,如果您有 2 个微批次 返回整数 5,则用户将收到合并的 [5, 5] 的输出
List
所有输入张量都需要与第一个张量位于同一设备上 partition 的管道。
如果张量使用包装器包装,则张量 不会跨微批处理进行拆分,而是按原样复制,类似于 非张量。
NoChunk
- 参数
inputs – 输入小批量
- 返回
- 提高
TypeError – input 不包含至少一个张量
跳过连接¶
某些模型(如 ResNeXt)不是完全顺序的,并且具有 skip 层之间的连接。天真地作为管道的一部分实现 并行性意味着我们需要通过 多个 GPU,直到我们最终到达 GPU 中跳过的层 连接。为避免此复制开销,我们在下面提供了 API 来存储 并在模型的不同层中弹出 Tensor。
-
torch.distributed.pipeline.sync.skip.skippable.
skippable
(stash=(), pop=())[来源]¶ 使用 skip 定义
a 的装饰器 连接。修饰后的模块称为“可跳过的”。此功能 即使模块没有被
包装,也能正常工作。
每个 skip 张量都由其名称管理。在操作 skip 张量之前, 可跳过模块必须通过 Stash 和/或 POP 参数静态声明跳过张量的名称。具有预先声明名称的 Skip 张量可以是 由 或 弹出。
yield stash(name, tensor)
tensor = yield pop(name)
下面是一个包含三个图层的示例。隐藏了名为 “1to3” 的 skip 张量 和 popped 分别位于第一层和最后一层:
@skippable(stash=['1to3']) class Layer1(nn.Module): def forward(self, input): yield stash('1to3', input) return f1(input) class Layer2(nn.Module): def forward(self, input): return f2(input) @skippable(pop=['1to3']) class Layer3(nn.Module): def forward(self, input): skip_1to3 = yield pop('1to3') return f3(input) + skip_1to3 model = nn.Sequential(Layer1(), Layer2(), Layer3())
一个可跳过的模块可以存储或弹出多个跳过张量:
@skippable(stash=['alice', 'bob'], pop=['carol']) class StashStashPop(nn.Module): def forward(self, input): yield stash('alice', f_alice(input)) yield stash('bob', f_bob(input)) carol = yield pop('carol') return input + carol
每个 skip 张量必须与一对 stash 和 pop 相关联。
检查此 限制。您还可以检查 限制 by
without
.
-
class (name, tensor)[来源]
torch.distributed.pipeline.sync.skip.skippable.
stash
¶ 用于存储 skip 张量的命令。
def forward(self, input): yield stash('name', input) return f(input)
- 参数
name (str) – skip 张量的名称
input (Torch.Tensor 或 None) – 传递给 skip 连接的张量
-
class (name)[来源]
torch.distributed.pipeline.sync.skip.skippable.
pop
¶ 用于弹出 skip 张量的命令。
def forward(self, input): skip = yield pop('name') return f(input) + skip
- 参数
name (str) – skip 张量的名称
- 返回
之前由同名下的另一个层存储的 skip 张量
-
torch.distributed.pipeline.sync.skip.skippable.
verify_skippables
(module)[来源]¶ 验证底层的可跳过式模块是否满足完整性要求。
每个 skip 张量必须只有一对 stash 和 pop。如果有 是一个或多个不匹配的对子,它将与
详细消息。
# Layer1 stashes "1to3". # Layer3 pops "1to3". nn.Sequential(Layer1(), Layer2()) # └──── ? nn.Sequential(Layer2(), Layer3()) # ? ────┘ nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3()) # └───────────────────┘ ^^^^^^ nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3()) # ^^^^^^ └───────────────────┘
要对多个 skip 张量使用相同的名称,它们必须由 不同的命名空间。看。
isolate()
- 提高
TypeError – 一对或多对 stash 和 pop 不匹配。
教程¶
确认¶
管道并行的实现基于 fairscale 的 pipe 实现和 torchgpipe。我们希望 感谢两个团队对引入 Pipeline 的贡献和指导 parallelism 导入到 PyTorch 中。