Airflow
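For pipelines that support Python-based execution, you can use the TorchX API directly. TorchX is designed to be easily integrated into other applications through its programmatic API, so no special Airflow integration is needed.
With TorchX, you can use Airflow for pipeline orchestration and run your PyTorch application (for example, distributed training) on a remote GPU cluster.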
[1]:
import datetime
import pendulum
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
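To launch a TorchX job from Airflow, create an Airflow Python task that imports the runner, launches the job, and waits for it to complete. If you are running on a remote cluster, you may need to use the virtualenv task to install the torchx package.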
[2]:
@task(task_id="hello_torchx")
def run_torchx(message):
    """This is a function that will run within the DAG execution"""
    # Import inside the task so torchx is only needed where the task executes.
    from torchx.runner import get_runner

    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )

        # Wait for the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # raise_for_status will raise an exception if the job didn't succeed
        status.raise_for_status()

        # Finally we can print all of the log lines from the TorchX job so it
        # will show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")
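The virtualenv variant mentioned above can be sketched as follows. This is a minimal sketch rather than part of the original example: it assumes Airflow's `task.virtualenv` decorator with its `requirements` and `system_site_packages` arguments, and that the worker has the `virtualenv` package available. The names `run_torchx_in_venv`, `as_virtualenv_task`, and the task id `hello_torchx_venv` are hypothetical.

```python
def run_torchx_in_venv(message):
    """Same job as above, written so it can run in a separate interpreter."""
    # All imports must live inside the function body: the virtualenv task
    # executes this callable in a freshly created environment.
    from torchx.runner import get_runner

    with get_runner() as runner:
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )
        status = runner.wait(app_id, wait_interval=1)
        status.raise_for_status()
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")


def as_virtualenv_task(fn, requirements=("torchx",)):
    """Wrap fn as an Airflow virtualenv task (assumed API: task.virtualenv)."""
    # Imported lazily so this module can be loaded without Airflow installed.
    from airflow.decorators import task

    return task.virtualenv(
        task_id="hello_torchx_venv",      # hypothetical task id
        requirements=list(requirements),  # installed before fn runs
        system_site_packages=False,
    )(fn)
```

Inside a `with DAG(...)` block, `as_virtualenv_task(run_torchx_in_venv)("Hello, TorchX!")` would then take the place of the plain `run_torchx(...)` call.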
Once the task is defined, we can put it into an Airflow DAG (directed acyclic graph) and run it like normal.
[3]:
from torchx.schedulers.ids import make_unique

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("Hello, TorchX!")

dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
/var/folders/ck/dsbpg4794mn2nfwc_wr5_fn40000gn/T/ipykernel_94011/454499020.py:3 RemovedInAirflow3Warning: Param `schedule_interval` is deprecated and will be removed in a future release. Please use `schedule` instead.
[2022-10-27 17:34:37,852] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: example_python_operator-z1h7912f7w41vd.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2022-10-27 17:34:37,859] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: example_python_operator-z1h7912f7w41vd.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2022-10-27 17:34:37,860] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-27 17:34:37,861] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-27 17:34:37,861] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-27 17:34:37,880] {taskinstance.py:1383} INFO - Executing <Task(_PythonDecoratedOperator): hello_torchx> on 2021-09-13 00:00:00+00:00
[2022-10-27 17:34:38,282] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_python_operator-z1h7912f7w41vd
AIRFLOW_CTX_TASK_ID=hello_torchx
AIRFLOW_CTX_EXECUTION_DATE=2021-09-13T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-13T00:00:00+00:00
[2022-10-27 17:34:39,536] {local_scheduler.py:715} INFO - Log directory not set in scheduler cfg. Creating a temporary log dir that will be deleted on exit. To preserve log directory set the `log_dir` cfg option
[2022-10-27 17:34:39,537] {local_scheduler.py:721} INFO - Log directory is: /var/folders/ck/dsbpg4794mn2nfwc_wr5_fn40000gn/T/torchx_eutnzwj3
Hello, TorchX!
[2022-10-27 17:34:40,584] {python.py:177} INFO - Done. Returned value was: None
[2022-10-27 17:34:40,591] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=example_python_operator-z1h7912f7w41vd, task_id=hello_torchx, execution_date=20210913T000000, start_date=20221028T003437, end_date=20221028T003440
If everything goes well, you should see Hello, TorchX! printed above.
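The same launch, wait, and check pattern extends to the distributed training case mentioned at the top of this page. The following is a sketch under stated assumptions, not a tested recipe: it assumes the builtin `dist.ddp` component with its `-j` (nodes x processes per node) and `--script` arguments, and it leaves the scheduler name and its `cfg` options to the caller because they depend on your cluster. The helper names `ddp_args` and `launch_ddp` are hypothetical.

```python
def ddp_args(script, nnodes, nproc_per_node):
    """Build dist.ddp component arguments; -j is '<nodes>x<procs per node>'."""
    return ["-j", f"{nnodes}x{nproc_per_node}", "--script", script]


def launch_ddp(script, scheduler, cfg=None, nnodes=2, nproc_per_node=4):
    """Launch a distributed job, block until it finishes, return its app id."""
    # Imported lazily so ddp_args stays usable without TorchX installed.
    from torchx.runner import get_runner

    with get_runner() as runner:
        app_id = runner.run_component(
            "dist.ddp",
            ddp_args(script, nnodes, nproc_per_node),
            scheduler=scheduler,
            cfg=cfg,  # scheduler-specific options; contents depend on the cluster
        )
        # Remote jobs run longer than the echo example, so poll less often.
        status = runner.wait(app_id, wait_interval=10)
        status.raise_for_status()
        return app_id
```

Calling `launch_ddp` from inside an Airflow task, with the scheduler name and `cfg` your cluster requires, keeps the DAG definition itself unchanged.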
Next Steps
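Check out the runner API documentation to learn more about programmatic usage of TorchX
Browse through the collection of builtin components, which can be used in your Airflow pipeline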