Distributed¶
Torchx helps you to run your distributed trainer jobs. Check out torchx.components.train
on the example of running single trainer job. Here we will be using
the same Trainer App Example.
but will run it in a distributed manner.
Torchx uses Torch distributed run to launch user processes and expects that user applications will be written in Distributed data parallel manner
Distributed Trainer Execution¶
In order to run your trainer on a single or multiple processes, remotely or locally, all you need to do is to write a distributed torchx component. The example that we will be using here is Distributed Trainer Component
The component defines how the user application is launched and torchx will take care of translating this into scheduler-specific definitions.
Single node, multiple trainers (desktop)¶
Try launching a single node, multiple trainers example on your desktop:
torchx run -s local_cwd ./torchx/examples/apps/lightning_classy_vision/component.py:trainer_dist --nnodes 1 --nproc_per_node 2 --rdzv_backend c10d --rdzv_endpoint localhost:29500
The ./torchx/examples/apps/lightning_classy_vision/component.py:trainer_dist is reference to the component function: Distributed Trainer Component
Single node, multiple trainers (kubernetes)¶
Now lets launch the same component on the kubernetes cluster.
Check out torchx.schedulers.kubernetes_scheduler
on dependencies that needs to be installed
before running using kubernetes scheduler.
We can use the following cmd to launch application on kubernetes:
torchx run -s kubernetes --scheduler_args namespace=default,queue=default ./torchx/examples/apps/lightning_classy_vision/component.py:trainer_dist --nnodes 1 --nproc_per_node 2
The namespaces arg corresponds to the kubernetes namespace that you want to launch. The queue arg is the volcano queue.
Example of output:
kubernetes://torchx/default:cv-trainer-pa2a7qgee9zng
torchx 2021-10-18 18:46:55 INFO Launched app: kubernetes://torchx/default:cv-trainer-pa2a7qgee9zng
torchx 2021-10-18 18:46:55 INFO AppStatus:
msg: <NONE>
num_restarts: -1
roles: []
state: PENDING (2)
structured_error_msg: <NONE>
ui_url: null
torchx 2021-10-18 18:46:55 INFO Job URL: None
You can use the job url to query the status or logs of the job:
torchx status kubernetes://torchx/default:cv-trainer-pa2a7qgee9zng
torchx 2021-10-18 18:47:44 INFO AppDef:
State: SUCCEEDED
Num Restarts: -1
Roles:
*worker[0]:SUCCEEDED
Try running torchx log kubernetes://torchx/default:cv-trainer-pa2a7qgee9zng.
Multiple nodes, multiple trainers (kubernetes)¶
It is simple to launch multiple nodes trainer in kubernetes:
torchx run -s kubernetes --scheduler_args namespace=default,queue=default ./torchx/examples/apps/lightning_classy_vision/component.py:trainer_dist --nnodes 2 --nproc_per_node 2
The command above will launch distributed train job on kubernetes default namespace using volcano default queue. It will use etcd service accessible on etcd-server:2379 to perform etcd rendezvous.
You can overwrite rendezvous endpoint:
torchx run -s kubernetes --scheduler_args namespace=default,queue=default ./torchx/examples/apps/lightning_classy_vision/component.py:trainer_dist --nnodes 2 --nproc_per_node 1 --rdzv_endpoint etcd-server.default.svc.cluster.local:2379
Note
For GPU training, keep nproc_per_node equal to the amount of GPUs the instace has.
The command above will launch distributed train job on kubernetes default namespace using volcano default queue. It will use etcd service accessible on etcd-server:2379 to perform etcd rendezvous.
Builtin distributed components¶
In the examples above we used custom components to launch user applications. It is not always the case that users need to write their own components.
Torchx comes with set of builtin component that describe typical execution patterns.
dist.ddp¶
dist.ddp
is a component for applications that run as distributed jobs in a DDP manner.
You can use it to quickly iterate over your application without the need of authoring your own component.
Note
dist.ddp
is a generic component, as a result it is good for quick iterations, but not production usage.
It is recommended to author your own component if you want to put your application in production.
Learn more Authoring about how to author your component.
We will be using dist.ddp
to execute the following example:
# main.py
import os
import torch
import torch.distributed as dist
import torch.nn.functional as F
import torch.distributed.run
def compute_world_size():
rank = int(os.getenv("RANK", "0"))
world_size = int(os.getenv("WORLD_SIZE", "1"))
dist.init_process_group()
print("successfully initialized process group")
t = F.one_hot(torch.tensor(rank), num_classes=world_size)
dist.all_reduce(t)
computed_world_size = int(torch.sum(t).item())
print(
f"rank: {rank}, actual world_size: {world_size}, computed world_size: {computed_world_size}"
)
if __name__ == "__main__":
compute_world_size()
Single trainer on desktop¶
We can run this example on desktop on four processes using the following cmd:
torchx run -s local_cwd dist.ddp --entrypoint main.py --nproc_per_node 4
Single trainer on kubernetes cluster¶
We can execute it on the kubernetes cluster
torchx run -s kubernetes --scheduler_args namespace=default,queue=defaultdist.ddp --entrypoint main.py --nproc_per_node 4
Components APIs
- torchx.components.dist.ddp(*script_args: str, entrypoint: str, image: str = 'ghcr.io/pytorch/torchx:0.1.0rc2', rdzv_backend: Optional[str] = None, rdzv_endpoint: Optional[str] = None, resource: Optional[str] = None, nnodes: int = 1, nproc_per_node: int = 1, name: str = 'test-name', role: str = 'worker', env: Optional[Dict[str, str]] = None) → torchx.specs.api.AppDef[source]¶
Distributed data parallel style application (one role, multi-replica). Uses torch.distributed.run to launch and coordinate pytorch worker processes.
- Parameters
script_args – Script arguments.
image – container image.
entrypoint – script or binary to run within the image.
rdzv_backend – rendezvous backend to use, allowed values can be found in the rdzv registry docs The default backend is c10d
rdzv_endpoint – Controller endpoint. In case of rdzv_backend is etcd, this is a etcd endpoint, in case of c10d, this is the endpoint of one of the hosts. The default entdpoint it localhost:29500
resource – Optional named resource identifier. The resource parameter gets ignored when running on the local scheduler.
nnodes – Number of nodes.
nproc_per_node – Number of processes per node.
name – Name of the application.
role – Name of the ddp role.
env – Env variables.
- Returns
Torchx AppDef
- Return type