目录

torchx.specs

这包含了TorchX应用定义及其相关组件定义。这些用于组件来定义可以由TorchX调度器或管道适配器启动的应用程序。

AppDef

class torchx.specs.AppDef(name: str, roles: ~typing.List[~torchx.specs.api.Role] = <factory>, metadata: ~typing.Dict[str, str] = <factory>)[source]

代表一个由多个Roles和元数据组成的分布式应用程序。包含提交此应用程序给调度器所需的必要信息。

Parameters:
  • 名称 – 应用程序的名称

  • 角色 – 列表中的角色

  • metadata – 应用程序的元数据(处理元数据取决于调度器)

角色

class torchx.specs.Role(name: str, image: str, min_replicas: ~typing.Optional[int] = None, base_image: ~typing.Optional[str] = None, entrypoint: str = '<MISSING>', args: ~typing.List[str] = <factory>, env: ~typing.Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: ~torchx.specs.api.RetryPolicy = RetryPolicy.APPLICATION, resource: ~torchx.specs.api.Resource = <factory>, port_map: ~typing.Dict[str, int] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, mounts: ~typing.List[~typing.Union[~torchx.specs.api.BindMount, ~torchx.specs.api.VolumeMount, ~torchx.specs.api.DeviceMount]] = <factory>)[source]

一组节点,它们在AppDef中执行特定职责。 示例:

  1. 分布式数据并行应用 - 由一个角色(训练器)组成。

  2. 带有参数服务器的应用 - 由多个角色(训练器,ps)组成。

注意

一个 image 是一个软件包,安装在由调度器调度的容器上。调度器上的容器决定了实际的图像是什么。图像可以很简单,就是一个 tar-ball 或者映射到 Docker 图像。调度器通常知道如何“拉取”给定的图像名称(字符串),这可能是简单的名称(例如 Docker 图像)或 URL,例如 s3://path/my_image.tar

Usage:

trainer = Role(name="trainer",
               image = "pytorch/torch:1",
               entrypoint = "my_trainer.py"
               args = ["--arg", "foo", ENV_VAR="FOOBAR"],
               num_replicas = 4,
               resource = Resource(cpu=1, gpu=1, memMB=500),
               port_map={"tcp_store":8080, "tensorboard": 8081},
               metadata={"local_cwd.property", value})
Parameters:
  • 名称 – 角色的名称

  • 图像 – 一个安装在容器上的软件包。

  • 入口点 – 命令(在容器内)以调用角色

  • 参数 – 进入点命令行参数

  • 环境变量映射

  • num_replicas – 运行容器副本的数量

  • min_replicas – 最小副本数量,用于开始任务。当设置为任务大小时,可以根据集群资源和策略自动调整在 min_replicas 和 num_replicas 之间。如果调度器不支持自动扩展,则此字段被忽略,任务大小将为 num_replicas。

  • 最大重试次数 – 重试的最大次数在放弃之前

  • 重试策略 – 在副本失败时的重试行为

  • 资源 – 角色所需资源。该角色应由调度器在num_replicas个容器上进行调度,每个容器至少应有resource个保证。

  • 端口映射 – 角色的端口映射。键是端口的唯一标识符,例如“tensorboard”: 9090

  • metadata – 自由形式的信息,与角色相关联,例如 调度器特定的数据。键应遵循模式:$scheduler.$key

  • mounts – 一个机器上的挂载列表

pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo[source]

根据角色特定配置修改调度请求。 该方法在调度器 submit_dryrun 进行每次调用。 如果有多个角色,将按 AppDef.roles 列表中定义的顺序为每个角色调用该方法。

class torchx.specs.RetryPolicy(value)[source]

定义了在Roles中的AppDef的重试策略。 该策略定义了当角色副本遇到失败时的行为:

  1. 非零退出代码失败

  2. 硬件/主机崩溃

  3. 抢占

  4. 驱逐

注意

并非所有重试策略都支持所有调度器。 然而,所有调度器必须支持 RetryPolicy.APPLICATION。 请参阅调度器的文档以获取更多关于它们支持的重试策略和行为注意事项(如有)的信息。

  1. REPLICA: Replaces the replica instance. Surviving replicas are untouched.

    使用 dist.ddp 组件来实现 torchelastic 的坐标重置和成员变更。否则,应用程序需要处理失败的副本离开和替换副本的接纳。

  2. 应用:重启整个应用程序。

资源

