目录

torchx.runner

运行器允许您将组件作为独立任务在受支持的 调度器之一上运行。运行器接受一个 specs.AppDef 对象,该对象是使用一组用户提供的参数评估组件函数的结果,以及调度器名称和调度器参数(即 runcfgrunopts) 并将组件提交为任务(请参见下图)。

_images/runner_diagram.png

跑马函数

torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner[source]

方便的方法来构造和获取一个Runner对象。使用:

with get_runner() as runner:
  app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
  print(runner.status(app_handle))

或者

runner = get_runner()
try:
   app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
   print(runner.status(app_handle))
finally:
   runner.close()
Parameters:
  • 名称 – 可读性较强的名称,将作为所有启动的 任务的一部分被包含。

  • 调度器参数 – 传递给所有可用调度器构造函数的额外参数。

跑者类

class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[source]

TorchX 个人组件运行器。具有用户可以操作的方法AppDefsRunner将在本地启动的应用程序中缓存信息,否则由特定的调度器实现决定。

cancel(app_handle: str) None[source]

停止应用程序,有效地将调度器导向取消任务。如果应用程序不存在,则无操作。

注意

这个方法在取消请求提交给调度器后立即返回。应用程序将在RUNNING状态直到调度器实际终止该任务。如果调度器成功中断并终止了任务,最终状态将是CANCELLED;否则,它将处于FAILED状态。

close() None[source]

关闭此跑者并释放/清理任何分配的资源。 递归调用所有调度器上的close()方法。 一旦在跑者上调用了此方法,该跑者对象被视为无效, 并且对跑者对象及其与该跑者相关的调度器上调用的方法 具有未定义的行为。 可以多次在同一跑者对象上调用此方法。

describe(app_handle: str) Optional[AppDef][source]

重建应用(尽可能地)给定应用程序句柄。 请注意,重建的应用可能不是通过运行API提交的完整应用。可以重建的应用部分取决于调度器。

Returns:

如果应用程序不再存在或者调度器不支持描述应用程序句柄,则使用AppDef或None。

dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

在给定的调度器上进行应用程序的模拟运行,使用提供的运行配置。 不会实际提交应用程序,而是返回将要提交的内容。返回的AppDryRunInfo格式非常漂亮,可以直接打印或记录。

Usage:

dryrun_info = session.dryrun(app, scheduler="local", cfg)
print(dryrun_info)
dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

测试版本的 run_component()。不会实际运行组件,但只会返回“会”运行的内容。

list(scheduler: str) List[ListAppResponse][source]

对于在调度器上发布的应用程序,此API返回一个ListAppResponse对象列表,每个对象都包含应用程序ID、应用程序句柄及其状态。 注意:此API处于原型阶段,并且可能会发生变化。

log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

返回指定工作容器的日志行迭代器。

注意

  1. k 是节点(主机)ID,而不是 rank

  2. sinceuntil 不一定总是需要被尊重(取决于调度器)。

警告

返回迭代器的语义和保证高度依赖于调度器。请参见torchx.specs.api.Scheduler.log_iter了解此日志迭代器的高级语义。因此,强烈建议不要使用此方法生成输出以传递给下游函数/依赖项。此方法不保证返回100%的日志行。如果调度器已经完全或部分清空了应用程序的日志记录,则这种方法完全可以返回零或部分日志行。

返回行将包含空格字符,如\n\r。在输出这些行时,请确保不要添加额外的换行符。

Usage:

app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())

print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
   # for prints newlines will already be present in the line
   print(line, end="")

   # when writing to a file nothing extra is necessary
   f.write(line)

被禁止的反模式:

# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
   if matches_regex(line, "final model_accuracy:[0-9]*"):
       accuracy = parse_accuracy(line)
       break
report(experiment_name, accuracy)
Parameters:
  • app_handle – 应用程序句柄

  • 角色名称 – 应用程序中的角色(例如:训练器)

  • k – k-th replica of the role to fetch the logs for

  • 正则表达式 – 可选的正则表达式过滤器,如果为空则返回所有行

  • – 基于日期时间的起始游标。如果为空,则从第一条日志行(任务开始)开始。

  • 直到 – 基于日期时间的结束游标。如果为空,则跟随日志输出,直到任务完成且所有日志行都被消费。

Returns:

一个迭代器,遍历指定应用的第k个副本的角色。

Raises:

未知应用异常 – 如果应用程序不在调度器中

run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

在指定模式下运行给定的应用程序。

注意

子类应实现 Runner 方法 而不是直接覆盖此方法。

Returns:

一个用于调用应用程序中其他动作API的应用程序处理对象。

run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

运行一个组件。

component has the following resolution order(high to low):
  • User-registered components. Users can register components via

    https://packaging.python.org/specifications/entry-points/. 方法查找 在组 torchx.components 中的入口点。

  • Builtin components relative to torchx.components. The path to the component should

    模块名称相对于 torchx.components,函数名称格式为: $module.$function

  • File-based components in format: $FILE_PATH:FUNCTION_NAME. Both relative and

    支持绝对路径。

Usage:

# resolved to torchx.components.distributed.ddp()
runner.run_component("distributed.ddp", ...)

# resolved to my_component() function in ~/home/components.py
runner.run_component("~/home/components.py:my_component", ...)
Returns:

一个用于调用应用程序中其他动作API的应用程序处理对象

Raises:
  • 组件验证异常 – 如果组件无效。

  • 组件未找到异常 – 如果 component_path 无法解析。

schedule(dryrun_info: AppDryRunInfo) str[source]

实际上,从给定的干运行信息中运行应用程序。 当需要覆盖调度器请求中的参数时非常有用, 该参数无法通过对象API之一进行配置。

警告

谨慎使用,因为滥用这种方法在原始调度请求中覆盖许多参数可能会导致您长期使用TorchX不符合规范。这种方法旨在短期内阻止用户等待TorchX在其API中暴露调度器功能,从而实验某些特定的调度器功能。

注意

建议 Session 的子类实现此方法,而不是直接实现 run 方法。

Usage:

dryrun_info = session.dryrun(app, scheduler="default", cfg)

# overwrite parameter "foo" to "bar"
dryrun_info.request.foo = "bar"

app_handle = session.submit(dryrun_info)
scheduler_backends() List[str][source]

返回所有支持的调度器后端列表。

scheduler_run_opts(scheduler: str) runopts[source]

返回支持的调度器后端的 runopts

Usage:

local_runopts = session.scheduler_run_opts("local_cwd")
print("local scheduler run options: {local_runopts}")
Returns:

指定调度器类型的runopts

status(app_handle: str) Optional[AppStatus][source]
Returns:

应用的状态,或者None如果应用程序已经不存在(例如,在过去停止并从调度器的后端删除)。

stop(app_handle: str) None[source]

查看方法 cancel

警告

此方法将在未来被废弃。它已被 替换为 cancel,该版本提供了相同的功能。 此更改是为了与CLI和调度API保持一致。

wait(app_handle: str, wait_interval: float = 10) Optional[AppStatus][source]

块等待(无限期)直到应用程序完成。 可能的实现:

while(True):
    app_status = status(app)
    if app_status.is_terminal():
        return
    sleep(10)
Parameters:
  • app_handle – 等待完成的应用处理

  • 等待间隔 – 在轮询状态之前等待的最小间隔

Returns:

应用的终端状态,或者None如果应用程序已经不存在

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源