计划¶
TorchRec Planner 负责确定性能最高、最平衡的 用于分布式训练和推理的分片计划。
生成分片计划的主要 API 是EmbeddingShardingPlanner.plan
- 类 torchrec.distributed.types 中。ShardingPlan(plan: Dict[str, ModuleShardingPlan])¶
分片计划的表示。这将使用较大包装模型(即使用 DistributedModelParallel 包装的模型)的 FQN 当需要 TorchRec 可组合性时,应使用 EmbeddingModuleShardingPlan。
- 计划¶
dict 以模块路径 以参数名称为键的参数分片规范的 dict。
- 类型:
Dict[str, EmbeddingModuleShardingPlan]
- get_plan_for_module(module_path: str) 可选[ModuleShardingPlan] ¶
- 参数
module_path (str) –
- 结果
以参数名称为键的参数分片规范的 dict。如果给定module_path不存在分片规范,则为 None。
- 返回类型:
可选[ModuleShardingPlan]
- 类 torchrec.distributed.planner.planners。EmbeddingShardingPlanner(topology: Optional[topology] = None, batch_size: Optional[int] = 无,枚举器:可选[枚举器] = 无,storage_reservation:可选[StorageReservation] = None, 提议者: Optional[Union[提议者, List[提议者]]] = 无,分区程序:可选[分区程序] = 无,performance_model: 可选[PerfModel] = None, stats: 可选[Union[Stats, List[Stats]]] = 无,约束: 可选[Dict[str, ParameterConstraints]] = 无,debug: bool = True,回调: 可选[List[Callable[[List[ShardingOption]]], List[ShardingOption]]]] = 无)¶
为给定模块提供具有可分片参数的优化分片计划 根据提供的分片、拓扑和约束。
- 参数
topology (Optional[Topology]) – 当前进程组的拓扑。
batch_size (Optional[int]) – 模型的批量大小。
enumerator (Optional[Enumerator]) – 要使用的枚举器
storage_reservation (Optional[StorageReservation]) – 要使用的存储预留
proposer (Optional[Union[Proposer, List[Proposer]]]) – 要使用的提议者
partitioner (Optional[Partitioner]) – 要使用的分区器
performance_model (Optional[PerfModel]) – 要使用的性能模型
stats (Optional[Union[Stats, List[Stats]]]) – 要使用的统计数据
constraints (Optional[Dict[str, ParameterConstraints]]) – 每个表的约束 进行分片。
debug (bool) – 是否打印调试信息。
例:
ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta")) planner = EmbeddingShardingPlanner() plan = planner.plan( module=ebc, sharders=[EmbeddingBagCollectionSharder()], )
- collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: 可选[ProcessGroup] = None) ShardingPlan ¶
在排名 0 上调用 self.plan(...) 并广播
- 参数
模块 (nn.Module) – 要分片的模块。
分片器 (Optional[List[ModuleSharder[nn.Module]]]) – 用于分片的分片
pg (可选[dist.ProcessGroup]) – 用于集合操作的流程组
- 结果
模块的分片计划。
- 返回类型:
- plan(module: Module, 分片器: List[ModuleSharder[Module]]) ShardingPlan ¶
为给定模块提供具有可分片参数的优化分片计划 根据提供的分片、拓扑和约束。
- 参数
模块 (nn.Module) – 要分片的模块。
分片 (List[ModuleSharder[nn.Module]]) – 用于分片的分片程序。
- 结果
模块的分片计划。
- 返回类型:
- 类 torchrec.distributed.planner.enumerators。EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: 可选[Union[ShardEstimator, List[ShardEstimator]]] = 无,use_exact_enumerate_order:可选[bool] = False)¶
为给定的 nn 生成嵌入分片选项。模块,考虑用户提供 约束。
- 参数
topology (Topology) – 设备拓扑。
batch_size (int) – 批量大小。
constraints (Optional[Dict[str, ParameterConstraints]]) – 参数名称的字典 添加到提供的 ParameterConstraints 中。
estimator (Optional[Union[ShardEstimator, List[ShardEstimator]]]) – 分片性能估算器。
use_exact_enumerate_order (bool) – 是否按确切的 name_children 枚举顺序枚举可分片参数
- enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption] ¶
在给定的 module 和 sharders 中生成相关的分片选项。
- 参数
模块 (nn.Module) – 要分片的模块。
分片 (List[ModuleSharder[nn.Module]]) – 为 module 提供分片。
- 结果
填充了值的有效分片选项。
- 返回类型:
列表[ShardingOption]
- populate_estimates(sharding_options: List[ShardingOption]) 无 ¶
请参阅类描述。
- 类 torchrec.distributed.planner.partitioners 中。GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)¶
贪婪分区程序。
- 参数
sort_by (SortBy) – 按存储或性能对分片选项进行排序 降序(即,大表将放在最前面)。
balance_modules (bool) – 是否先按模块排序,其中 较小的模块将首先排序。实际上,这将 表。
- partition(提案: List[ShardingOption], storage_constraint: 拓扑) List[ShardingOption] ¶
根据每个分片选项的 partition_by 属性将分片选项放置在拓扑上。 topology、storage 和 perfs 将在放置结束时更新。
- 参数
proposal (List[ShardingOption]) – 填充的分片选项列表。
storage_constraint (Topology) – 设备拓扑。
- 结果
所选计划的分片选项列表。
- 返回类型:
列表[ShardingOption]
例:
sharding_options = [ ShardingOption(partition_by="uniform", shards=[ Shards(storage=1, perf=1), Shards(storage=1, perf=1), ]), ShardingOption(partition_by="uniform", shards=[ Shards(storage=2, perf=2), Shards(storage=2, perf=2), ]), ShardingOption(partition_by="device", shards=[ Shards(storage=3, perf=3), Shards(storage=3, perf=3), ]) ShardingOption(partition_by="device", shards=[ Shards(storage=4, perf=4), Shards(storage=4, perf=4), ]), ] topology = Topology(world_size=2) # First [sharding_options[0] and sharding_options[1]] will be placed on the # topology with the uniform strategy, resulting in topology.devices[0].perf.total = (1,2) topology.devices[1].perf.total = (1,2) # Finally sharding_options[2] and sharding_options[3]] will be placed on the # topology with the device strategy (see docstring of `partition_by_device` for # more details). topology.devices[0].perf.total = (1,2) + (3,4) topology.devices[1].perf.total = (1,2) + (3,4) # The topology updates are done after the end of all the placements (the other # in the example is just for clarity).
- torchrec.distributed.planner.storage_reservations 类。HeuristicalStorageReservation(百分比: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: 可选 [int] = 无)¶
为要使用启发式计算分片的模型保留存储。存储 reservation 由密集张量存储、KJT 存储和额外的 总存储的百分比。
- 参数
percentage (float) – 要预留的额外存储百分比,用作边距 超出 storage 的启发式计算的误差。
parameter_multiplier (float) – 总参数存储的启发式乘数。
dense_tensor_estimate (Optional[int]) – 密集张量的存储估计值,使用 default heuristic estimate (如果未提供)。
- 类 torchrec.distributed.planner.proposers 中。GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)¶
以贪婪的方式提出分片计划。
按 perf 对每个可分片参数的分片选项进行排序。 在每次迭代中,查找当前存储使用率最高的参数,并尝试其 next sharding 选项。
- 参数
use_depth (bool) – 启用后,fqn 的sharding_options将根据 max(shard.perf.total) 进行排序,否则sharding_options按 sum(shard.perf.total) 排序。
threshold (Optional[int]) – 提前停止的阈值。指定后, 当提案连续出现更差perf_rating时,提议者停止提议 比 best_perf_rating。
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None、perf_rating:可选[浮点数] = 无、storage_constraint:可选[拓扑] = 无) 无 ¶
向提议者提供反馈。
- 参数
partitionable (bool) – 计划是否可分区。
plan (Optional[List[ShardingOption]]) – 提供反馈的计划。
perf_rating (Optional[float]) – 计划的性能评级。
storage_constraint (Optional[Topology]) – 计划的存储约束。
- load(search_space: List[ShardingOption], enumerator: 可选[Enumerator] = None) 无 ¶
将搜索空间加载到 proposer 中。
- 参数
search_space (List[ShardingOption]) – 要加载的搜索空间。
enumerator (Enumerator) - 用于生成搜索空间的枚举器。
- propose() 可选[List[ShardingOption]] ¶
提出分片计划。
- 结果
拟议计划。
- 返回类型:
可选[List[ShardingOption]]
- torchrec.distributed.planner.shard_estimators 类。EmbeddingPerfEstimator(topology: 拓扑, constraints: Optional[Dict[str, ParameterConstraints]] = 无,is_inference:bool = False)¶
Embedding Wall Time Perf Estimator (嵌入 Wall Time Perf Estimator)。此估算器估计实际时间 的 sharding 选项。
- 参数
topology (Topology) – 设备拓扑。
constraints (Optional[Dict[str, ParameterConstraints]]) – 参数约束。
is_inference (bool) – 估计器是否用于推理。
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[模块]]] = 无) 无 ¶
估计给定分片选项的固定时间。
- 参数
sharding_options (List[ShardingOption]) – 分片选项列表。
sharder_map (可选[Dict[str, ModuleSharder[nn.Module]]]) – 分片映射。
- 类方法 perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device:str、sharding_type:str、batch_sizes:List[int]、world_size:int、local_world_size:int、input_lengths:List[float]、input_data_type_size:float、table_data_type_大小:浮点数,output_data_type_size:浮点数,fwd_a2a_comm_data_type_size:浮点数,bwd_a2a_comm_data_type_size:浮点数、fwd_sr_comm_data_type_size:float、bwd_sr_comm_data_type_size:float、num_poolings:List[float]、hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: 可选[float] = 无,is_inference:bool = False,prefetch_pipeline:bool = False,expected_cache_fetches:float = 0,uneven_sharding_perf_multiplier:float = 1.0)列表[Perf] ¶
尝试将 perfs 建模为相对实际时间的函数。
- 参数
shard_sizes (List[List[int]]) – 每个列表的 (local_rows, local_cols) 碎片。
compute_kernel (str) – 计算内核。
compute_device (str) – 计算设备。
sharding_type (STR) – TW、RW、CW、TWRW、DP。
batch_sizes (List[int]) – 每个输入特征的批量大小。
world_size (int) – 所有主机的设备数量。
local_world_size (int) – 每个主机的设备编号。
input_lengths (List[float]) – 每个的平均查找次数列表 input 查询功能。
input_data_type_size (float) – 分布式的数据类型大小 data_parallel input。
table_data_type_size (float) – 表的数据类型大小。
output_data_type_size (float) – 输出嵌入的数据类型大小。
fwd_comm_data_type_size (float) – 分布式的数据类型大小 data_parallel 正向通信期间的输入。
bwd_comm_data_type_size (float) – 分布式的数据类型大小 data_parallel 反向通信期间的输入。
num_poolings (List[float]) – 每个样本的池化数,通常为 1.0。
hbm_mem_bw (float) – 设备 HBM 的带宽。
ddr_mem_bw (float) – 系统 DDR 内存的带宽。
hbm_to_ddr_bw (float) – 设备 HBM 和系统 DDR 之间的带宽。
intra_host_bw (float) – 单个主机(如多个线程)内的带宽。
inter_host_bw (float) – 两台主机(如多台计算机)之间的带宽。
is_pooled (bool) - 如果嵌入输出是池化的(即.EmbeddingBag)、False 如果未池化/顺序(即。嵌入)。
is_weighted (bool = False) – 如果模块是 EBC 并且是加权的,通常 表示 ID 分数列表特征。
is_inference (bool = False) – 如果计划推理。
caching_ratio (Optional[float] = None) – 用于确定带宽的缓存比率 的设备。
prefetch_pipeline (bool = False) – 是否启用预取管道。
expected_cache_fetches (float) – 跨全局批处理的预期缓存获取次数
uneven_sharding_perf_multiplier (float = 1.0) – 考虑分片性能不均匀的乘数
- 结果
每个分片的性能列表。
- 返回类型:
列表[float]
- torchrec.distributed.planner.shard_estimators 类。EmbeddingStorageEstimator(topology: 拓扑, constraints: Optional[Dict[str, ParameterConstraints]] = 无,pipeline_type:PipelineType = PipelineType.NONE,run_embedding_at_peak_memory:bool = False, is_inference: bool = False)¶
嵌入存储使用估算器
- 参数
topology (Topology) – 设备拓扑。
constraints (Optional[Dict[str, ParameterConstraints]]) – 参数约束。
pipeline_type (PipelineType) – 管道的类型(如果有)。将决定 在内存估计期间输入 replication factor。
run_embedding_at_peak_memory (bool) –
如果在 HBM 使用量处于峰值。当设置为 TRUE 时,在 embedding forward/backward,只要 output_dist 之前的输出大小就会 计入 HBM 存储成本。否则他们不会,因为他们会 被真实内存峰值“隐藏”。
仅当 pipeline_type 设置为向后兼容时生效(不影响 使用旧管道不可知公式的模型)
默认为 false,因为对于 RecSys 来说,这通常为 false,因为内存 峰值发生在 DENSE Forwrad 的末尾/DENSE BACKWARD 的开头。
is_inference (bool) – 如果模型是推理模型。默认为 False。
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[模块]]] = 无) 无 ¶
估算每个分片选项的存储成本。
- 参数
sharding_options (List[ShardingOption]) – 分片选项列表。
sharder_map (可选[Dict[str, ModuleSharder[nn.Module]]]) – 来自模块的映射 键入到 sharder。