class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: ~typing.Dict[str, ~typing.Any] = <factory>, devices: ~typing.Dict[str, int] = <factory>)[source]

代表资源需求的Role

Parameters:
  • cpu – 数字逻辑CPU核心。CPU核心的定义取决于调度器。请参阅您的调度器文档,了解逻辑CPU核心如何映射到物理核心和线程。

  • gpu – 数量的显卡

  • 内存MB – MB的RAM

  • 能力 – 额外硬件规格(由调度器解释)

  • 设备 – 一个包含设备名称及其数量的列表

注意:你应该优先使用named_resources,而不是直接指定原始资源需求。

static copy(original: Resource, **capabilities: Any) Resource[source]

复制资源并应用新功能。如果原始资源中存在相同的功能作为参数,则使用参数中的功能。

torchx.specs.resource(cpu: Optional[int] = None, gpu: Optional[int] = None, memMB: Optional[int] = None, h: Optional[str] = None) Resource[source]

便捷的方法从原始资源规格(cpu, gpu, memMB)或注册的命名资源创建Resource对象。注意,(cpu, gpu, memMB)与h是互斥的,如果指定,则h具有优先权。

如果 h 被指定,则使用它从注册的命名资源列表中查找资源规格。 请参见 注册命名资源

否则,从原始资源规格创建一个Resource对象。

Example:

resource(cpu=1) # returns Resource(cpu=1)
resource(named_resource="foobar") # returns registered named resource "foo"
resource(cpu=1, named_resource="foobar") # returns registered named resource "foo" (cpu=1 ignored)
resource() # returns default resource values
resource(cpu=None, gpu=None, memMB=None) # throws
torchx.specs.get_named_resources(res: str) Resource[source]

根据通过 entrypoints.txt 注册的字符串定义获取资源对象。

TorchX实现了named_resource注册机制,该机制包括以下步骤:

  1. 创建一个模块并定义你的资源检索函数:

# my_module.resources
from typing import Dict
from torchx.specs import Resource

def gpu_x_1() -> Dict[str, Resource]:
    return Resource(cpu=2, memMB=64 * 1024, gpu = 2)
  1. 在入口点部分注册资源检索:

[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1

这个 gpu_x_1 可以作为字符串参数传递给这个函数:

from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]

AWS 命名资源

torchx.specs.named_resources_aws 包含表示对应AWS实例类型的资源定义,这些资源是从https://aws.amazon.com/ec2/instance-types/获取的。安装torchx库后,这些资源通过入口点暴露出来。映射存储在setup.py文件中。

当前的命名资源并未指定AWS实例类型的能力,而是仅仅表示在内存、CPU和GPU数量上的等效资源。

注意

这些资源定义可能在未来发生变化。预计每个用户应自行管理自己的资源。请参考https://pytorch.org/torchx/latest/specs.html#torchx.specs.get_named_resources以设置命名资源。

Usage:

from torchx.specs import named_resources
print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
torchx.specs.named_resources_aws.aws_m5_2xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_p3_2xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_p3_8xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_t3_medium() Resource[source]

class torchx.specs.macros[source]

定义可以在元素中使用的宏 Role.args 值为 Role.env。这些宏将在运行时被替换为其实际值。

警告

Macros 使用了 Role 之外的字段,而不是上面提到的那些字段,并且不会被替换。

可用宏:

  1. img_root - 拉取的容器图像根目录

  2. app_id - 应用程序ID,由调度器分配

  3. replica_id - unique id for each instance of a replica of a Role,

    例如,一个具有3个副本的角色可以有0、1、2作为副本ID。请注意,当容器失败并被替换时,新的容器将具有与它所取代的容器相同的replica_id。例如,如果节点1失败并由调度器替换,则替换节点也将具有replica_id=1

Example:

# runs: hello_world.py --app_id ${app_id}
trainer = Role(
           name="trainer",
           entrypoint="hello_world.py",
           args=["--app_id", macros.app_id],
           env={"IMAGE_ROOT_DIR": macros.img_root})
app = AppDef("train_app", roles=[trainer])
app_handle = session.run(app, scheduler="local_docker", cfg={})
class Values(img_root: str, app_id: str, replica_id: str, rank0_env: str, base_img_root: str = 'DEPRECATED')[source]
apply(role: Role) Role[source]

应用将值应用于指定的角色的副本并返回它。

substitute(arg: str) str[source]

