目录

计划器

TorchRec Planner 负责确定分布式训练和推理中最高效、平衡的分片计划。

The main API for generating a sharding plan is EmbeddingShardingPlanner.plan

class torchrec.distributed.types.ShardingPlan(plan: Dict[str, ModuleShardingPlan])

表示分片计划。这使用了较大封装模型的全限定名(即使用 DistributedModelParallel 封装的模型) EmbeddingModuleShardingPlan 应该在希望 TorchRec 可组合性时使用。

plan

字典以模块路径为键,参数分片规格字典以参数名称为键。

Type:

字典[str, EmbeddingModuleShardingPlan]

get_plan_for_module(module_path: str) Optional[ModuleShardingPlan]
Parameters:

模块路径 (字符串) –

Returns:

参数分片规格字典,键为参数名称。如果给定模块路径不存在分片规格,则返回空字典。

Return type:

Optional[ModuleShardingPlan]

class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True, callbacks: Optional[List[Callable[[List[ShardingOption]], List[ShardingOption]]]] = None)

提供了一个优化的模块分片计划,根据提供的分片器、拓扑结构和约束条件,对具有可分片参数的给定模块进行分片。

Parameters:
  • 拓扑 (可选[拓扑]) – 当前进程组的拓扑。

  • 批量大小 (可选[整数]) – 模型的批量大小。

  • 枚举器 (Optional[枚举器]) – 使用的枚举器

  • 存储预留 (可选[StorageReservation]) – 使用的存储预留

  • 提议者 (可选[Union[提议者, 列表[提议者]]]) – 使用的提议者(s)

  • 分区器 (可选[分区器]) – 使用的分区器

  • 性能模型 (可选[PerfModel]) – 使用的性能模型

  • 统计 (可选[Union[Stats, List[Stats]]]) – 使用的统计

  • 约束条件 (可选[字典[字符串, 参数约束]]) – 每张表的分片约束。

  • 调试 (布尔类型) – 是否打印调试信息。

Example:

ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta"))
planner = EmbeddingShardingPlanner()
plan = planner.plan(
    module=ebc,
    sharders=[EmbeddingBagCollectionSharder()],
)
collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan

在排名0上调用self.plan(…)并广播

Parameters:
  • 模块 (nn.Module) – 要分片的模块。

  • 分片器 (可选[列表[ModuleSharder[nn.Module]]]) – 用于分片的分片器

  • pg (Optional[dist.ProcessGroup]) – 使用的进程组进行集体操作

Returns:

模块的分片计划。

Return type:

ShardingPlan

plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan

提供了一个优化的模块分片计划,根据提供的分片器、拓扑结构和约束条件,对具有可分片参数的给定模块进行分片。

Parameters:
  • 模块 (nn.Module) – 要分片的模块。

  • 分片器 (List[ModuleSharder][nn.Module]]) – 使用的分片器。

Returns:

模块的分片计划。

Return type:

ShardingPlan

class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)

生成给定 nn.Module 的嵌入分片选项,考虑用户提供的约束。

Parameters:
  • 拓扑 (拓扑学) – 设备拓扑。

  • 批量大小 (整数) – 批量大小。

  • 约束条件 (可选[字典[字符串, 参数约束]]) – 字典,键为提供的参数名称,值为对应的参数约束。

  • 估计器 (可选[Union[ShardEstimator, List[ShardEstimator]]]) – 分片性能估算器。

  • 使用精确枚举顺序 (bool) – 是否在精确名称子集枚举顺序中枚举可共享参数

enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption]

根据模块和分片器生成相关的分片选项。

Parameters:
  • 模块 (nn.Module) – 模块将被分片。

  • 分片器 (List[ModuleSharder]) – 提供的模块分片器。

Returns:

有效的分片选项,值已填充。

Return type:

List[ShardingOption]

populate_estimates(sharding_options: List[ShardingOption]) None

请查看类描述。

class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)

贪婪分区器。

Parameters:
  • 按排序 (SortBy) – 按存储或性能对分片选项进行排序(降序排列,即大表格将被放置在前面)。

  • balance_modules (bool) – 是否按模块排序,较小的模块将先排序。这将在每个模块中平衡地放置表格。

partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption]

将分片选项放置在拓扑结构上,基于每个分片选项的 partition_by属性。 拓扑、存储和性能会在放置结束后更新。

Parameters:
  • 提案 (List[分片选项]) – 列表中的填充分片选项。

  • 存储约束 (拓扑结构) – 设备拓扑结构。

Returns:

选择计划的分片选项列表。

Return type:

List[ShardingOption]

Example:

sharding_options = [
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=1, perf=1),
                    Shards(storage=1, perf=1),
                ]),
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=2, perf=2),
                    Shards(storage=2, perf=2),
                ]),
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=3, perf=3),
                    Shards(storage=3, perf=3),
                ])
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=4, perf=4),
                    Shards(storage=4, perf=4),
                ]),
    ]
topology = Topology(world_size=2)

# First [sharding_options[0] and sharding_options[1]] will be placed on the
# topology with the uniform strategy, resulting in

topology.devices[0].perf.total = (1,2)
topology.devices[1].perf.total = (1,2)

# Finally sharding_options[2] and sharding_options[3]] will be placed on the
# topology with the device strategy (see docstring of `partition_by_device` for
# more details).

topology.devices[0].perf.total = (1,2) + (3,4)
topology.devices[1].perf.total = (1,2) + (3,4)

# The topology updates are done after the end of all the placements (the other
# in the example is just for clarity).
class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)

为模型预留存储空间,进行自适应计算。存储预留包括密集张量存储、KJT存储以及总存储的额外百分比。

