DDP 通信钩子¶
DDP 通信钩子是控制通信方式的通用接口 通过覆盖 DistributedDataParallel 中的原版 AllReduce 来实现 Gradients。 提供了一些内置的通信钩子, 用户可以轻松应用这些钩子中的任何一个来优化通信。 此外,钩子接口还可以支持用户定义的通信 更高级用例的策略。
如何使用通信钩子?¶
要使用通信钩子,用户只需要让 DDP 模型注册 训练循环之前的 hook,如下所示。
torch.nn.parallel.DistributedDataParallel.register_comm_hook()
通信钩子对什么进行操作?¶
通信钩子提供了一种灵活的方法来减少梯度。
所以,它主要在 allreduce 之前对每个副本上的梯度进行操作,
它们被分桶以增加通信和计算之间的重叠。
具体而言,表示要 allreduced 的梯度张量桶。
- 类 torch.distributed 中。GradBucket (英语)¶
此类主要传递一个扁平化的梯度张量 (返回者
) 到 DDP 通信钩子。 此张量可以进一步分解为此存储桶中每个参数的张量列表 (返回者 ) 以应用分层操作。
get_per_parameter_tensors()
- torch.distributed.GradBucket 的 GradBucket 中。index(self: torch._C ._distributed_c10d.GradBucket) int ¶
警告
由于存储桶是在第一次迭代后重建的,因此不应依赖训练开始时的索引。
- 返回
存储几个连续层的梯度的存储桶的索引。 所有渐变都已分桶化。
- torch.distributed.GradBucket 的 GradBucket 中。buffer(self: torch._C._distributed_c10d.GradBucket) torch 中。张肌 ¶
- 返回
扁平化的 1D 缓冲区 可以进一步分解为此存储桶中每个参数的张量列表。
torch.Tensor
- torch.distributed.GradBucket 的 GradBucket 中。gradients(self: torch._C._distributed_c10d.GradBucket) list[torch.张量] ¶
- 返回
的列表 .列表中的每个张量对应一个梯度。
torch.Tensor
- torch.distributed.GradBucket 的 GradBucket 中。is_last(自我: torch._C._distributed_c10d.GradBucket) bool ¶
- 返回
此存储桶是否是迭代中 allreduce 的最后一个存储桶。 这也意味着此存储桶对应于 forward pass 中的前几层。
- torch.distributed.GradBucket 的 GradBucket 中。set_buffer(自我: torch._C._distributed_c10d.GradBucket,缓冲区:torch。Tensor) None ¶
将存储桶中的张量替换为输入张量缓冲区。
- torch.distributed.GradBucket 的 GradBucket 中。parameters(self: torch._C._distributed_c10d.GradBucket) list[torch.张量] ¶
- 返回
的列表 .列表中的每个张量对应一个模型 参数。
torch.Tensor
默认通信钩子¶
默认通信 hook 是简单的无状态 hook,因此输入状态
in 是进程组或 .
输入是一个对象。
register_comm_hook
None
bucket
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。allreduce_hook(process_group, bucket)[来源]¶
使用 Tensors 调用 。
allreduce
GradBucket
在所有工作线程中聚合梯度张量后,其回调将获取平均值并返回结果。
then
如果用户注册了这个 DDP 通信钩子, DDP 结果应与未注册 hook 的情况相同。 因此,这不会改变 DDP 的行为,用户可以将其用作参考 或修改此钩子以记录有用的信息或任何其他目的,而 不影响 DDP 行为。
- 例::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。fp16_compress_hook(process_group, bucket)[来源]¶
按强制转换进行压缩 除以进程组大小。
GradBucket
torch.float16
这个 DDP 通信钩子实现了一个简单的梯度压缩 将 Tensor 强制转换为半精度浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。
GradBucket
torch.float16
float16
decompress
float32
- 例::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。bf16_compress_hook(process_group, bucket)[来源]¶
警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。
这个 DDP 通信钩子实现了一个简单的梯度压缩 将张量转换为半精度 Brain 浮点格式的方法 () ,然后将其除以进程组大小。 这一切都减少了那些梯度张量。一次压缩梯度 tensor 的 Tensor 都是 AllReduced 的,则链式回调会将其转换回输入数据类型(例如 )。
GradBucket
torch.bfloat16
bfloat16
decompress
float32
- 例::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
此外,还提供了一个通信钩子包装器来支持或
作为包装器,
可以与其他通信钩子结合使用。
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。fp16_compress_wrapper(钩子)[来源]¶
将输入张量强制转换为 ,将钩子的结果强制转换回输入 dtype。
torch.float16
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 浮点格式 (),并将给定钩子的结果张量转换回 input 数据类型,例如 . 因此, 等效于 。
torch.float16
float32
fp16_compress_hook
fp16_compress_wrapper(allreduce_hook)
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
- 返回类型
Callable[[Any, GradBucket], Future[Tensor]]
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks 中。bf16_compress_wrapper(钩子)[来源]¶
警告:此 API 是实验性的,它需要高于 2.9.6 的 NCCL 版本。
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 Brain 浮点格式 <https://en.wikipedia.org/wiki/Bfloat16_floating-point_format> '_ (''torch.bfloat16'), 并将给定钩子的结果张量转换回输入数据类型,例如 .
float32
因此, 等效于 。
bf16_compress_hook
bf16_compress_wrapper(allreduce_hook)
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
- 返回类型
Callable[[Any, GradBucket], Future[Tensor]]
PowerSGD 通信挂钩¶
PowerSGD(Vogels 等人,NeurIPS 2019) 是一种梯度压缩算法,可以提供非常高的压缩率 速率并加速带宽受限的分布式训练。 该算法需要同时维护一些超参数和内部 州。因此,PowerSGD 通信钩子是一个有状态的钩子, 用户需要提供定义如下的 state 对象。
PowerSGD 状态¶
- 类 torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的PowerSGDState(process_group, matrix_approximation_rank=1, start_powerSGD_iter=1000, min_compression_rate=2, use_error_feedback=真, warm_start=真, orthogonalization_epsilon=0, random_seed=0, compression_stats_logging_frequency=10000, batch_tensors_with_same_shape=False)[来源]¶
在训练期间存储算法的超参数和所有梯度的内部状态。
特别是 和 是用户应该调整的主要超参数。 为了提高性能,我们建议将二进制超参数保持为 on。
matrix_approximation_rank
start_powerSGD_iter
use_error_feedback
warm_start
matrix_approximation_rank
控制压缩的低秩张量的大小,这决定了压缩率。等级越低,压缩越强。1.1. 如果太低,完整的模型质量将需要更多的训练步骤才能达到或永远不会达到,并导致准确性下降。
matrix_approximation_rank
1.2. 增加 of 会大大增加压缩的计算成本,并且精度可能不会超过某个阈值进一步提高。
matrix_approximation_rank
matrix_approximation_rank
要调整 ,我们建议从 1 开始,然后增加 2 倍(如指数网格搜索、1、2、4 等),直到达到令人满意的精度。通常只使用较小的值 1-4。对于一些 NLP 任务(如原始论文的附录 D 所示),该值已增加到 32。
matrix_approximation_rank
start_powerSGD_iter
将 PowerSGD 压缩推迟到步骤 ,而 vanilla allreduce 在步骤 之前运行 。这种 vanilla allreduce + PowerSGD 的混合方案可以有效提高精度,即使是使用相对较小的。这是因为,训练阶段的开始通常对不准确的梯度非常敏感,过早压缩梯度可能会使训练很快走上次优轨迹,从而对准确率造成不可挽回的影响。start_powerSGD_iter
start_powerSGD_iter
matrix_approximation_rank
要调整,我们建议从总训练步骤的 10% 开始,然后增加它,直到达到令人满意的准确性。如果训练中有热身阶段,通常应不少于热身步骤的数量。
start_powerSGD_iter
start_powerSGD_iter
min_compression_rate
是压缩图层时所需的最小压缩率。由于压缩会产生计算开销,因此只有在可以节省足够的带宽时,张量才值得压缩,其中 .如果无法满足指定的压缩率阈值,则直接对张量进行 allreduce,不进行压缩。(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols
PowerSGD 压缩开始后,每次迭代都会记录压缩统计信息。
compression_stats_logging_frequency
orthogonalization_epsilon
可以是在正交化步骤中添加到每个归一化矩阵列的非常小的值(例如,1e-8),以防止在任何列全为 0 时出现除零误差。如果已经可以防止这种情况(例如,通过批量归一化),则建议使用 epsilon 为 0 以确保准确性。batch_tensors_with_same_shape
控制是否在批量操作中压缩和解压缩相同形状的 Tensor 以实现更高的并行度。请注意,您还应该增加存储桶大小(即 DDP 构造函数中的 arg)以使更多相同形状的张量出现在同一个存储桶中,但这可能会减少计算和通信之间的重叠,并因堆叠相同形状的张量而增加内存占用。如果压缩/解压缩计算是瓶颈,则设置为 。bucket_cap_mb
True
警告
如果启用了错误反馈或预热,则 DDP 中允许的最小值为 2。 这是因为在 DDP 中还有另一个内部优化,它在迭代 1 时重建桶, 这可能与重建过程之前记住的任何 Tensor 冲突。
start_powerSGD_iter
PowerSGD 钩子¶
警告
PowerSGD 通常需要与模型的 gradients 启用误差反馈,从而补偿偏置 压缩通信并提高准确性。
警告
PowerSGD 钩子可能与 Apex 自动混合精密包装冲突。 请改用 PyTorch 原生自动混合精度包。
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的powerSGD_hook(state, bucket)[来源]¶
实施 PowerSGD 算法。
此 DDP 通信钩子实现 PowerSGD 梯度压缩 算法。 一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:
将输入扁平化的 1D 梯度张量视为每个参数张量的列表,并将所有张量分为两组:
1.1 应该在 allreduce 之前压缩的 Tensor,因为压缩可以节省足够的带宽。
1.2 其余的张量将直接全部归约而不压缩,包括所有向量张量(用于偏差)。
处理未压缩的张量:
2.1. 为那些未压缩的张量分配连续内存,并将所有未压缩的张量作为一个批次进行 allreduce,不进行压缩;
2.2. 将单个未压缩的张量从连续内存复制回输入张量。
处理应由 PowerSGD 压缩压缩的张量:
3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 用于分解 M, 使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
3.2. 计算 Ps 中的每个 P,等于 MQ;
3.3. Allreduce Ps 作为一个批次;
3.4. 正交化 Ps 中的每个 P;
3.5. 以 Qs 为单位计算每个 Q,大约等于 M^TP;
3.6. All减少 Qs 为一个批次;
3.7. 计算所有压缩张量中的每个 M,大约等于 PQ^T。
请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。
state.start_powerSGD_iter
- 参数
state (PowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 、 和 。
matrix_approximation_rank
start_powerSGD_iter
min_compression_rate
bucket (dist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。
- 返回
Communication 的 Future 处理程序,用于就地更新梯度。
- 返回类型
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10, min_compression_rate=0.5) >>> ddp_model.register_comm_hook(state, powerSGD_hook)
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的batched_powerSGD_hook(state, bucket)[来源]¶
实施简化的 PowerSGD 算法。
此 DDP 通信钩子实现了简化的 PowerSGD 梯度压缩 算法。 此变体不会逐层压缩渐变, 而是压缩对所有梯度进行批处理的扁平化 Importing Tensor。 因此,它比
快 , 但通常会导致精度低得多,除非 为 1。
matrix_approximation_rank
警告
在此处增加不一定会增加准确性, 因为在没有列/行对齐的情况下对每个参数的张量进行批处理会破坏低秩结构。 因此,用户应始终首先考虑
, 并且仅当 is 为 1 时可以获得令人满意的准确度时,才考虑此变体。
matrix_approximation_rank
matrix_approximation_rank
一旦梯度张量在所有 worker 中聚合,此钩子就会适用 compression 如下所示:
将输入扁平化的 1D 梯度张量视为具有 0 个填充的方形张量 M;
创建两个低秩张量 P 和 Q 用于分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
计算 P,等于 MQ;
全部减少 P;
正交化 P;
计算 Q,大约等于 M^TP;
全部减少 Q;
计算 M,它大约等于 PQ^T。
将输入张量截断为原始长度。
请注意,这个通信钩子在第一次迭代中强制执行 vanilla allreduce。 这不仅使用户能够更好地控制加速和准确性之间的权衡, 但也有助于为未来的通信钩子开发人员抽象出 DDP 内部优化的一些复杂性。
state.start_powerSGD_iter
- 参数
state (PowerSGDState) – 用于配置压缩率并支持错误反馈、热启动等的状态信息。 要优化压缩配置,主要需要调整 和 。
matrix_approximation_rank
start_powerSGD_iter
bucket (dist.GradBucket) – 存储 1D 扁平化梯度张量的存储桶,该张量对多个每个变量的张量进行批处理。 注意,由于 DDP 通信钩子只支持单进程单设备模式, 此存储桶中仅存储一个 Tensor。
- 返回
Communication 的 Future 处理程序,用于就地更新梯度。
- 返回类型
- 例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1) >>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)
调试通信钩子¶
顾名思义,调试通信钩子仅用于调试和性能优化目的。
警告
调试通信挂钩不一定输出正确的结果。
- torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks 中。noop_hook(_, bucket)[来源]¶
返回一个包装 input 的 future,因此它是一个不会产生任何通信开销的 no-op。
这个钩子应该只用于 allreduce 优化的 headroom 分析, 而不是正常的梯度同步。 例如,如果在注册此 hook 后只能观察到少于 10% 的训练时间加速, 它通常意味着 AllReduce 不是这种情况的性能瓶颈。 此类检测可能特别有用 如果 GPU 跟踪不易检索或跟踪分析复杂 一些因素,例如 AllReduce 和 computation 之间的重叠或跨 Rank 的不同步。
- 例::
>>> ddp_model.register_comm_hook(None, noop_hook)
通信钩子的检查点¶
有状态通信钩子可以保存为模型检查点的一部分,以启用训练程序重启。
使钩子可序列化,并且应该被定义。__setstate__
__getstate__
警告
__getstate__
应该从返回的字典中排除不可序列化的属性。
警告
__setstate__
应正确初始化不可序列化的属性,这些属性从提供的 .state
has 和 implemented 并可用作参考。
__setstate__
__getstate__
- 类 torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的PowerSGDState[来源]
下面是一个保存和重新加载 PowerSGD 状态和 hook 的简单端到端示例。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(24,24)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(24,12)
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def run_demo(demo_fn, world_size):
mp.spawn(
demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
def demo_serialization(rank, world_size):
setup(rank, world_size)
CHECKPOINT = tempfile.gettempdir() + "/checkpoint.pt"
model = SimpleModel().to(rank)
ddp_model = DistributedDataParallel(model, device_ids=[rank])
powersgd_hook = powerSGD.powerSGD_hook
powersgd_state = powerSGD.PowerSGDState(process_group=None)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
state = {
'state_dict': ddp_model.state_dict(),
'comm_hook': powersgd_hook,
'comm_hook_state': powersgd_state}
if rank == 0:
torch.save(state, CHECKPOINT)
dist.barrier()
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
checkpoint = torch.load(CHECKPOINT, map_location=map_location)
new_ddp_model = DistributedDataParallel(SimpleModel().to(rank), device_ids=[rank])
new_ddp_model.load_state_dict(checkpoint['state_dict'])
powersgd_hook = checkpoint['comm_hook']
powersgd_state = checkpoint['comm_hook_state']
new_ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
if rank == 0:
os.remove(CHECKPOINT)
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_serialization, world_size)