替换适用于将值应用到模板参数。

运行配置

class torchx.specs.runopts[source]

持有接受的调度器运行配置键、默认值(如有)以及帮助消息字符串。 这些选项由Scheduler提供并验证在Session.run中与用户提供的运行配置进行比较。 允许None默认值。必需的选项必须没有非None的默认值。

重要的

这个类没有访问器,因为它旨在通过Scheduler.run_config_options构造和返回,并作为“帮助”工具或作为异常消息的一部分打印出来。

Usage:

opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type=str, default=10)

opts.check(cfg)
print(opts)
add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str]]] = None, required: bool = False) None[source]

添加了带有给定帮助字符串和值(如果有)的config选项(如果指定)。如果没有指定default,则此选项为必需选项。

cfg_from_str(cfg_str: str) Dict[str, Optional[Union[str, int, float, bool, List[str]]]][source]

解析器从字符串常量中解析 cfg 时钟调度器,并返回一个配置映射,其中的配置值已根据此运行选项对象指定的类型进行转换。未知键被忽略,并且不在结果映射中返回。

注意

不同于方法 resolve,这种方法不解决默认选项或检查所需选项是否确实存在于给定的 cfg_str 中。该方法旨在在调用 resolve() 之前被调用,当输入是字符串编码的 run cfg 时。也就是说,要完全解析 cfg,请调用 opt.resolve(opt.cfg_from_str(cfg_literal))

如果cfg_str是一个空字符串,则返回一个空的 cfg。否则,至少需要一个由 "="(等于)分隔的kv-pair。

要么 "," (逗号) 或者 ";" (分号) 可以用来分隔多个 kv-pairs。

CfgVal 允许 List 种基本操作,可以作为 ","";"(分号)分隔。由于相同的分隔符用于分隔配置键值对, 这种方法将最后一个(尾部) ","";" 作为键值对之间的分隔符。 请参见下面的示例。

Examples:

opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": "v1"}

opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}

opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
get(name: str) Optional[runopt][source]

返回选项,如果注册了任何选项,则返回该选项;否则返回None。

static is_type(obj: Optional[Union[str, int, float, bool, List[str]]], tp: Type[Optional[Union[str, int, float, bool, List[str]]]]) bool[source]

返回 True 如果 obj 是类型为 tp。类似于 isinstance() 但支持 tp = List[str],因此可以用于验证 ConfigValue。

resolve(cfg: Mapping[str, Optional[Union[str, int, float, bool, List[str]]]]) Dict[str, Optional[Union[str, int, float, bool, List[str]]]][source]

检查给定的配置与这个 runopts 对比,并设置默认配置如果未设置。

注意

运行选项中未知的额外配置被忽略。

运行状态

class torchx.specs.AppStatus(state: ~torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: ~typing.Optional[str] = None, roles: ~typing.List[~torchx.specs.api.RoleStatus] = <factory>)[source]

PyTorch的运行时状态。AppDef。调度器可以返回任意文本消息(msg字段)。 如果发生任何错误,调度器可以填充structured_error_msg为JSON响应。

replicas 代表任务中的副本状态。如果任务运行了多次重试,该参数将包含最近一次重试的状态。注意:如果之前的重试失败,但最近一次重试成功或正在进行中,则 replicas 将不包含发生的错误。

format(filter_roles: Optional[List[str]] = None) str[source]
Format logs for app status. The app status include:
  1. 状态:应用的状态。

  2. 重启次数:应用程序的重启次数。

  3. 角色:角色列表。

  4. 消息:调度器返回的任意文本消息。

  5. 结构化错误消息:Json响应错误消息。

  6. UI URL: 应用程序URL

raise_for_status() None[source]

raise_for_status 将在状态不是 SUCCEEDED 时抛出 AppStatusError。

class torchx.specs.AppState(value)[source]

应用状态。一个应用程序从初始的 UNSUBMITTED 状态开始,并通过 SUBMITTED, PENDING, RUNNING 状态最终到达终端状态: SUCCEEDED,``FAILED``, CANCELLED

如果调度器支持抢占,应用程序在抢占后从状态 RUNNING 转移到状态 PENDING

如果用户停止了应用程序,那么应用程序状态将移动到STOPPED,然后在实际被调度器取消时移动到CANCELLED

  1. 未提交 - 应用尚未提交到调度器

  2. 提交 - 应用程序已成功提交到调度器

  3. 待处理 - 应用已提交到调度器,等待分配

  4. 运行中 - 应用程序正在运行

  5. 成功 - 应用程序已成功完成

  6. 失败 - 应用程序未成功完成

  7. 取消 - 应用在完成之前被取消

  8. 未知 - 应用状态未知

