目录

DDP 通信钩子

DDP 通信钩子是控制通信方式的通用接口 通过覆盖 DistributedDataParallel 中的原版 AllReduce 来实现 Gradients。 提供了一些内置的通信钩子, 用户可以轻松应用这些钩子中的任何一个来优化通信。 此外,钩子接口还可以支持用户定义的通信 更高级用例的策略。

如何使用通信钩子?

要使用通信钩子,用户只需要让 DDP 模型注册 训练循环之前的 hook,如下所示。

torch.nn.parallel.DistributedDataParallel.register_comm_hook()

通信钩子对什么进行操作?

通信钩子提供了一种灵活的方法来减少梯度。 所以,它主要在 allreduce 之前对每个副本上的梯度进行操作, 它们被分桶以增加通信和计算之间的重叠。 具体而言,表示要 allreduced 的梯度张量桶。

torch.distributed 中。GradBucket (英语

此类主要传递一个扁平化的梯度张量 (返回者 ) 到 DDP 通信钩子。 此张量可以进一步分解为此存储桶中每个参数的张量列表 (返回者 ) 以应用分层操作。get_per_parameter_tensors()

torch.distributed.GradBucket 的 GradBucket 中。indexself torch._C ._distributed_c10d.GradBucket int

警告

由于存储桶是在第一次迭代后重建的,因此不应依赖训练开始时的索引。

结果

存储几个连续层的梯度的存储桶的索引。 所有渐变都已分桶化。

torch.distributed.GradBucket 的 GradBucket 中。bufferself torch._C._distributed_c10d.GradBucket torch 中。张肌
结果

扁平化的 1D 缓冲区 可以进一步分解为此存储桶中每个参数的张量列表。torch.Tensor

torch.distributed.GradBucket 的 GradBucket 中。gradientsself torch._C._distributed_c10d.GradBucket List[torch.张量]
结果

的列表 .列表中的每个张量对应一个梯度。torch.Tensor

torch.distributed.GradBucket 的 GradBucket 中。is_last自我 torch._C._distributed_c10d.GradBucket bool
结果

此存储桶是否是迭代中 allreduce 的最后一个存储桶。 这也意味着此存储桶对应于 forward pass 中的前几层。

torch.distributed.GradBucket 的 GradBucket 中。set_buffer自我 torch._C._distributed_c10d.GradBucket缓冲区torch。Tensor None

将存储桶中的张量替换为输入张量缓冲区。

torch.distributed.GradBucket 的 GradBucket 中。parametersself torch._C._distributed_c10d.GradBucket List[torch.张量]
结果

的列表 .列表中的每个张量对应一个模型 参数。torch.Tensor

默认通信钩子

默认通信 hook 是简单的无状态 hook,因此输入状态 in 是进程组或 . 输入是一个对象。register_comm_hookNonebucket

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。allreduce_hookprocess_groupbucket[来源]

这个 DDP 通信钩子只是使用 Tensors 进行调用。在所有工作线程中聚合梯度张量后,其回调将获取平均值并返回结果。如果用户注册了这个钩子, DDP 结果应与未注册 hook 的情况相同。 因此,这不会改变 DDP 的行为,用户可以将其用作参考 或修改此钩子以记录有用的信息或任何其他目的,而 不影响 DDP 行为。allreduceGradBucketthen

例::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
返回类型

Future[张量]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。fp16_compress_hookprocess_groupbucket[来源]

这个 DDP 通信钩子实现了一个简单的梯度压缩 将 Tensor 强制转换为半精度浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。GradBuckettorch.float16float16decompressfloat32

例::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
返回类型

Future[张量]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。bf16_compress_hookprocess_groupbucket[来源]

警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。

这个 DDP 通信钩子实现了一个简单的梯度压缩 将张量转换为半精度 Brain 浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。GradBuckettorch.bfloat16bfloat16decompressfloat32

例::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
返回类型

Future[张量]

此外,还提供了一个通信钩子包装器来支持作为包装器, 可以与其他通信钩子结合使用。

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。fp16_compress_wrapper子)[来源]

此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 浮点格式 (),并将给定钩子的结果张量转换回 input 数据类型,例如 .torch.float16float32

因此, 等效于 。fp16_compress_hookfp16_compress_wrapper(allreduce_hook)

例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
返回类型

Callable[[AnyGradBucket], Future[Tensor]]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。bf16_compress_wrapper子)[来源]

警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。

此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 Brain 浮点格式 <https://en.wikipedia.org/wiki/Bfloat16_floating-point_format> '_ (''torch.bfloat16'), 并将给定钩子的结果张量转换回输入数据类型,例如 .float32

因此, 等效于 。bf16_compress_hookbf16_compress_wrapper(allreduce_hook)

例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
返回类型

Callable[[AnyGradBucket], Future[Tensor]]

PowerSGD 通信挂钩

PowerSGD(Vogels 等人,NeurIPS 2019) 是一种梯度压缩算法,可以提供非常高的压缩率 速率并加速带宽受限的分布式训练。 该算法需要同时维护一些超参数和内部 州。因此,PowerSGD 通信钩子是一个有状态的钩子, 用户需要提供定义如下的 state 对象。

