目录

管道并行度

管道并行最初是在 Gpipe 论文中引入的,并且是一个高效的 技术在多个 GPU 上训练大型模型。

警告

torch.distributed.pipeline 已弃用,本文档也已弃用。为 最新的 pipeline 并行实现,请参考 PyTorch 下的 PiPPy 库 organization(PyTorch 的管道并行性)。

使用多个 GPU 的模型并行性

通常,对于不适合单个 GPU 的大型模型,模型并行性 用于将模型的某些部分放置在不同的 GPU 上。 虽然,如果这对顺序模型来说是天真地完成的,那么训练过程 由于 GPU 一次只有一个 GPU 处于活动状态,因此 GPU 处于利用率不足状态 如下图所示:

_images/no_pipe.png

该图表示一个模型,其中 4 个层放置在 4 个不同的 GPU 上 (垂直轴)。横轴表示通过以下方式训练此模型 时间,表明一次只使用 1 个 GPU (图片来源)。

流水线执行

为了缓解这个问题,管道并行性将输入小批量拆分为 多个微批处理和管道执行这些微批处理 多个 GPU。下图对此进行了概述:

_images/pipe.png

该图表示一个模型,其中 4 个层放置在 4 个不同的 GPU 上 (垂直轴)。横轴表示通过以下方式训练此模型 时间证明 GPU 的利用效率要高得多。 但是,仍然存在一个气泡(如图所示),其中 某些 GPU 未被使用。 (图片来源)。

PyTorch 中的 Pipe API

torch.distributed.pipeline.sync 中。管道modulechunks=1checkpoint='except_last'deferred_batch_norm=False[来源]

包装任意模块 使用同步管道并行性进行训练。如果模块需要 大量内存,并且不适合单个 GPU,因此管道并行性是一个 用于培训的有用技术。

该实现基于 torchgpipe 论文。

Pipe 将管道并行性与 checkpoint 相结合以减少峰值 训练所需的内存,同时最大限度地减少设备利用率不足。

您应该将所有模块放在适当的设备上并包装它们 导入到一个模块中,定义 所需的执行顺序。如果模块不包含任何 parameters/buffers,则假设此模块应在 CPU 上执行 和模块的相应输入张量在 CPU 之前移动到 CPU 执行。此行为可以被包装器覆盖,包装器可用于显式指定模块的设备 应该继续运行。WithDevice

参数
  • module) – 要使用流水线并行化的顺序模块。每个模块 必须将其所有参数都放在一个 装置。序列中的每个模块都必须是 nn.模块 或 (组合多个 sequential modules 在单个设备上)

  • chunksint) – 微批次的数量 (默认:1)

  • checkpointstr) – 何时启用检查点,是 、 或 (default: ) 之一。 完全禁用检查点,为除最后一个微批次之外的所有微批次启用检查点 并为所有微批处理启用 checkpointing。'always''except_last''never''except_last''never''except_last''always'

  • deferred_batch_normbool) – 是否使用延迟移动统计数据 (default: )。如果设置为 ,我们将跟踪 多个微批处理来更新每个 mini-batch 的 MICRO-BATCH 中。BatchNorm

提高
例::

跨 GPU 0 和 1 的两个 FC 层的管道。

>>> # Need to initialize RPC framework first.
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1)
>>>
>>> # Build pipe.
>>> fc1 = nn.Linear(16, 8).cuda(0)
>>> fc2 = nn.Linear(8, 4).cuda(1)
>>> model = nn.Sequential(fc1, fc2)
>>> model = Pipe(model, chunks=8)
>>> input = torch.rand(16, 16).cuda(0)
>>> output_rref = model(input)

注意

只有当 checkpoint 参数为 。'never'

注意

目前只支持节点内流水线,但 将来将扩展以支持节点间流水线。 forward 函数返回一个以允许将来进行节点间流水线,其中输出 可能位于远程主机上。对于节点内流水线,您可以使用它来检索 output 在本地。RReflocal_value()

警告

是实验性的,可能会发生变化。

forward*inputs[来源]

通过管道处理单个输入小批量,并返回指向输出的指针。是一个相当透明的模块包装器。它没有 修改底层模块的 input and output 签名。但 存在类型限制。输入和输出必须至少包含一个 张肌。此限制也适用于 partition boundaries。RRef

输入序列作为 .因此,此函数的位置 args 应 匹配管道第一阶段的 positional args。一样 条件适用于管道的一个阶段的输出,该阶段是 input 进行下一阶段。*inputs

输入张量根据用于初始化的参数拆分为多个微批次批量大小 假定为 Tensor 的第一维,如果 batch size 小于 ,微批次的数量等于 批量大小。chunkschunks