Parameters:
  • 百分比 (浮点数) – 额外存储百分比以保留作为超出 基于启发式计算的存储的误差范围。

  • 参数倍增器 (浮点数) – 总参数存储量的启发式倍增器。

  • 密集张量估计 (可选[整数]) – 密集张量的存储估算,如果未提供则使用默认估算方法。

class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)

提出了贪婪方式下的分片计划。

对每个可分参数进行排序,根据性能选择分片选项。 在每次迭代中,找到当前存储使用量最大的参数,并尝试其下一个分片选项。

Parameters:
  • 使用深度 (bool) – 当启用时,fqn 的 sharding_options 根据 max(shard.perf.total) 排序,否则根据 sum(shard.perf.total) 排序。

  • 阈值 (可选[整数]) – 早期停止的阈值。当指定时,提议者在连续比最佳性能评分差的情况下停止提出。

feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None

提供反馈给提议者。

Parameters:
  • 可分区的 (布尔值) – 计划是否可分区。

  • 计划 (可选[列表[分片选项]]) – 提供反馈的计划。

  • 性能评分 (可选[浮点数]) – 计划的性能评分。

  • 存储约束 (可选[拓扑学]) – 计划的存储约束。

load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None

将搜索空间加载到提议者中。

Parameters:
  • search_space (List[ShardingOption]) – search space to load.

  • 枚举器 (枚举器) – 用于生成搜索空间的枚举器。

propose() Optional[List[ShardingOption]]

提出分片计划。

Returns:

提议的计划。

Return type:

Optional[List[ShardingOption]]

class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)

嵌入墙时间性能估计器。这个估计器估计给定分片选项的墙时间。

Parameters:
  • 拓扑 (拓扑学) – 设备拓扑。

  • 约束条件 (可选[字典[字符串, 参数约束]]) – 参数约束。

  • is_inference (bool) – 是否使用该估计器进行推理。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估计给定分片选项的墙时间。

Parameters:
  • sharding_options (List[ShardingOption]) – 列表中的分片选项。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – sharder map.

classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf]

尝试将性能建模为相对墙时间的函数。

Parameters:
  • shard_sizes (List[List[int]]) – the list of (local_rows, local_cols) of each shard.

  • 计算内核 (字符串) – 计算内核。

  • 计算设备 (字符串) – 计算设备。

  • sharding_type (str) – tw, rw, cw, twrw, dp.

  • batch_sizes (List[int]) – 每个输入特征的批次大小。

  • world_size (int) – 用于所有主机的设备数量。

  • 本地世界大小 (整数) – 每个主机的设备数量。

  • input_lengths (List[float]) – the list of the average number of lookups of each input query feature.

  • input_data_type_size (float) – 分布式数据并行输入的数据类型大小。

  • table_data_type_size (float) – 表格数据类型大小。

  • 输出数据类型大小 (浮点数) – 输出嵌入的输出数据类型大小。

  • fwd_comm_data_type_size (float) – 分布式数据并行输入在前向通信期间的数据类型大小。

  • (float) – 分布式数据并行输入在反向通信期间的数据类型大小。

  • num_poolings (List[float]) – 数量的池化每样本,通常为1.0。

  • hbm_mem_bw (float) – 该设备HBM的带宽。

  • ddr_mem_bw (float) – 系统 DDR 内存带宽。

  • hbm_to_ddr_bw (float) – 从HBM到系统DDR的带宽。

  • 内网带宽 (浮点数) – 在单个主机内的带宽,例如多个线程。

  • inter_host_bw (float) – 两个主机之间的带宽,例如多台机器。

  • is_pooled (bool) – True if embedding output is pooled (ie. EmbeddingBag), False if unpooled/sequential (ie. Embedding).

  • 是加权 (bool = False) – 如果模块是一个EBC并且是加权的,通常表示一个id得分列表特征。

  • is_inference (bool = False) – 如果计划进行推理。

  • 缓存比率 (可选[浮点数] = 无) – 确定设备带宽的缓存比率。

  • prefetch_pipeline (bool = False) – 是否启用预取管道。

  • 预期缓存查找次数 (浮点数) – 全局批量内预期的缓存查找次数

  • 不均匀分片性能乘数 (float = 1.0) – 用于考虑不均匀分片性能的乘数

Returns:

每个分片的性能列表。

Return type:

List[float]

class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, run_embedding_at_peak_memory: bool = False, is_inference: bool = False)

嵌入式存储使用量估算器

Parameters:
  • 拓扑 (拓扑学) – 设备拓扑。

  • 约束条件 (可选[字典[字符串, 参数约束]]) – 参数约束。

  • pipeline_type (PipelineType) – 管道类型,如果有。将决定内存估算期间的输入复制因子。

  • 在峰值内存中运行嵌入 (bool) –

    如果在HBM使用高峰期执行嵌入前向/反向操作。当设置为TRUE时,任何在嵌入前向/反向期间的临时内存分配,只要输出大小在输出分布之前被计算为HBM存储成本,否则它们就不会计入,因为它们会被实际内存峰值“隐藏”。

    仅在设置 pipeline_type 以实现向后兼容时生效(不影响使用旧的 pipeline-agnostic 公式模型)。

    默认为false,因为对于推荐系统来说,内存峰值通常发生在密集前向/后向计算的末尾。

  • 是推理 (布尔值) – 如果模型是推理模型。默认为False。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估算每个分片选项的存储成本。

Parameters:
  • sharding_options (List[ShardingOption]) – 列表中的分片选项。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – 映射从模块类型到分片器。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源