PowerSGD 状态

torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的PowerSGDStateprocess_groupmatrix_approximation_rank=1start_powerSGD_iter=1000min_compression_rate=2use_error_feedback=warm_start=真orthogonalization_epsilon=0random_seed=0compression_stats_logging_frequency=10000batch_tensors_with_same_shape=False[来源]

在训练期间存储算法的超参数和所有梯度的内部状态。 特别是 和 是用户应该调整的主要超参数。 为了提高性能,我们建议将二进制超参数保持为 on。matrix_approximation_rankstart_powerSGD_iteruse_error_feedbackwarm_start

  1. matrix_approximation_rank控制压缩的低秩张量的大小,这决定了压缩率。等级越低,压缩越强。

    1.1. 如果太低,完整的模型质量将需要更多的训练步骤才能达到或永远不会达到,并导致准确性下降。matrix_approximation_rank

    1.2. 增加 of 会大大增加压缩的计算成本,并且精度可能不会超过某个阈值进一步提高。matrix_approximation_rankmatrix_approximation_rank

要调整 ,我们建议从 1 开始,然后增加 2 倍(如指数网格搜索、1、2、4 等),直到达到令人满意的精度。通常只使用较小的值 1-4。对于一些 NLP 任务(如原始论文的附录 D 所示),该值已增加到 32。matrix_approximation_rank

  1. start_powerSGD_iter将 PowerSGD 压缩推迟到步骤 ,而 vanilla allreduce 在步骤 之前运行 。这种 vanilla allreduce + PowerSGD 的混合方案可以有效提高精度,即使是使用相对较小的。这是因为,训练阶段的开始通常对不准确的梯度非常敏感,过早压缩梯度可能会使训练很快走上次优轨迹,从而对准确率造成不可挽回的影响。start_powerSGD_iterstart_powerSGD_itermatrix_approximation_rank

要调整,我们建议从总训练步骤的 10% 开始,然后增加它,直到达到令人满意的准确性。如果训练中有热身阶段,通常应不少于热身步骤的数量。start_powerSGD_iterstart_powerSGD_iter

  1. min_compression_rate是压缩图层时所需的最小压缩率。由于压缩会产生计算开销,因此只有在可以节省足够的带宽时,张量才值得压缩,其中 .如果无法满足指定的压缩率阈值,则直接对张量进行 allreduce,不进行压缩。(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols

PowerSGD 压缩开始后,每次迭代都会记录压缩统计信息。compression_stats_logging_frequency

  1. orthogonalization_epsilon可以是在正交化步骤中添加到每个归一化矩阵列的非常小的值(例如,1e-8),以防止在任何列全为 0 时出现除零误差。如果已经可以防止这种情况(例如,通过批量归一化),则建议使用 epsilon 为 0 以确保准确性。

  2. batch_tensors_with_same_shape控制是否在批量操作中压缩和解压缩相同形状的 Tensor 以实现更高的并行度。请注意,您还应该增加存储桶大小(即 DDP 构造函数中的 arg)以使更多相同形状的张量出现在同一个存储桶中,但这可能会减少计算和通信之间的重叠,并因堆叠相同形状的张量而增加内存占用。如果压缩/解压缩计算是瓶颈,则设置为 。bucket_cap_mbTrue

警告

如果启用了错误反馈或预热,则 DDP 中允许的最小值为 2。 这是因为在 DDP 中还有另一个内部优化,它在迭代 1 时重建桶, 这可能与重建过程之前记住的任何 Tensor 冲突。start_powerSGD_iter

PowerSGD 钩子

警告

PowerSGD 通常需要与模型的 gradients 启用误差反馈,从而补偿偏置 压缩通信并提高准确性。

警告

PowerSGD 钩子可能与 Apex 自动混合精密包装冲突。 请改用 PyTorch 原生自动混合精度包

torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的powerSGD_hookstatebucket[来源]

此 DDP 通信钩子实现 PowerSGD 梯度压缩 算法。 一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:

  1. 将输入扁平化的 1D 梯度张量视为每个参数张量的列表,并将所有张量分为两组:

    1.1 应该在 allreduce 之前压缩的 Tensor,因为压缩可以节省足够的带宽。

    1.2 其余的张量将直接全部归约而不压缩,包括所有向量张量(用于偏差)。

  2. 处理未压缩的张量:

    2.1. 为那些未压缩的张量分配连续内存,并将所有未压缩的张量作为一个批次进行 allreduce,不进行压缩;

    2.2. 将单个未压缩的张量从连续内存复制回输入张量。

  3. 处理应由 PowerSGD 压缩压缩的张量:

    3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 用于分解 M, 使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;

    3.2. 计算 Ps 中的每个 P,等于 MQ;

    3.3. Allreduce Ps 作为一个批次;

    3.4. 正交化 Ps 中的每个 P;

    3.5. 以 Qs 为单位计算每个 Q,大约等于 M^TP;

    3.6. All减少 Qs 为一个批次;

    3.7. 计算所有压缩张量中的每个 M,大约等于 PQ^T。

请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。state.start_powerSGD_iter

参数
  • statePowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 、 和 。matrix_approximation_rankstart_powerSGD_itermin_compression_rate

  • bucketdist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。

结果

Communication 的 Future 处理程序,用于就地更新梯度。

返回类型

Future[张量]

例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1,
                          start_powerSGD_iter=10, min_compression_rate=0.5)
