torchx.specs¶
AppDef¶
角色¶
- class torchx.specs.Role(name: str, image: str, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: torchx.specs.api.RetryPolicy = <RetryPolicy.APPLICATION: 'APPLICATION'>, resource: torchx.specs.api.Resource = Resource(cpu=-1, gpu=-1, memMB=-1, capabilities={}), port_map: Dict[str, int] = <factory>)[source]¶
一组节点,它们在
AppDef中执行特定职责。 示例:分布式数据并行应用 - 由一个角色(训练器)组成。
带有参数服务器的应用 - 由多个角色(训练器,ps)组成。
注意
一个
image是一个软件包,安装在由调度器调度的容器上。调度器上的容器决定了实际的图像是什么。图像可以很简单,就是一个 tar-ball 或者映射到 Docker 图像。调度器通常知道如何“拉取”给定的图像名称(字符串),这可能是简单的名称(例如 Docker 图像)或 URL,例如s3://path/my_image.tar。Usage:
trainer = Role(name="trainer", image = "pytorch/torch:1", entrypoint = "my_trainer.py" args = ["--arg", "foo", ENV_VAR="FOOBAR"], num_replicas = 4, resource = Resource(cpu=1, gpu=1, memMB=500), port_map={"tcp_store":8080, "tensorboard": 8081})
- Parameters
名称 – 角色的名称
图像 – 一个安装在容器上的软件包。
入口点 – 命令(在容器内)以调用角色
参数 – 进入点命令行参数
环境变量映射
副本 – 运行的容器副本数量
最大重试次数 – 重试的最大次数在放弃之前
重试策略 – 在副本失败时的重试行为
资源 – 角色所需资源。该角色应由调度器在
num_replicas个容器上进行调度,每个容器至少应有resource个保证。端口映射 – 角色的端口映射。键是端口的唯一标识符,例如“tensorboard”: 9090
- class torchx.specs.RetryPolicy(value)[source]¶
定义了在
Roles中的AppDef的重试策略。 该策略定义了当角色副本遇到失败时的行为:非零退出代码失败
硬件/主机崩溃
抢占
驱逐
注意
并非所有重试策略都支持所有调度器。 然而,所有调度器必须支持
RetryPolicy.APPLICATION。 请参阅调度器的文档以获取更多关于它们支持的重试策略和行为注意事项(如有)的信息。- REPLICA: Replaces the replica instance. Surviving replicas are untouched.
使用
torch_dist_role以使 PyTorch 坐标重置和成员变更。否则,应用程序需自行处理失败的副本离开和替换副本接纳。
应用:重启整个应用程序。
资源¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>)[source]¶
代表资源需求的
Role。- Parameters
cpu – 数字逻辑CPU核心。CPU核心的定义取决于调度器。请参阅您的调度器文档,了解逻辑CPU核心如何映射到物理核心和线程。
gpu – 数量的显卡
内存MB – MB的RAM
能力 – 额外硬件规格(由调度器解释)
注意:你应该优先使用named_resources,而不是直接指定原始资源需求。
- static copy(original: torchx.specs.api.Resource, **capabilities: Any) → torchx.specs.api.Resource[source]¶
复制资源并应用新功能。如果原始资源中存在相同的功能作为参数,则使用参数中的功能。
- torchx.specs.get_named_resources(res: str) → torchx.specs.api.Resource[source]¶
根据通过 entrypoints.txt 注册的字符串定义获取资源对象。
TorchX实现了
named_resource注册机制,该机制包括以下步骤:创建一个模块并定义你的资源检索函数:
# my_module.resources from typing import Dict from torchx.specs import Resource def gpu_x_1() -> Dict[str, Resource]: return Resource(cpu=2, memMB=64 * 1024, gpu = 2)
在入口点部分注册资源检索:
[torchx.named_resources] gpu_x_1 = my_module.resources:gpu_x_1
这个
gpu_x_1可以作为字符串参数传递给这个函数:from torchx.specs import named_resources resource = named_resources["gpu_x_1"]
AWS 命名资源¶
torchx.specs.named_resources_aws 包含表示对应AWS实例类型的资源定义,这些资源是从https://aws.amazon.com/ec2/instance-types/获取的。安装torchx库后,这些资源通过入口点暴露出来。映射存储在setup.py文件中。
当前的命名资源并未指定AWS实例类型的能力,而是仅仅表示在内存、CPU和GPU数量上的等效资源。
注意
这些资源定义可能在未来发生变化。预计每个用户应自行管理自己的资源。请参考https://pytorch.org/torchx/latest/specs.html#torchx.specs.get_named_resources以设置命名资源。
Usage:
from torchx.specs import named_resources print(named_resources["aws_t3.medium"]) print(named_resources["aws_m5.2xlarge"]) print(named_resources["aws_p3.2xlarge"]) print(named_resources["aws_p3.8xlarge"])
宏¶
- class torchx.specs.macros[source]¶
定义可以与
Role.entrypoint和Role.args一起使用的宏。 这些宏将在运行时被替换为其实际值。可用宏:
img_root- 拉取的容器图像根目录app_id- 应用程序ID,由调度器分配replica_id- unique id for each instance of a replica of a Role,例如,一个具有3个副本的角色可以有0、1、2作为副本ID。请注意,当容器失败并被替换时,新的容器将具有与它所取代的容器相同的
replica_id。例如,如果节点1失败并由调度器替换,则替换节点也将具有replica_id=1。
Example:
# runs: hello_world.py --app_id ${app_id} trainer = Role(name="trainer", entrypoint="hello_world.py", args=["--app_id", macros.app_id]) app = AppDef("train_app", roles=[trainer]) app_handle = session.run(app, scheduler="local", cfg=RunConfig())
运行配置¶
- class torchx.specs.RunConfig(cfgs: Dict[str, Optional[Union[str, int, float, bool, List[str]]]] = <factory>)[source]¶
附加的运行配置用于应用程序。这些通常是 调度器运行时配置/参数,不绑定到
AppDef或者Scheduler。例如 特定集群(在调度器内)应提交给的应用程序。 由于同一个应用程序可以被启动到多种类型的集群(开发、生产) 集群ID配置不绑定到应用程序。也不绑定到调度器, 因为集群可以根据实例大小(S、M、L)或预emption设置(例如 按需 vs 负载均衡)进行分区。自从
Session允许应用程序提交给多个调度器,用户可以从同一会话中将相同的 app 提交到多个调度器时,可以将所有RunConfigs统一为一个对象。调度器实现将选择性地读取所需的配置。这个类旨在轻松序列化并传递或保存,因此只允许基本类型作为配置值。如果调度器需要超过简单的基本类型(例如字符串列表),则由调度器自行记录一种方法来编码此值为字符串,并解析它(例如表示字符串列表为逗号分隔的字符串)。
Usage:
# write config = RunConfig() config.set("run_as_user", "prod") config.set("priority", 10) # read config.get("run_as_user") # "prod" config.get("priority") # 10 config.get("never_set") # None
- class torchx.specs.runopts[source]¶
持有接受的调度器运行配置键、默认值(如有)以及帮助消息字符串。 这些选项由
Scheduler提供并验证在Session.run中,与用户提供的RunConfig进行比较。 允许None设置默认值。必需的选项必须没有非None的默认值。重要的
这个类没有访问器,因为它旨在通过
Scheduler.run_config_options构造和返回,并作为“帮助”工具或作为异常消息的一部分打印出来。Usage:
opts = runopts() opts.add("run_as_user", type_=str, help="user to run the job as") opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True) opts.add("priority", type_=float, default=0.5, help="job priority") opts.add("preemptible", type_=bool, default=False, help="is the job preemptible") # invalid opts.add("illegal", default=10, required=True) opts.add("bad_type", type=str, default=10) opts.check(RunConfig) print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str]]] = None, required: bool = False) → None[source]¶
添加了带有给定帮助字符串和值(如果有)的
config选项(如果指定)。如果没有指定default,则此选项为必需选项。
- static is_type(obj: Optional[Union[str, int, float, bool, List[str]]], tp: Type[Optional[Union[str, int, float, bool, List[str]]]]) → bool[source]¶
返回 True 如果
obj是类型为tp。类似于 isinstance() 但支持 tp = List[str],因此可以用于验证 ConfigValue。
- resolve(config: torchx.specs.api.RunConfig) → torchx.specs.api.RunConfig[source]¶
检查给定的配置与这个
runopts对比,并设置默认配置如果未设置。警告
这种方法会修改提供的配置!
运行状态¶
- class torchx.specs.AppStatus(state: torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[torchx.specs.api.RoleStatus] = <factory>)[source]¶
PyTorch的运行时状态。
AppDef。调度器可以返回任意文本消息(msg字段)。 如果发生任何错误,调度器可以填充structured_error_msg为JSON响应。replicas代表任务中的副本状态。如果任务运行了多次重试,该参数将包含最近一次重试的状态。注意:如果之前的重试失败,但最近一次重试成功或正在进行中,则replicas将不包含发生的错误。
- class torchx.specs.AppState(value)[source]¶
应用状态。一个应用程序从初始的
UNSUBMITTED状态开始,并通过SUBMITTED,PENDING,RUNNING状态最终到达终端状态:SUCCEEDED,``FAILED``,CANCELLED。如果调度器支持抢占,应用程序在抢占后从状态
RUNNING转移到状态PENDING。如果用户停止了应用程序,那么应用程序状态将移动到
STOPPED,然后在实际被调度器取消时移动到CANCELLED。未提交 - 应用尚未提交到调度器
提交 - 应用程序已成功提交到调度器
待处理 - 应用已提交到调度器,等待分配
运行中 - 应用程序正在运行
成功 - 应用程序已成功完成
失败 - 应用程序未成功完成
取消 - 应用在完成之前被取消
未知 - 应用状态未知
- torchx.specs.ReplicaState¶
组件检查器¶
- torchx.specs.file_linter.validate(path: str, component_function: str) → List[torchx.specs.file_linter.LinterMessage][source]¶
验证函数以确保其符合组件标准。
validate发现了component_function并根据以下规则进行验证:该函数必须具有Google风格的文档
所有函数参数必须进行注释
该函数必须返回
torchx.specs.api.AppDef
- Parameters
路径 – Python源文件的路径。
组件函数 – 需要验证的函数名称。
- Returns
验证错误列表
- Return type
List[LinterMessage]