只有 Tensor 被拆分为多个微批次、非 Tensor 输入 只是在每个微批处理中按原样复制。对于非 Tensor 输出 在管道的最后阶段,它们被聚合为 A 并返回 User。例如,如果您有 2 个微批次 返回整数 5,则用户将收到合并的 [5, 5] 的输出List

所有输入张量都需要与第一个张量位于同一设备上 partition 的管道。

如果张量使用包装器包装,则张量 不会跨微批处理进行拆分,而是按原样复制,类似于 非张量。NoChunk

参数

inputs – 输入小批量

返回

RRef到 mini-batch 的输出

提高

TypeError – input 不包含至少一个张量

返回类型

RRef

跳过连接

某些模型(如 ResNeXt)不是完全顺序的,并且层之间有跳过连接。 天真地作为管道并行的一部分实现意味着 我们需要通过多个 GPU 复制某些层的输出,直到 我们最终到达 Skip 连接层所在的 GPU。 为了避免这种复制开销,我们在下面提供了 API 来存储和弹出张量 在模型的不同层中。

torch.distributed.pipeline.sync.skip.skippable 的 skippable 中。skippablestash=()pop=()[来源]

定义一个装饰器以使用 skip 连接创建

这些装饰的模块称为 “skippable”。此功能完美运行 即使模块没有被 .

每个 skip 张量都由其名称管理。在操作 skip 张量之前, 可跳过模块必须通过 Stash 和/或 POP 参数静态声明跳过张量的名称。具有预先声明名称的 Skip 张量可以是 由 或 弹出。yield stash(name, tensor)tensor = yield pop(name)

下面是一个包含三个图层的示例。隐藏了名为 “1to3” 的 skip 张量 和 popped 分别位于第一层和最后一层:

@skippable(stash=['1to3'])
class Layer1(nn.Module):
    def forward(self, input):
        yield stash('1to3', input)
        return f1(input)

class Layer2(nn.Module):
    def forward(self, input):
        return f2(input)

@skippable(pop=['1to3'])
class Layer3(nn.Module):
    def forward(self, input):
        skip_1to3 = yield pop('1to3')
        return f3(input) + skip_1to3

model = nn.Sequential(Layer1(), Layer2(), Layer3())

一个可跳过的模块可以存储或弹出多个跳过张量:

@skippable(stash=['alice', 'bob'], pop=['carol'])
class StashStashPop(nn.Module):
    def forward(self, input):
        yield stash('alice', f_alice(input))
        yield stash('bob', f_bob(input))
        carol = yield pop('carol')
        return input + carol

每个 skip 张量必须与一对 stashpop 相关联。检查此 限制。您还可以检查 限制 by without .

返回类型

Callable[[Type[Module]], Type[可跳过]]]

torch.distributed.pipeline.sync.skip.skippable 中。stashnametensor[来源]

用于存储 skip 张量的命令。

def forward(self, input):
    yield stash('name', input)
    return f(input)
参数
  • namestr) – skip 张量的名称

  • inputTorch.TensorNone) – 传递给 skip 连接的张量

torch.distributed.pipeline.sync.skip.skippable 中。popname[来源]

用于弹出 skip 张量的命令。

def forward(self, input):
    skip = yield pop('name')
    return f(input) + skip
参数

namestr) – skip 张量的名称

返回

之前由同名下的另一个层存储的 skip 张量

返回类型

没有

torch.distributed.pipeline.sync.skip.skippable 的 skippable 中。verify_skippables模块[来源]

验证底层的可跳过式模块是否满足完整性要求。

每个 skip 张量必须只有一对 stashpop。如果有 是一个或多个不匹配的对子,它将与 详细消息。

以下是一些失败案例。将报告失败 对于这些情况:

# Layer1 stashes "1to3".
# Layer3 pops "1to3".

nn.Sequential(Layer1(), Layer2())
#               └──── ?

nn.Sequential(Layer2(), Layer3())
#                   ? ────┘

nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3())
#               └───────────────────┘       ^^^^^^

nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3())
#             ^^^^^^      └───────────────────┘

要对多个 skip 张量使用相同的名称,它们必须由 不同的命名空间。看。isolate()

提高

TypeError – 一对或多对 stashpop 不匹配。

教程

以下教程很好地概述了如何使用 API 通过 PyTorch 提供的其余组件:

确认

管道并行的实现基于 fairscale 的 pipe 实现torchgpipe。我们希望 感谢两个团队对引入 Pipeline 的贡献和指导 parallelism 导入到 PyTorch 中。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源