>>> ddp_model.register_comm_hook(state, powerSGD_hook)
torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的batched_powerSGD_hookstatebucket[来源]

此 DDP 通信钩子实现了简化的 PowerSGD 梯度压缩 算法。 此变体不会逐层压缩渐变, 而是压缩对所有梯度进行批处理的扁平化 Importing Tensor。 因此,它比 快 , 但通常会导致精度低得多,除非 为 1。matrix_approximation_rank

警告

在此处增加不一定会增加准确性, 因为在没有列/行对齐的情况下对每个参数的张量进行批处理会破坏低秩结构。 因此,用户应始终首先考虑, 并且仅当 is 为 1 时可以获得令人满意的准确度时,才考虑此变体。matrix_approximation_rankmatrix_approximation_rank

一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:

  1. 将输入扁平化的 1D 梯度张量视为具有 0 个填充的方形张量 M;

  2. 创建两个低秩张量 P 和 Q 用于分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;

  3. 计算 P,等于 MQ;

  4. 全部减少 P;

  5. 正交化 P;

  6. 计算 Q,大约等于 M^TP;

  7. 全部减少 Q;

  8. 计算 M,它大约等于 PQ^T。

  9. 将输入张量截断为原始长度。

请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。state.start_powerSGD_iter

参数
  • statePowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 和 。matrix_approximation_rankstart_powerSGD_iter

  • bucketdist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。

结果

Communication 的 Future 处理程序,用于就地更新梯度。

返回类型

Future[张量]

例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1)
>>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)

调试通信钩子

顾名思义,调试通信钩子仅用于调试和性能优化目的。

警告

调试通信挂钩不一定输出正确的结果。

torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks 中。noop_hook_bucket[来源]

此 DDP 通信钩子返回一个包装输入 因此,它是一个不会产生任何通信开销的 noop。

这个钩子应该只用于 allreduce 优化的 headroom 分析, 而不是正常的梯度同步。 例如,如果在注册此 hook 后只能观察到少于 10% 的训练时间加速, 它通常意味着 AllReduce 不是这种情况的性能瓶颈。 此类检测可能特别有用 如果 GPU 跟踪不易检索或跟踪分析复杂 一些因素,例如 AllReduce 和 computation 之间的重叠或跨 Rank 的不同步。

例::
>>> ddp_model.register_comm_hook(None, noop_hook)
返回类型

Future[张量]

通信钩子的检查点

有状态通信钩子可以保存为模型检查点的一部分,以启用训练程序重启。 使钩子可序列化,并且应该被定义。__setstate____getstate__

警告

__getstate__应该从返回的字典中排除不可序列化的属性。

警告

__setstate__应正确初始化不可序列化的属性,这些属性从提供的 .state

has 和 implemented 并可用作参考。__setstate____getstate__

torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的PowerSGDState[来源]
__getstate__[来源]

返回一个将被腌制并保存的 a。 不可序列化且从 返回的状态。Dict[str, Any]process_group

__setstate__[来源]

获取 provided 并检索 . 设置为 default。statePowerSGDStateprocess_group

下面是一个保存和重新加载 PowerSGD 状态和 hook 的简单端到端示例。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(24,24)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(24,12)

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def run_demo(demo_fn, world_size):
    mp.spawn(
        demo_fn,
        args=(world_size,),
        nprocs=world_size,
        join=True)

def demo_serialization(rank, world_size):
    setup(rank, world_size)

    CHECKPOINT = tempfile.gettempdir() + "/checkpoint.pt"

    model = SimpleModel().to(rank)
    ddp_model = DistributedDataParallel(model, device_ids=[rank])

    powersgd_hook = powerSGD.powerSGD_hook
    powersgd_state = powerSGD.PowerSGDState(process_group=None)

    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    ddp_model.register_comm_hook(powersgd_state, powersgd_hook)

    state = {
        'state_dict': ddp_model.state_dict(),
        'comm_hook': hook,
        'comm_hook_state': hook_state}

    if rank == 0:
        torch.save(state, CHECKPOINT)

    dist.barrier()
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    checkpoint = torch.load(CHECKPOINT, map_location=map_location)

    ddp_model.load_state_dict(checkpoint['state_dict'])
    powersgd_hook = checkpoint['comm_hook']
    powersgd_state = checkpoint['comm_hook_state']

    ddp_model.register_comm_hook(powersgd_state, powersgd_hook)

    if rank == 0:
        os.remove(CHECKPOINT)

    cleanup()

if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_serialization, world_size)

确认

非常感谢 PowerSGD 论文作者 Thijs Vogels 对 PowerSGD 通信钩子,以及比较实验, 这表明 PowerSGD 通信钩子的性能与 原始论文中的实现。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源