DDP 通信钩子¶
DDP 通信钩子是控制通信方式的通用接口 通过覆盖 DistributedDataParallel 中的原版 AllReduce 来实现 Gradients。 提供了一些内置的通信钩子, 用户可以轻松应用这些钩子中的任何一个来优化通信。 此外,钩子接口还可以支持用户定义的通信 更高级用例的策略。
如何使用通信钩子?¶
要使用通信钩子,用户只需要让 DDP 模型注册 训练循环之前的 hook,如下所示。
torch.nn.parallel.DistributedDataParallel.register_comm_hook()
通信钩子对什么进行操作?¶
Communication hook 提供了一种灵活的方法来减少梯度。
所以,它主要在 allreduce 之前对每个副本上的梯度进行操作,
它们被分桶以增加通信和计算之间的重叠。
具体而言,表示要 allreduced 的梯度张量桶。
-
类
torch.distributed.
GradBucket
¶ 此类主要传递一个扁平化的梯度张量 (返回者
) 到 DDP 通信钩子。 此张量可以进一步分解为此存储桶中每个参数的张量列表 (返回者 ) 以应用分层操作。
get_per_parameter_tensors()
-
torch.distributed.GradBucket.
index
(自我:torch._C._distributed_c10d。GradBucket) → int¶ 警告
由于存储桶是在第一次迭代后重建的,因此不应依赖训练开始时的索引。
- 返回
存储几个连续层的梯度的存储桶的索引。 所有渐变都已分桶化。
-
torch.distributed.GradBucket.
buffer
(自我:torch._C._distributed_c10d。GradBucket) → at::Tensor¶ - 返回
扁平化的 1D 缓冲区 可以进一步分解为此存储桶中每个参数的张量列表。
torch.Tensor
-
torch.distributed.GradBucket.
gradients
(自我:torch._C._distributed_c10d。GradBucket) → List[at::Tensor]¶ - 返回
的列表 .列表中的每个张量对应一个梯度。
torch.Tensor
-
torch.distributed.GradBucket.
is_last
(自我:torch._C._distributed_c10d。GradBucket) → bool¶ - 返回
此存储桶是否是迭代中 allreduce 的最后一个存储桶。 这也意味着此存储桶对应于 forward pass 中的前几层。
-
torch.distributed.GradBucket.
set_buffer
(自我:torch._C._distributed_c10d。GradBucket,缓冲区:at::Tensor)→ None¶ 将存储桶中的张量替换为输入张量缓冲区。
-
torch.distributed.GradBucket.
parameters
(自我:torch._C._distributed_c10d。GradBucket) → List[at::Tensor]¶ - 返回
的列表 .列表中的每个张量对应一个模型 参数。
torch.Tensor
默认通信钩子¶
默认通信 hook 是简单的无状态 hook,因此输入状态
in 是进程组或 .
输入是一个对象。
register_comm_hook
None
bucket
-
torch.distributed.algorithms.ddp_comm_hooks.default_hooks.
allreduce_hook
(process_group,桶)[来源]¶ 这个 DDP 通信钩子只是使用 Tensors 进行调用。在所有工作线程中聚合梯度张量后,其回调将获取平均值并返回结果。如果用户注册了这个钩子, DDP 结果应与未注册 hook 的情况相同。 因此,这不会改变 DDP 的行为,用户可以将其用作参考 或修改此钩子以记录有用的信息或任何其他目的,而 不影响 DDP 行为。
allreduce
GradBucket
then
- 例::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
-
torch.distributed.algorithms.ddp_comm_hooks.default_hooks.
fp16_compress_hook
(process_group,桶)[来源]¶ 这个 DDP 通信钩子实现了一个简单的梯度压缩 将 Tensor 强制转换为半精度浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。
GradBucket
torch.float16
float16
decompress
float32
- 例::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
-
torch.distributed.algorithms.ddp_comm_hooks.default_hooks.
bf16_compress_hook
(process_group,桶)[来源]¶ 警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。
这个 DDP 通信钩子实现了一个简单的梯度压缩 将张量转换为半精度 Brain 浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。
GradBucket
torch.bfloat16
bfloat16
decompress
float32
- 例::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
此外,还提供了一个通信钩子包装器来支持或
作为包装器,
可以与其他通信钩子结合使用。
-
torch.distributed.algorithms.ddp_comm_hooks.default_hooks.
fp16_compress_wrapper
(钩子)[来源]¶ 此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 浮点格式 (),并将给定钩子的结果张量转换回 input 数据类型,例如 .
torch.float16
float32
因此, 等效于 。
fp16_compress_hook
fp16_compress_wrapper(allreduce_hook)
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
-
torch.distributed.algorithms.ddp_comm_hooks.default_hooks.
bf16_compress_wrapper
(钩子)[来源]¶ 警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 Brain 浮点格式 <https://en.wikipedia.org/wiki/Bfloat16_floating-point_format> '_ (''torch.bfloat16'), 并将给定钩子的结果张量转换回输入数据类型,例如 .
float32
因此, 等效于 。
bf16_compress_hook
bf16_compress_wrapper(allreduce_hook)
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
PowerSGD 通信挂钩¶
PowerSGD(Vogels 等人,NeurIPS 2019) 是一种梯度压缩算法,可以提供非常高的压缩率 速率并加速带宽受限的分布式训练。 该算法需要同时维护一些超参数和内部 州。因此,PowerSGD 通信钩子是一个有状态的钩子, 用户需要提供定义如下的 state 对象。
PowerSGD 状态¶
-
类 (process_group, matrix_approximation_rank=1, start_powerSGD_iter=1000, min_compression_rate=2, use_error_feedback=True, warm_start=True, orthogonalization_epsilon=0, random_seed=0, compression_stats_logging_frequency=10000)[来源]
torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.
PowerSGDState
¶ 在训练期间存储算法的超参数和所有梯度的内部状态。 特别是 和 是用户应该调整的主要超参数。 为了提高性能,我们建议将二进制超参数保持为 on。
matrix_approximation_rank
start_powerSGD_iter
use_error_feedback
warm_start
matrix_approximation_rank
控制压缩的低秩张量的大小,这决定了压缩率。等级越低,压缩越强。1.1. 如果太低,完整的模型质量将需要更多的训练步骤才能达到或永远不会达到,并导致准确性下降。
matrix_approximation_rank
1.2. 增加 可能会大大增加压缩的计算成本,并且精度可能不会在超过某个阈值后进一步提高。
matrix_approximation_rank
matrix_approximation_rank
要调整 ,我们建议从 1 开始,然后增加 2 倍(如说明式网格搜索、1、2、4 等),直到达到令人满意的准确度。通常只使用较小的值 1-4。对于一些 NLP 任务(如原始论文的附录 D 所示),该值已增加到 32。
matrix_approximation_rank
start_powerSGD_iter
将 PowerSGD 压缩推迟到步骤 ,而 vanilla allreduce 在步骤 之前运行 。这种 vanilla allreduce + PowerSGD 的混合方案可以有效提高精度,即使是使用相对较小的。这是因为,训练阶段的开始通常对不准确的梯度非常敏感,过早压缩梯度可能会使训练很快走上次优轨迹,从而对准确率造成不可挽回的影响。start_powerSGD_iter
start_powerSGD_iter
matrix_approximation_rank
要调整,我们建议从总训练步骤的 10% 开始,然后增加它,直到达到令人满意的准确性。如果训练中有热身阶段,通常应不少于热身步骤的数量。
start_powerSGD_iter
start_powerSGD_iter
min_compression_rate
是压缩图层时所需的最小压缩率。由于压缩会产生计算开销,因此只有在可以节省足够的带宽时,张量才值得压缩,其中 .如果无法满足指定的压缩率阈值,则直接对张量进行 allreduce,不进行压缩。(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols
PowerSGD 压缩开始后,每次迭代都会记录压缩统计信息。
compression_stats_logging_frequency
orthogonalization_epsilon
可以是在正交化步骤中添加到每个归一化矩阵列的非常小的值(例如,1e-8),以防止在任何列全为 0 时出现除零误差。如果已经可以防止这种情况(例如,通过批量归一化),则建议使用 epsilon 为 0 以确保准确性。
警告
如果启用了错误反馈或预热,则 DDP 中允许的最小值为 2。 这是因为在 DDP 中还有另一个内部优化,它在迭代 1 时重建桶, 这可能与重建过程之前记住的任何 Tensor 冲突。
start_powerSGD_iter
PowerSGD 钩子¶
警告
PowerSGD 通常需要与模型的 gradients 启用误差反馈,从而补偿偏置 压缩通信并提高准确性。
警告
PowerSGD 钩子可能与 Apex 自动混合精密包装冲突。 请改用 PyTorch 原生自动混合精度包。
-
torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.
powerSGD_hook
(state, bucket)[来源]¶ 此 DDP 通信钩子实现 PowerSGD 梯度压缩 算法。 一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:
将输入扁平化的 1D 梯度张量视为每个参数张量的列表,并将所有张量分为两组:
1.1 应该在 allreduce 之前压缩的 Tensor,因为压缩可以节省足够的带宽。
1.2 其余的张量将直接全部归约而不压缩,包括所有向量张量(用于偏差)。
处理未压缩的张量:
2.1. 为那些未压缩的张量分配连续内存,并将所有未压缩的张量作为一个批次进行 allreduce,不进行压缩;
2.2. 将单个未压缩的张量从连续内存复制回输入张量。
处理应由 PowerSGD 压缩压缩的张量:
3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 用于分解 M, 使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
3.2. 计算 Ps 中的每个 P,等于 MQ;
3.3. Allreduce Ps 作为一个批次;
3.4. 正交化 Ps 中的每个 P;
3.5. 以 Qs 为单位计算每个 Q,大约等于 M^TP;
3.6. All减少 Qs 为一个批次;
3.7. 计算所有压缩张量中的每个 M,大约等于 PQ^T。
请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。
state.start_powerSGD_iter
- 参数
state (PowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 、 和 。
matrix_approximation_rank
start_powerSGD_iter
min_compression_rate
bucket (dist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。
- 返回
Communication 的 Future 处理程序,用于就地更新梯度。
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10, min_compression_rate=0.5) >>> ddp_model.register_comm_hook(state, powerSGD_hook)
-
torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.
batched_powerSGD_hook
(state, bucket)[来源]¶ 此 DDP 通信钩子实现了简化的 PowerSGD 梯度压缩 算法。 此变体不会逐层压缩渐变, 而是压缩对所有梯度进行批处理的扁平化 Importing Tensor。 因此,它比
快 , 但通常会导致精度低得多,除非 为 1。
matrix_approximation_rank
警告
在此处增加不一定会增加准确性, 因为在没有列/行对齐的情况下对每个参数的张量进行批处理会破坏低秩结构。 因此,用户应始终首先考虑
, 并且仅当 is 为 1 时可以获得令人满意的准确度时,才考虑此变体。
matrix_approximation_rank
matrix_approximation_rank
一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:
将输入扁平化的 1D 梯度张量视为具有 0 个填充的方形张量 M;
创建两个低秩张量 P 和 Q 用于分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
计算 P,等于 MQ;
全部减少 P;
正交化 P;
计算 Q,大约等于 M^TP;
全部减少 Q;
计算 M,它大约等于 PQ^T。
将输入张量截断为原始长度。
请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。
state.start_powerSGD_iter
- 参数
state (PowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 和 。
matrix_approximation_rank
start_powerSGD_iter
bucket (dist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。
- 返回
Communication 的 Future 处理程序,用于就地更新梯度。
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1) >>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)
调试通信钩子¶
顾名思义,调试通信钩子仅用于调试和性能优化目的。
警告
调试通信挂钩不一定输出正确的结果。
-
torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks.
noop_hook
(_, 桶)[来源]¶ 此 DDP 通信钩子返回一个包装输入 因此,它是一个不会产生任何通信开销的 noop。
这个钩子应该只用于 allreduce 优化的 headroom 分析, 而不是正常的梯度同步。 例如,如果在注册此 hook 后只能观察到少于 10% 的训练时间加速, 它通常意味着 AllReduce 不是这种情况的性能瓶颈。 此类检测可能特别有用 如果 GPU 跟踪不易检索或跟踪分析复杂 一些因素,例如 AllReduce 和 computation 之间的重叠或跨 Rank 的不同步。
- 例::
>>> ddp_model.register_comm_hook(None, noop_hook)