Airflow
For pipelines that support Python-based execution, you can use the TorchX API directly. TorchX is designed to integrate easily into other applications through its programmatic API, so no special Airflow integration is needed.
With TorchX, you can use Airflow for pipeline orchestration while running your PyTorch application (e.g. distributed training) on a remote GPU cluster.
import datetime
import pendulum
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
To launch a TorchX job from Airflow, create an Airflow Python task that imports the runner, launches the job, and waits for it to complete. If you’re running on a remote cluster, you may need to use the virtualenv task to install the torchx package.
@task(task_id='hello_torchx')
def run_torchx(message):
    """This is a function that will run within the DAG execution"""
    from torchx.runner import get_runner

    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )

        # Wait for the job to complete, polling every second.
        status = runner.wait(app_id, wait_interval=1)
        # raise_for_status raises an exception if the job didn't succeed.
        status.raise_for_status()

        # Finally, print all of the log lines from the TorchX job so they
        # show up in the workflow logs. "sh" is the role name and k=0
        # selects the first replica of that role.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")
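The launch-and-wait pattern in this task amounts to polling the job until it reaches a terminal state and then failing loudly if it did not succeed, so that Airflow marks the task as failed. The stdlib-only sketch below illustrates that idea with hypothetical names (`State`, `Status`, `wait_for_job`, `fake_job`); it is not TorchX's implementation, and the real `Runner.wait` and `AppStatus.raise_for_status` may differ in detail (for example, in which exception type is raised).

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Iterator, List


class State(Enum):
    # Hypothetical job states for illustration; not TorchX's AppState enum.
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


TERMINAL_STATES = {State.SUCCEEDED, State.FAILED}


@dataclass
class Status:
    state: State

    def raise_for_status(self) -> None:
        # Raise if the job finished in any state other than SUCCEEDED.
        # RuntimeError is a stand-in for whatever the real library raises.
        if self.state is not State.SUCCEEDED:
            raise RuntimeError(f"job finished in state {self.state.value}")


def wait_for_job(poll: Callable[[], State], wait_interval: float = 1.0) -> Status:
    """Poll until the job reaches a terminal state, sleeping between polls."""
    while True:
        state = poll()
        if state in TERMINAL_STATES:
            return Status(state)
        time.sleep(wait_interval)


def fake_job(states: List[State]) -> Callable[[], State]:
    """Return a poll function that replays a fixed sequence of states."""
    it: Iterator[State] = iter(states)
    return lambda: next(it)


if __name__ == "__main__":
    ok = wait_for_job(fake_job([State.RUNNING, State.RUNNING, State.SUCCEEDED]), wait_interval=0)
    ok.raise_for_status()  # does not raise
    print(ok.state.value)  # prints "SUCCEEDED"
```

Raising from inside the task, rather than returning a status value, is what lets Airflow's own retry and alerting machinery react to a failed TorchX job.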
Once the task is defined, we can put it into an Airflow DAG and run it as usual.
from torchx.schedulers.ids import make_unique

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("Hello, TorchX!")

# Create a manual DAG run and execute the TorchX task in-process.
dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
[2023-04-03 21:04:35,239] {taskinstance.py:1090} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_python_operator-pt95sg6zn0jhdc.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2023-04-03 21:04:35,246] {taskinstance.py:1090} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_python_operator-pt95sg6zn0jhdc.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>
[2023-04-03 21:04:35,246] {taskinstance.py:1288} INFO -
--------------------------------------------------------------------------------
[2023-04-03 21:04:35,247] {taskinstance.py:1289} INFO - Starting attempt 1 of 1
[2023-04-03 21:04:35,248] {taskinstance.py:1290} INFO -
--------------------------------------------------------------------------------
[2023-04-03 21:04:35,260] {taskinstance.py:1309} INFO - Executing <Task(_PythonDecoratedOperator): hello_torchx> on 2021-09-13 00:00:00+00:00
[2023-04-03 21:04:35,605] {taskinstance.py:1516} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_python_operator-pt95sg6zn0jhdc
AIRFLOW_CTX_TASK_ID=hello_torchx
AIRFLOW_CTX_EXECUTION_DATE=2021-09-13T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-13T00:00:00+00:00
[2023-04-03 21:04:36,097] {api.py:70} INFO - Tracker configurations: {}
[2023-04-03 21:04:36,102] {local_scheduler.py:715} INFO - Log directory not set in scheduler cfg. Creating a temporary log dir that will be deleted on exit. To preserve log directory set the `log_dir` cfg option
[2023-04-03 21:04:36,102] {local_scheduler.py:721} INFO - Log directory is: /tmp/torchx_jj070zvt
Hello, TorchX!
[2023-04-03 21:04:36,213] {python.py:177} INFO - Done. Returned value was: None
[2023-04-03 21:04:36,221] {taskinstance.py:1327} INFO - Marking task as SUCCESS. dag_id=example_python_operator-pt95sg6zn0jhdc, task_id=hello_torchx, execution_date=20210913T000000, start_date=20230403T210435, end_date=20230403T210436
If all goes well, you should see Hello, TorchX! printed above.
Next Steps
Check out the runner API documentation to learn more about programmatic usage of TorchX.
Browse through the collection of built-in components, which can be used in your Airflow pipeline.