快速入门¶
这是一份关于如何编写简单应用程序并开始在本地和远程集群上启动分布式作业的自包含指南。
安装¶
我们需要做的第一件事是安装 TorchX python 包,其中包括 CLI 和库。
# install torchx with all dependencies
$ pip install torchx[dev]
有关安装的更多信息,请参阅 README。
[1]:
%%sh
torchx --help
usage: torchx [-h] [--log_level LOG_LEVEL] [--version]
{builtins,cancel,configure,describe,log,run,runopts,status} ...
torchx CLI
optional arguments:
-h, --help show this help message and exit
--log_level LOG_LEVEL
Python logging log level
--version show program's version number and exit
sub-commands:
Use the following commands to run operations, e.g.: torchx run ${JOB_NAME}
{builtins,cancel,configure,describe,log,run,runopts,status}
世界您好¶
让我们从编写一个简单的 “Hello World” python 应用程序开始。这只是一个普通的 python 程序,可以包含您想要的任何内容。
注意
此示例使用 Jupyter Notebook 创建本地文件以用于示例目的。在正常使用情况下,您可以将这些文件作为独立文件。%%writefile
[2]:
%%writefile my_app.py
import sys
print(f"Hello, {sys.argv[1]}!")
Overwriting my_app.py
发射¶
我们可以通过以下方式执行我们的应用程序。调度程序相对于当前目录执行应用程序。torchx run
local_cwd
为此,我们将使用组件:utils.python
[3]:
%%sh
torchx run --scheduler local_cwd utils.python --help
usage: torchx run <run args...> python [--help] [-m M] [-c C]
[--script SCRIPT] [--image IMAGE]
[--name NAME] [--cpu CPU] [--gpu GPU]
[--memMB MEMMB] [-h H]
[--num_replicas NUM_REPLICAS]
...
Runs ``python`` with the specified module, command or script on the specified
...
positional arguments:
args arguments passed to the program in sys.argv[1:]
(ignored with `--c`) (required)
optional arguments:
--help show this help message and exit
-m M, --m M run library module as a script (default: None)
-c C, --c C program passed as string (may error if scheduler has a
length limit on args) (default: None)
--script SCRIPT .py script to run (default: None)
--image IMAGE image to run on (default:
ghcr.io/pytorch/torchx:0.2.0dev0)
--name NAME name of the job (default: torchx_utils_python)
--cpu CPU number of cpus per replica (default: 1)
--gpu GPU number of gpus per replica (default: 0)
--memMB MEMMB cpu memory in MB per replica (default: 1024)
-h H, --h H a registered named resource (if specified takes
precedence over cpu, gpu, memMB) (default: None)
--num_replicas NUM_REPLICAS
number of copies to run (each on its own container)
(default: 1)
该组件接受脚本名称,任何额外的参数都将传递给脚本本身。
[4]:
%%sh
torchx run --scheduler local_cwd utils.python --script my_app.py "your name"
torchx 2022-06-15 23:08:27 INFO Log directory not set in scheduler cfg. Creating a temporary log dir that will be deleted on exit. To preserve log directory set the `log_dir` cfg option
torchx 2022-06-15 23:08:27 INFO Log directory is: /tmp/torchx_ja7y1bix
torchx 2022-06-15 23:08:27 INFO Waiting for the app to finish...
python/0 Hello, your name!
torchx 2022-06-15 23:08:28 INFO Job finished: SUCCEEDED
local_cwd://torchx/torchx_utils_python-bbwk39djr43s9
我们可以通过调度器运行完全相同的应用程序。此调度程序会将本地工作区打包为指定映像之上的层。这提供了与基于容器的远程调度器非常相似的环境。local_docker
注意
这需要安装 Docker,并且在 Google Colab 等环境中不起作用。请参阅 Docker 安装说明:https://docs.docker.com/get-docker/
[5]:
%%sh
torchx run --scheduler local_docker utils.python --script my_app.py "your name"
torchx 2022-06-15 23:08:30 INFO Checking for changes in workspace `file:///home/runner/work/torchx/torchx/docs/source`...
torchx 2022-06-15 23:08:30 INFO To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.
torchx 2022-06-15 23:08:38 INFO Built new image `sha256:bc3727e4471a6b4ac192e425272305e3d563a871520fa6e57be2fc8e46071ce5` based on original image `ghcr.io/pytorch/torchx:0.2.0dev0` and changes in workspace `file:///home/runner/work/torchx/torchx/docs/source` for role[0]=python.
torchx 2022-06-15 23:08:39 INFO Waiting for the app to finish...
python/0 Hello, your name!
torchx 2022-06-15 23:08:40 INFO Job finished: SUCCEEDED
local_docker://torchx/torchx_utils_python-qdrhsjb96ftdld
TorchX 默认使用 ghcr.io/pytorch/torchx Docker 容器镜像,其中包含 PyTorch 库、TorchX 和相关依赖项。
分散式¶
TorchX 的组件使用 TorchElastic 来管理 worker。这意味着您可以在我们支持的所有调度程序上开箱即用地启动多工作线程和多主机作业。dist.ddp
[6]:
%%sh
torchx run --scheduler local_docker dist.ddp --help
usage: torchx run <run args...> ddp [--help] [--script SCRIPT] [-m M]
[--image IMAGE] [--name NAME] [-h H]
[--cpu CPU] [--gpu GPU] [--memMB MEMMB]
[-j J] [--env ENV]
[--max_retries MAX_RETRIES]
[--rdzv_port RDZV_PORT] [--mounts MOUNTS]
...
Distributed data parallel style application (one role, multi-replica). ...
positional arguments:
script_args arguments to the main module (required)
optional arguments:
--help show this help message and exit
--script SCRIPT script or binary to run within the image (default:
None)
-m M, --m M the python module path to run (default: None)
--image IMAGE image (e.g. docker) (default:
ghcr.io/pytorch/torchx:0.2.0dev0)
--name NAME job name override (uses the script name if not
specified) (default: None)
-h H, --h H a registered named resource (if specified takes
precedence over cpu, gpu, memMB) (default: None)
--cpu CPU number of cpus per replica (default: 2)
--gpu GPU number of gpus per replica (default: 0)
--memMB MEMMB cpu memory in MB per replica (default: 1024)
-j J, --j J {nnodes}x{nproc_per_node}, for gpu hosts,
nproc_per_node must not exceed num gpus (default: 1x2)
--env ENV environment varibles to be passed to the run (e.g.
ENV1=v1,ENV2=v2,ENV3=v3) (default: None)
--max_retries MAX_RETRIES
the number of scheduler retries allowed (default: 0)
--rdzv_port RDZV_PORT
the port on rank0's host to use for hosting the c10d
store used for rendezvous. Only takes effect when
running multi-node. When running single node, this
parameter is ignored and a random free port is chosen.
(default: 29500)
--mounts MOUNTS mounts to mount into the worker environment/container
(ex.
type=<bind/volume>,src=/host,dst=/job[,readonly]). See
scheduler documentation for more info. (default: None)
让我们创建一个稍微有趣的应用程序来利用 TorchX 分布式支持。
[7]:
%%writefile dist_app.py
import torch
import torch.distributed as dist
dist.init_process_group(backend="gloo")
print(f"I am worker {dist.get_rank()} of {dist.get_world_size()}!")
a = torch.tensor([dist.get_rank()])
dist.all_reduce(a)
print(f"all_reduce output = {a}")
Writing dist_app.py
让我们启动一个小型作业,每个节点有 2 个节点和 2 个工作进程:
[8]:
%%sh
torchx run --scheduler local_docker dist.ddp -j 2x2 --script dist_app.py
torchx 2022-06-15 23:08:44 INFO Checking for changes in workspace `file:///home/runner/work/torchx/torchx/docs/source`...
torchx 2022-06-15 23:08:44 INFO To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.
torchx 2022-06-15 23:08:52 INFO Built new image `sha256:a9fc543e27f7d6e611011bc77ddddc9304c438979e4732b1dd4ae0c78de2486e` based on original image `ghcr.io/pytorch/torchx:0.2.0dev0` and changes in workspace `file:///home/runner/work/torchx/torchx/docs/source` for role[0]=dist_app.
torchx 2022-06-15 23:08:54 INFO Waiting for the app to finish...
dist_app/0 WARNING:__main__:
dist_app/0 *****************************************
dist_app/0 Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
dist_app/0 *****************************************
dist_app/1 WARNING:__main__:
dist_app/1 *****************************************
dist_app/1 Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
dist_app/1 *****************************************
dist_app/0 [0]:I am worker 0 of 4!
dist_app/0 [1]:I am worker 1 of 4!
dist_app/0 [0]:all_reduce output = tensor([6])
dist_app/0 [1]:all_reduce output = tensor([6])
dist_app/1 [0]:I am worker 2 of 4!
dist_app/1 [0]:all_reduce output = tensor([6])
dist_app/1 [1]:I am worker 3 of 4!
dist_app/1 [1]:all_reduce output = tensor([6])
torchx 2022-06-15 23:09:01 INFO Job finished: SUCCEEDED
local_docker://torchx/dist_app-gg5lvnbw5v9tpc
工作区 / 修补¶
对于每个调度器,都有一个 .for 和 ,它使用当前工作目录。对于基于容器的调度程序,例如 ,它使用 docker 容器。image
local_cwd
slurm
local_docker
kubernetes
aws_batch
为了在本地和远程作业之间提供相同的环境,TorchX CLI 使用工作区根据每个计划程序自动修补远程作业的映像。
当您通过它启动任务时,它会将当前目录叠加在提供的图像之上,以便您的代码在启动的任务中可用。torchx run
对于基于调度程序的调度程序,您需要一个本地 docker 守护程序来构建镜像并将其推送到远程 docker 存储库。docker
.torchxconfig
¶
调度程序的参数可以通过命令行标志指定,也可以通过文件按调度程序指定。torchx run -s <scheduler> -c <args>
.torchxconfig
[9]:
%%writefile .torchxconfig
[kubernetes]
queue=torchx
image_repo=<your docker image repository>
[slurm]
partition=torchx
Writing .torchxconfig
远程调度程序¶
TorchX 支持大量的调度器。没有看到您的?请求它!
远程调度程序的运行方式与本地调度程序完全相同。相同的 local run 命令在 remote 上开箱即用。
$ torchx run --scheduler slurm dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler kubernetes dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler aws_batch dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler ray dist.ddp -j 2x2 --script dist_app.py
根据调度程序,可能会有一些额外的配置参数,以便 TorchX 知道在何处运行作业并上传构建的镜像。这些可以通过 或在文件中设置。-c
.torchxconfig
所有配置选项:
[10]:
%%sh
torchx runopts
local_docker:
usage:
[copy_env=COPY_ENV]
optional arguments:
copy_env=COPY_ENV (typing.List[str], None)
list of glob patterns of environment variables to copy if not set in AppDef. Ex: FOO_*
local_cwd:
usage:
[log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]
optional arguments:
log_dir=LOG_DIR (str, None)
dir to write stdout/stderr log files of replicas
prepend_cwd=PREPEND_CWD (bool, False)
if set, prepends CWD to replica's PATH env var making any binaries in CWD take precedence over those in PATH
auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
sets the `CUDA_AVAILABLE_DEVICES` for roles that request GPU resources. Each role replica will be assigned one GPU. Does nothing if the device count is less than replicas.
slurm:
usage:
[partition=PARTITION],[time=TIME],[comment=COMMENT],[constraint=CONSTRAINT],[mail-user=MAIL-USER],[mail-type=MAIL-TYPE],[job_dir=JOB_DIR]
optional arguments:
partition=PARTITION (str, None)
The partition to run the job in.
time=TIME (str, None)
The maximum time the job is allowed to run for. Formats: "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", "days-hours:minutes" or "days-hours:minutes:seconds"
comment=COMMENT (str, None)
Comment to set on the slurm job.
constraint=CONSTRAINT (str, None)
Constraint to use for the slurm job.
mail-user=MAIL-USER (str, None)
User to mail on job end.
mail-type=MAIL-TYPE (str, None)
What events to mail users on.
job_dir=JOB_DIR (str, None)
The directory to place the job code and outputs. The
directory must not exist and will be created. To enable log
iteration, jobs will be tracked in ``.torchxslurmjobdirs``.
kubernetes:
usage:
queue=QUEUE,[namespace=NAMESPACE],[image_repo=IMAGE_REPO],[service_account=SERVICE_ACCOUNT],[priority_class=PRIORITY_CLASS]
required arguments:
queue=QUEUE (str)
Volcano queue to schedule job in
optional arguments:
namespace=NAMESPACE (str, default)
Kubernetes namespace to schedule job in
image_repo=IMAGE_REPO (str, None)
The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
service_account=SERVICE_ACCOUNT (str, None)
The service account name to set on the pod specs
priority_class=PRIORITY_CLASS (str, None)
The name of the PriorityClass to set on the job specs
aws_batch:
usage:
queue=QUEUE,[image_repo=IMAGE_REPO]
required arguments:
queue=QUEUE (str)
queue to schedule job in
optional arguments:
image_repo=IMAGE_REPO (str, None)
The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
ray:
usage:
[cluster_config_file=CLUSTER_CONFIG_FILE],[cluster_name=CLUSTER_NAME],[dashboard_address=DASHBOARD_ADDRESS],[requirements=REQUIREMENTS]
optional arguments:
cluster_config_file=CLUSTER_CONFIG_FILE (str, None)
Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.
cluster_name=CLUSTER_NAME (str, None)
Override the configured cluster name.
dashboard_address=DASHBOARD_ADDRESS (str, 127.0.0.1:8265)
Use ray status to get the dashboard address you will submit jobs against
requirements=REQUIREMENTS (str, None)
Path to requirements.txt
自定义图像¶
基于 Docker 的计划程序¶
如果您需要的标准 PyTorch 库以外的更多内容,您可以添加自定义 Dockerfile 或构建自己的 Docker 容器,并将其用作 TorchX 作业的基础映像。
[11]:
%%writefile timm_app.py
import timm
print(timm.models.resnet18())
Writing timm_app.py
[12]:
%%writefile Dockerfile.torchx
FROM pytorch/pytorch:1.10.0-cuda11.3-cudnn8-runtime
RUN pip install timm
COPY . .
Writing Dockerfile.torchx
创建 Dockerfile 后,我们可以正常启动,TorchX 将使用新提供的 Dockerfile 而不是默认 Dockerfile 自动构建镜像。
[13]:
%%sh
torchx run --scheduler local_docker utils.python --script timm_app.py "your name"
torchx 2022-06-15 23:09:03 INFO loaded configs from /home/runner/work/torchx/torchx/docs/source/.torchxconfig
torchx 2022-06-15 23:09:04 INFO Checking for changes in workspace `file:///home/runner/work/torchx/torchx/docs/source`...
torchx 2022-06-15 23:09:04 INFO To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.
torchx 2022-06-15 23:10:50 INFO Built new image `sha256:265a71efcd86f4ea69bacd75c22760f7d0b38f911179a34f18fe5e662fffa059` based on original image `ghcr.io/pytorch/torchx:0.2.0dev0` and changes in workspace `file:///home/runner/work/torchx/torchx/docs/source` for role[0]=python.
torchx 2022-06-15 23:10:51 INFO Waiting for the app to finish...
python/0 ResNet(
python/0 (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
python/0 (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
python/0 (layer1): Sequential(
python/0 (0): BasicBlock(
python/0 (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 )
python/0 (1): BasicBlock(
python/0 (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 )
python/0 )
python/0 (layer2): Sequential(
python/0 (0): BasicBlock(
python/0 (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 (downsample): Sequential(
python/0 (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0 (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 )
python/0 )
python/0 (1): BasicBlock(
python/0 (conv1): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 )
python/0 )
python/0 (layer3): Sequential(
python/0 (0): BasicBlock(
python/0 (conv1): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 (downsample): Sequential(
python/0 (0): Conv2d(128, 256, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0 (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 )
python/0 )
python/0 (1): BasicBlock(
python/0 (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 )
python/0 )
python/0 (layer4): Sequential(
python/0 (0): BasicBlock(
python/0 (conv1): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 (downsample): Sequential(
python/0 (0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0 (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 )
python/0 )
python/0 (1): BasicBlock(
python/0 (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act1): ReLU(inplace=True)
python/0 (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0 (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0 (act2): ReLU(inplace=True)
python/0 )
python/0 )
python/0 (global_pool): SelectAdaptivePool2d (pool_type=avg, flatten=Flatten(start_dim=1, end_dim=-1))
python/0 (fc): Linear(in_features=512, out_features=1000, bias=True)
python/0 )
torchx 2022-06-15 23:10:53 INFO Job finished: SUCCEEDED
local_docker://torchx/torchx_utils_python-vzr2rg6kj022sc
Slurm¶
和 使用当前环境,以便您可以正常使用 和。slurm
local_cwd
pip
conda
后续步骤¶
查看 torchx CLI 的其他功能
查看 Runner 支持的调度程序列表
浏览内置组件的集合
查看您可以在哪些 ML 管道平台上运行组件
查看培训应用程序示例