torchx.specs.ReplicaState

AppState的别名

Mounts

torchx.specs.parse_mounts(opts: List[str]) List[Union[BindMount, VolumeMount, DeviceMount]][source]

parse_mounts 解析一个选项列表,将其转换为类型化的挂载点,格式类似于 Docker 的绑定挂载。

多个挂载点可以在同一列表中指定。type必须在每个挂载点的开头指定。

Ex:

type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]

Supported types:

绑定挂载:类型=绑定,src=<主机路径>,dst=<容器路径>[,只读] 卷挂载:类型=卷,src=<名称/ID>,dst=<容器路径>[,只读] 设备挂载:类型=设备,src=/dev/<设备>[,dst=<容器路径>][,权限=rwm]

class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]

定义了一个绑定挂载,将主机路径mount –bind绑定到工作环境。请参阅调度器文档了解每个调度器如何操作绑定挂载。

Parameters:
  • src_path – 本地主机上的路径

  • dst_path – 工作环境/容器中的路径

  • 只读 – 是否应将挂载设置为只读

class torchx.specs.VolumeMount(src: str, dst_path: str, read_only: bool = False)[source]

定义一个持久卷挂载,将其挂载到工作环境。 :参数 src:要挂载的卷的名称或ID :参数 dst_path:在工作环境/容器中的路径 :参数 read_only:是否应该将挂载设置为只读

class torchx.specs.DeviceMount(src_path: str, dst_path: str, permissions: str = 'rwm')[source]

定义一个宿主机设备以挂载到容器中。 :param src_path: 主机上的路径 :param dst_path: 工作环境/容器中的路径 :param permissions: 设备上的权限设置。默认:读、写、mknode

组件检查器

torchx.specs.file_linter.validate(path: str, component_function: str) List[LinterMessage][source]

验证函数以确保其符合组件标准。

validate 发现了 component_function 并根据以下规则进行验证:

  1. 该函数必须具有Google风格的文档

  2. 所有函数参数必须进行注释

  3. 该函数必须返回 torchx.specs.api.AppDef

Parameters:
  • 路径 – Python源文件的路径。

  • 组件函数 – 需要验证的函数名称。

Returns:

验证错误列表

Return type:

List[LinterMessage]

torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) Tuple[str, Dict[str, str]][source]

解析提供的函数中的函数和参数描述。文档字符串应为 google风格格式

如果函数没有文档字符串,函数描述将是函数名,提示如何改进帮助信息和参数描述将使用参数名称。

不在文档字符串中的参数将包含默认/必需的信息

Parameters:

fn – 函数带有或不带文档字符串

Returns:

function description, arguments description where key is the name of the argument and value

如果描述

class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]
class torchx.specs.file_linter.TorchFunctionVisitor(component_function_name: str)[source]

找到组件_function 并运行已注册的验证器。 当前已注册的验证器:

  • TorchxFunctionArgsValidator - validates arguments of the function.
    Criteria:
    • 每个参数都应该标注其类型

    • The following types are supported:
      • 原始类型:{int, str, float}

      • Optional[primitive_types],

      • 字典[原始类型,原始类型]

      • List[primitive_types],

      • 可选[字典(primitive_types, primitive_types)]

      • Optional[List[primitive_types]]

visit_FunctionDef(node: FunctionDef) None[source]

验证带有子验证器的函数 def。

class torchx.specs.file_linter.TorchXArgumentHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]

帮助消息格式化器,添加默认值和必需的参数帮助。

如果参数是必需的,该类会在帮助消息末尾添加(required)。 如果参数有默认值,该类会在末尾添加(default: $DEFAULT)。 格式化器仅用于torchx组件函数。 这些函数既没有必需参数也没有默认参数。

class torchx.specs.file_linter.TorchxFunctionArgsValidator[source]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]

调用以验证提供的函数的方法

class torchx.specs.file_linter.TorchxFunctionValidator[source]
abstract validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]

调用以验证提供的函数的方法

class torchx.specs.file_linter.TorchxReturnValidator[source]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]
Validates return annotation of the torchx function. Current allowed annotations:
  • AppDef

  • specs.AppDef

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源