分布式数据并行通信钩子¶
DDP通信钩子是一个通用接口,通过覆盖DistributedDataParallel中的普通allreduce,用于控制如何在工作器之间进行梯度通信。 提供了一些内置的通信钩子, 用户可以轻松应用这些钩子来优化通信。 此外,该钩子接口还可以支持用户自定义的通信策略,以满足更高级的使用场景。
如何使用通信钩子?¶
要使用通信钩子,用户只需在训练循环之前让DDP模型注册该钩子,如下所示。
torch.nn.parallel.DistributedDataParallel.register_comm_hook()
通信钩子操作的对象是什么?¶
通信钩子提供了一种灵活的方式来执行梯度的allreduce操作。因此,它主要在每个副本上的梯度进行allreduce之前对其进行操作,这些梯度会被分组以增加通信和计算之间的重叠。特别地,torch.distributed.GradBucket 表示一组待allreduce的梯度张量。
- class torch.distributed.GradBucket¶
此类主要将一个展平的梯度张量 (由
buffer()返回) 传递给 DDP 通信钩子。 此张量可以进一步分解为该存储桶内的每个参数张量列表 (由get_per_parameter_tensors()返回), 以应用逐层操作。
- torch.distributed.GradBucket.index(self: torch._C._distributed_c10d.GradBucket) int¶
警告
由于在第一次迭代后会重建存储桶,因此不应依赖训练开始时的索引。
- Returns:
一个存储几个连续层梯度的桶的索引。 所有梯度都进行了分桶处理。
- torch.distributed.GradBucket.buffer(self: torch._C._distributed_c10d.GradBucket) at::Tensor¶
- Returns:
一个展平的1D
torch.Tensor缓冲区, 它可以进一步分解为此存储桶中每个参数张量的列表。
- torch.distributed.GradBucket.gradients(self: torch._C._distributed_c10d.GradBucket) List[at::Tensor]¶
- Returns:
一个
torch.Tensor的列表。列表中的每个张量对应一个梯度。
- torch.distributed.GradBucket.is_last(self: torch._C._distributed_c10d.GradBucket) bool¶
- Returns:
该存储桶是否是迭代中所有reduce的最后一个存储桶。 这也意味着该存储桶对应于前向传递中的前几层。
- torch.distributed.GradBucket.set_buffer(self: torch._C._distributed_c10d.GradBucket, buffer: at::Tensor) None¶
将存储桶中的张量替换为输入张量缓冲区。
- torch.distributed.GradBucket.parameters(self: torch._C._distributed_c10d.GradBucket) List[at::Tensor]¶
- Returns:
一个
torch.Tensor的列表。列表中的每个张量对应于一个模型参数。
默认通信钩子¶
默认的通信钩子是简单的无状态钩子,因此register_comm_hook中的输入状态
是一个进程组或None。输入bucket是一个torch.distributed.GradBucket对象。
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.allreduce_hook(process_group, bucket)[source]¶
此DDP通信钩子仅使用
allreduce调用GradBucket张量。一旦梯度张量在所有工作器之间聚合,其then回调将计算平均值并返回结果。如果用户注册了此钩子,则预期DDP的结果与未注册钩子的情况相同。 因此,这不会改变DDP的行为,用户可以将其作为参考或修改此钩子以记录有用的信息或其他目的, 同时不影响DDP行为。- Example::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_hook(process_group, bucket)[source]¶
此DDP通信钩子实现了一种简单的梯度压缩方法,它将
GradBucket张量转换为半精度浮点格式(torch.float16),然后将其除以进程组大小。 它会对这些float16梯度张量进行allreduce操作。一旦压缩后的梯度张量被allreduce后,链式回调decompress会将其转换回输入数据类型(例如float32)。- Example::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_hook(process_group, bucket)[source]¶
警告:此 API 为实验性功能,需要 NCCL 版本高于 2.9.6。
此DDP通信钩子实现了一种简单的梯度压缩方法,将
GradBucket张量转换为半精度 Brain浮点格式 (torch.bfloat16) 然后除以进程组大小。 它对这些bfloat16梯度张量进行allreduce操作。一旦压缩的梯度张量被allreduce,链式回调decompress将其转换回输入数据类型(如float32)。- Example::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
此外,还提供了一个通信钩子包装器,以支持 fp16_compress_hook() 或 bf16_compress_hook() 作为包装器,
可以与其他通信钩子结合使用。
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_wrapper(hook)[source]¶
此包装器将给定的DDP通信钩子的输入梯度张量转换为半精度浮点格式(
torch.float16),并将给定钩子生成的张量转换回输入数据类型,例如float32。因此,
fp16_compress_hook等价于fp16_compress_wrapper(allreduce_hook)。- Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
- Return type:
可调用[[任意, GradBucket], Future[张量]]
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_wrapper(hook)[source]¶
警告:此 API 为实验性功能,需要 NCCL 版本高于 2.9.6。
此包装器将给定的DDP通信钩子的输入梯度张量转换为半精度 Brain floating point format <https://en.wikipedia.org/wiki/Bfloat16_floating-point_format> `_ (``torch.bfloat16`), 并将给定钩子生成的张量转换回输入数据类型,例如
float32。因此,
bf16_compress_hook等价于bf16_compress_wrapper(allreduce_hook)。- Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
- Return type:
可调用[[任意, GradBucket], Future[张量]]
PowerSGD 通信钩子¶
PowerSGD (Vogels 等人,NeurIPS 2019) 是一种梯度压缩算法,它可以提供非常高的压缩率并加速带宽受限的分布式训练。 此算法需要维护一些超参数和内部状态。因此,PowerSGD 通信钩子是一个有状态的钩子, 用户需要提供如下定义的状态对象。
PowerSGD 状态¶
- class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState(process_group, matrix_approximation_rank=1, start_powerSGD_iter=1000, min_compression_rate=2, use_error_feedback=True, warm_start=True, orthogonalization_epsilon=0, random_seed=0, compression_stats_logging_frequency=10000, batch_tensors_with_same_shape=False)[source]¶
同时存储算法的超参数和训练过程中所有梯度的内部状态。 特别地,
matrix_approximation_rank和start_powerSGD_iter是用户应调整的主要超参数。 为了性能,我们建议将二进制超参数use_error_feedback和warm_start保持开启状态。matrix_approximation_rank控制压缩后的低秩张量的大小,这决定了压缩率。秩越低,压缩越强。1.1. If
matrix_approximation_rankis too low, the full model quality will need more training steps to reach or will never reach and yield loss in accuracy.1.2. The increase of
matrix_approximation_rankcan substantially increase the computation costs of the compression, and the accuracy may not be futher improved beyond a certainmatrix_approximation_rankthreshold.
要调整
matrix_approximation_rank,我们建议从 1 开始,每次乘以 2 增加(类似于指数网格搜索,如 1, 2, 4, …),直到达到满意的准确率。通常只使用较小的值 1-4。对于某些 NLP 任务(如原始论文附录 D 所示),该值已增加到 32。start_powerSGD_iter将 PowerSGD 压缩推迟到步骤start_powerSGD_iter,而普通的 allreduce 在步骤start_powerSGD_iter之前运行。这种 普通 allreduce + PowerSGD 的混合方案可以有效提高准确性,即使使用相对较小的matrix_approximation_rank。这是因为训练阶段的开始通常对不准确的梯度非常敏感,过早压缩梯度可能会使训练迅速进入次优轨迹,从而对准确性产生不可恢复的影响。
要调整
start_powerSGD_iter,我们建议从总训练步骤的10%开始,并逐步增加直到达到令人满意的准确率。如果训练中有一个预热阶段,start_powerSGD_iter通常不应少于预热步骤的数量。min_compression_rate是在压缩某一层时所需的最小压缩率。由于压缩会带来计算开销,只有当带宽节省足够时,张量才值得被压缩,其中(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols。如果指定的压缩率阈值无法满足,则该张量将直接进行 allreduce 操作而不会被压缩。
压缩统计信息会在 PowerSGD 压缩开始后,每隔
compression_stats_logging_frequency次迭代记录一次。orthogonalization_epsilon可以是一个非常小的值(例如,1e-8),在正交化步骤中添加到每个归一化矩阵列中,以防止单元全为0的列导致除零错误。如果这种情况已经可以避免(例如通过批量归一化),为了准确性建议将 epsilon 设置为 0。batch_tensors_with_same_shape控制是否在批量操作中压缩和解压形状相同的张量以实现更高的并行性。请注意,你还应该增加存储桶大小(即 DDP 构造函数中的bucket_cap_mb参数),以便在同一存储桶中出现更多相同形状的张量,但这样可能会减少计算与通信之间的重叠,并由于堆叠相同形状的张量而增加内存占用。如果压缩/解压计算是瓶颈,请将其设置为True。
警告
如果启用了错误反馈或预热功能,则在DDP中允许的
start_powerSGD_iter的最小值为2。 这是因为DDP中还有一个内部优化会在第1次迭代时重建存储桶, 这可能会与重建过程之前的任何张量记忆发生冲突。
PowerSGD 钩子¶
警告
PowerSGD 通常需要与模型梯度大小相同的额外内存,以启用误差反馈,这可以补偿有偏的压缩通信并提高准确性。
警告
PowerSGD hook 可能与 Apex 自动混合精度包冲突。 请使用 PyTorch 原生自动混合精度包代替。
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.powerSGD_hook(state, bucket)[source]¶
此DDP通信钩子实现了论文中描述的PowerSGD梯度压缩算法。 一旦所有工作者的梯度张量被聚合,此钩子将按如下方式应用压缩:
将输入的扁平化 1D 梯度张量视为参数张量的列表,并将所有张量分为两组:
1.1 The tensors that should be compressed before allreduce, because the compression can give enough saving in bandwidth.
1.2 Rest of the tensors will be directly allreduced without compression, including all the vector tensors (for biases).
处理未压缩的张量:
2.1. Allocate contiguous memory for those uncompressed tensors, and allreduces all the uncompressed tensors as a batch, without compression;
2.2. Copies the individual uncompressed tensors from the contiguous memory back to the input tensor.
处理应通过 PowerSGD 压缩的张量:
3.1. For each tensor M, creates two low-rank tensors P and Q for decomposing M, such that M = PQ^T, where Q is initialized from a standard normal distribution and orthogonalized;
3.2. Computes each P in Ps, which is equal to MQ;
3.3. Allreduces Ps as a batch;
3.4. Orthogonalizes each P in Ps;
3.5. Computes each Q in Qs, which is approximately equal to M^TP;
3.6. Allreduces Qs as a batch;
3.7. Computes each M among all the compressed tensors, which is approximately equal to PQ^T.
请注意,此通信钩子会在前
state.start_powerSGD_iter次迭代中强制使用普通 allreduce。 这不仅让用户能够更好地控制加速与精度之间的权衡, 还有助于对DDP内部优化的某些复杂性进行抽象,从而帮助未来的通信钩子开发者。- Parameters:
state (PowerSGDState) – 状态信息,用于配置压缩率并支持错误反馈、热启动等. 要调整压缩配置,主要需要调整
matrix_approximation_rank,start_powerSGD_iter和min_compression_rate。bucket (dist.GradBucket) – 用于存储包含多个变量张量的1D扁平化梯度张量的桶。 请注意,由于DDP通信钩子仅支持单进程单设备模式, 因此此桶中只存储一个张量。
- Returns:
负责未来通信的处理程序,它会在原地更新梯度。
- Return type:
- Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10, min_compression_rate=0.5) >>> ddp_model.register_comm_hook(state, powerSGD_hook)
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.batched_powerSGD_hook(state, bucket)[source]¶
此DDP通信钩子实现了一种简化的PowerSGD梯度压缩算法,该算法在论文中有所描述。 此变体不逐层压缩梯度,而是压缩将所有梯度批处理的展平输入张量。 因此,它比
powerSGD_hook()更快,但通常会导致更低的准确性,除非matrix_approximation_rank为1。警告
增加
matrix_approximation_rank这里的值不一定能提高准确性, 因为在没有列/行对齐的情况下对每个参数张量进行批处理可能会破坏低秩结构。 因此,用户应始终首先考虑powerSGD_hook(), 只有在当matrix_approximation_rank为 1 时可以达到令人满意的准确性时,才考虑这个变体。一旦梯度张量在所有工作器之间聚合,此钩子将按如下方式应用压缩:
将输入的扁平化 1D 梯度张量视为一个带有 0 填充的方形张量 M;
为分解 M 创建两个低秩张量 P 和 Q,使得 M = PQ^T,其中 Q 从标准正态分布初始化并进行正交化;
计算 P,其等于 MQ;
对 P 进行全局规约;
对 P 进行正交化;
计算 Q,其近似等于 M^TP;
Allreduces Q;
计算 M,其近似等于 PQ^T。
将输入张量截断为原始长度。
请注意,此通信钩子会在前
state.start_powerSGD_iter次迭代中强制使用普通 allreduce。 这不仅让用户能够更好地控制加速与精度之间的权衡, 还有助于对DDP内部优化的某些复杂性进行抽象,从而帮助未来的通信钩子开发者。- Parameters:
state (PowerSGDState) – 状态信息,用于配置压缩率并支持错误反馈、热启动等. 要调整压缩配置,主要需要调整
matrix_approximation_rank和start_powerSGD_iter。bucket (dist.GradBucket) – 用于存储包含多个变量张量的1D扁平化梯度张量的桶。 请注意,由于DDP通信钩子仅支持单进程单设备模式, 因此此桶中只存储一个张量。
- Returns:
负责未来通信的处理程序,它会在原地更新梯度。
- Return type:
- Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1) >>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)
调试通信钩子¶
顾名思义,调试通信钩子仅用于调试和性能优化目的。
警告
调试通信钩子不一定会输出正确的结果。
- torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks.noop_hook(_, bucket)[source]¶
该 DDP 通信钩子返回一个封装了输入的 future, 因此它是一个不会产生任何通信开销的空操作。
此钩子应仅用于分析所有reduce操作的优化空间, 而不是用于正常的梯度同步。例如,如果在注册此钩子后只能观察到不到10%的训练时间加速, 通常意味着在此情况下allreduce不是性能瓶颈。这种检测方法可能特别有用, 如果无法轻松获取GPU跟踪或跟踪分析较为复杂, 某些因素如allreduce与计算之间的重叠或不同rank之间的不同步等。
- Example::
>>> ddp_model.register_comm_hook(None, noop_hook)
通信钩子的检查点¶
一个有状态的通信钩子可以作为模型检查点的一部分进行保存,以便实现训练器的重新启动。
要使钩子可序列化,__setstate__ 和 __getstate__ 应该被定义。
警告
__getstate__ 应该从返回的字典中排除不可序列化的属性。
警告
__setstate__ 应该正确初始化不可序列化的属性,这些属性被排除在提供的 state 之外。
PowerSGDState 已实现 __setstate__ 和 __getstate__,可作为参考使用。
- class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState[source]
这是一个保存和重新加载 PowerSGD 状态和钩子的简单端到端示例。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD
class SimpleModel(nn.Module):
def __init__(self):
super(SimpleModel, self).__init__()
self.fc1 = nn.Linear(24,24)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(24,12)
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def run_demo(demo_fn, world_size):
mp.spawn(
demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
def demo_serialization(rank, world_size):
setup(rank, world_size)
CHECKPOINT = tempfile.gettempdir() + "/checkpoint.pt"
model = SimpleModel().to(rank)
ddp_model = DistributedDataParallel(model, device_ids=[rank])
powersgd_hook = powerSGD.powerSGD_hook
powersgd_state = powerSGD.PowerSGDState(process_group=None)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
state = {
'state_dict': ddp_model.state_dict(),
'comm_hook': hook,
'comm_hook_state': hook_state}
if rank == 0:
torch.save(state, CHECKPOINT)
dist.barrier()
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
checkpoint = torch.load(CHECKPOINT, map_location=map_location)
ddp_model.load_state_dict(checkpoint['state_dict'])
powersgd_hook = checkpoint['comm_hook']
powersgd_state = checkpoint['comm_hook_state']
ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
if rank == 0:
os.remove(CHECKPOINT)
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_serialization, world_size)
致谢¶
感谢PowerSGD论文作者Thijs Vogels对PowerSGD通信钩子的代码审查,以及 对比实验, 这些实验表明PowerSGD通信钩子的性能与原始论文中的实现相当。