torchx.runner 中¶
运行程序允许您在其中一个受支持的计划程序上将组件作为独立作业运行。运行程序采用一个对象,该对象
是使用一组用户提供的
参数,以及调度程序名称和调度程序参数(又名或 )
并将组件作为作业提交(见下图)。specs.AppDef
runcfg
runopts

Runner 函数¶
- torchx.runner 中。get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = 无, **scheduler_params: 任意)Runner [来源]¶
构造和获取 Runner 对象的便捷方法。用法:
with get_runner() as runner: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle))
或者
runner = get_runner() try: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle)) finally: runner.close()
- 参数
name – 将作为 All Launched 的一部分包含的人类可读名称 工作。
scheduler_params – 将传递给构造函数的额外参数 的所有可用调度程序。
Runner 类¶
- 类 torchx.runner 中。Runner(名称:str,scheduler_factories:Dict[str, SchedulerFactory],component_defaults: 可选[Dict[str, Dict[str, str]]] = 无,scheduler_params:可选[Dict[str, object]] = 无)[来源]¶
TorchX 单个组件运行程序。具有用户 作用于 。将缓存有关 启动的应用程序(如果它们是在本地启动的),否则取决于 特定的调度程序实现。
AppDefs
Runner
- cancel(app_handle: str) None [来源]¶
停止应用程序,有效地指示调度程序取消 工作。如果应用程序不存在,则不执行任何作。
注意
此方法在取消请求后立即返回 已提交到调度程序。应用程序将处于某种状态,直到调度程序实际终止 工作。如果调度程序成功中断作业 并终止它,则最终状态将为 ,否则它将是 。
RUNNING
CANCELLED
FAILED
- close() None [来源]¶
关闭此运行器并释放/清理任何分配的资源。 在所有调度器上传递调用该方法。 在 runner 上调用此方法后,将 runner 对象视为 invalid 以及对 Runner 对象调用的任何方法以及 与此运行程序关联的调度程序具有未定义的行为。 可以对同一个 runner 对象多次调用此方法。
close()
- describe(app_handle: str) 可选[AppDef] [源代码]¶
在给定应用程序句柄的情况下(在最佳程度上)重建应用程序。 请注意,重建的应用程序可能不是完整的应用程序 它是通过 run API 提交的。可以重建多少应用程序 取决于调度程序。
- 返回
AppDef 或 None(如果应用程序不再存在,或者如果 scheduler 不支持描述 app 句柄
- dryrun(app: AppDef, scheduler: str, cfg: 可选[Mapping[str, 可选[Union[str, int, float, bool, List[str]]]]] = 无,工作区:可选[str] = 无,parent_run_id:可选[str] = 无) AppDryRunInfo [源]¶
Dry 使用提供的运行配置在给定的调度器上运行应用程序。 实际上并未提交应用程序,而是返回本应提交的内容 提交。返回的 is formatted and can 直接打印或记录。
AppDryRunInfo
用法:
dryrun_info = session.dryrun(app, scheduler="local", cfg) print(dryrun_info)
- dryrun_component(组件: str, component_args: List[str], 调度程序: str, cfg: 可选[Mapping[str, 可选[Union[str, int, float, bool, List[str]]]]] = 无,工作区:可选[str] = 无,parent_run_id:可选[str] = 无)AppDryRunInfo [来源]¶
Dryrun 版本的
run_component()
.实际上不会运行 组件,但只返回 “Wold” 本来运行的内容。
- list(scheduler: str) List[ListAppResponse] [来源]¶
对于在调度程序上启动的应用程序,此 API 返回 ListAppResponse 列表 每个对象都有 App ID、App Handle 及其 Status。 注意:此 API 处于原型阶段,可能会发生更改。
- log_lines(app_handle: str, role_name: str, k: int = 0, regex: 可选[str] = 无,因为: 可选[datetime] = 无,直到:可选[日期时间] = 无,should_tail:bool = False,流:可选[Stream] = None) Iterable[str] [来源]¶
返回指定作业容器的日志行的迭代器。
注意
k
是节点(主机)ID 而不是 .rank
since
并且不需要总是遵守 (取决于 scheduler)。until
警告
返回的迭代器的语义和保证是高度的 依赖于 scheduler。有关此日志迭代器的高级语义,请参见。因此 强烈建议不要使用此方法生成输出 传递给下游函数/依赖项。此方法 不保证返回 100% 的日志行。 此方法返回 no 或 partial log 行是完全有效的 如果调度程序已完全或部分清除日志记录 对于应用程序。
torchx.specs.api.Scheduler.log_iter
返回行将包含空格字符,如 或 。输出行时,应确保避免添加 额外的换行符。
\n
\r
用法:
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]()) print("== trainer node 0 logs ==") for line in session.log_lines(app_handle, "trainer", k=0): # for prints newlines will already be present in the line print(line, end="") # when writing to a file nothing extra is necessary f.write(line)
不鼓励的反模式:
# DO NOT DO THIS! # parses accuracy metric from log and reports it for this experiment run accuracy = -1 for line in session.log_lines(app_handle, "trainer", k=0): if matches_regex(line, "final model_accuracy:[0-9]*"): accuracy = parse_accuracy(line) break report(experiment_name, accuracy)
- 参数
app_handle – 应用程序句柄
role_name – 应用程序中的角色(例如,培训师)
k – 要获取其日志的角色的第 k 个副本
regex – 可选的正则表达式过滤器,如果留空,则返回所有行
Since – 基于日期时间的开始游标。如果留空,则从 第一行日志 (Job 开始)。
until – 基于日期时间的结束游标。如果留空,则遵循日志输出 直到作业完成并且所有日志行都已用完。
- 返回
指定应用程序的角色第 k 个副本的迭代器。
- 提高
UnknownAppException – 如果计划程序中不存在该应用程序
- run(app: AppDef, scheduler: str, cfg: 可选[Mapping[str, 可选[Union[str, int, float, bool, List[str]]]]] = 无,工作区:可选[str] = 无,parent_run_id:可选[str] = 无) str [source]¶
在指定模式下运行给定的应用程序。
注意
应该实现方法的子类 而不是直接覆盖此方法。
Runner
schedule
- 返回
用于在应用程序上调用其他作 API 的应用程序句柄。
- run_component(组件: str, component_args: List[str], 调度程序: str, cfg: 可选[Mapping[str, 可选[Union[str, int、 float, bool, List[str]]]]] = 无, 工作区:可选[str] = 无,parent_run_id:可选[str] = 无) str [来源]¶
运行组件。
component
具有以下解析顺序(从高到低):- 用户注册的组件。用户可以通过
https://packaging.python.org/specifications/entry-points/。方法查找 组中的入口点。
torchx.components
- 相对于 torchx.components 的内置组件。组件的路径应
是相对于 torchx.components 的模块名称和函数名称,格式为: 。
$module.$function
- 格式为: .relative 和
$FILE_PATH:FUNCTION_NAME
支持的绝对路径。
- 格式为: .relative 和
用法:
# resolved to torchx.components.distributed.ddp() runner.run_component("distributed.ddp", ...) # resolved to my_component() function in ~/home/components.py runner.run_component("~/home/components.py:my_component", ...)
- 返回
用于在应用程序上调用其他作 API 的应用程序句柄
- 提高
ComponentValidationException – 如果组件无效。
ComponentNotFoundException – 如果 无法解决。
component_path
- schedule(dryrun_info: AppDryRunInfo) str [来源]¶
实际上,从给定的 dryrun 信息运行应用程序。 当需要覆盖调度器中的参数时很有用 请求,该请求无法从其中一个对象 API 进行配置。
警告
谨慎使用,因为滥用此方法来覆盖 原始调度程序请求中的许多参数可能会 导致您对 TorchX 的使用不合规 从长远来看。此方法旨在 取消阻止用户尝试某些 短期内没有 Scheduler 特定的功能 必须等到 TorchX 公开调度器功能 在其 API 中。
注意
建议 的子类实现 此方法,而不是直接实现该方法。
Session
run
用法:
dryrun_info = session.dryrun(app, scheduler="default", cfg) # overwrite parameter "foo" to "bar" dryrun_info.request.foo = "bar" app_handle = session.submit(dryrun_info)
- scheduler_run_opts(调度程序:str) runopts [来源]¶
返回支持的调度程序后端的 。
runopts
用法:
local_runopts = session.scheduler_run_opts("local_cwd") print("local scheduler run options: {local_runopts}")
- 返回
对于指定的调度程序类型。
runopts