Kubernetes¶
This module contains the TorchX Kubernetes scheduler, which can be used to run TorchX components on a Kubernetes cluster.
Prerequisites¶
The TorchX Kubernetes scheduler depends on Volcano and requires etcd to be installed for distributed job execution.
Install Volcano version 1.4.0:
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.4.0/installer/volcano-development.yaml
TorchX uses torch.distributed.run to run distributed training. This requires an etcd service to be installed on your Kubernetes cluster:
kubectl apply -f https://github.com/pytorch/torchx/blob/main/resources/etcd.yaml
Learn more about running distributed trainers in torchx.components.dist.
- class torchx.schedulers.kubernetes_scheduler.KubernetesScheduler(session_name: str, client: Optional[ApiClient] = None, docker_client: Optional[DockerClient] = None)[source]¶
Bases: torchx.schedulers.api.Scheduler[torchx.schedulers.kubernetes_scheduler.KubernetesOpts], torchx.workspace.docker_workspace.DockerWorkspace
KubernetesScheduler is a TorchX scheduling interface to Kubernetes.
Important: Volcano is required to be installed on the Kubernetes cluster. TorchX requires gang scheduling for multi-replica/multi-role execution and Volcano is currently the only supported scheduler with Kubernetes. For installation instructions see: https://github.com/volcano-sh/volcano
This has been confirmed to work with Volcano v1.3.0 and Kubernetes versions v1.18-1.21. See https://github.com/pytorch/torchx/issues/120 which is tracking Volcano support for Kubernetes v1.22.
Note
AppDefs that have more than 0 retries may not be displayed as pods if they failed. This occurs due to a known bug in Volcano (as of the 1.4.0 release): https://github.com/volcano-sh/volcano/issues/1651
$ pip install torchx[kubernetes]
$ torchx run --scheduler kubernetes --scheduler_args namespace=default,queue=test utils.echo --image alpine:latest --msg hello
kubernetes://torchx_user/1234
$ torchx status kubernetes://torchx_user/1234
...
Config Options
usage:
    queue=QUEUE,[namespace=NAMESPACE],[image_repo=IMAGE_REPO],[service_account=SERVICE_ACCOUNT],[priority_class=PRIORITY_CLASS]

required arguments:
    queue=QUEUE (str)
        Volcano queue to schedule job in

optional arguments:
    namespace=NAMESPACE (str, default)
        Kubernetes namespace to schedule job in
    image_repo=IMAGE_REPO (str, None)
        The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
    service_account=SERVICE_ACCOUNT (str, None)
        The service account name to set on the pod specs
    priority_class=PRIORITY_CLASS (str, None)
        The name of the PriorityClass to set on the job specs
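The same options can also be passed programmatically as the cfg mapping when submitting an AppDef through the TorchX runner. The following is a minimal sketch, assuming a Volcano queue named test exists and the default namespace is usable; the app, image, and arguments are only for illustration:

from torchx import specs
from torchx.runner import get_runner

# A throwaway single-role app; "alpine:latest" and the arguments are illustrative only.
app = specs.AppDef(
    name="echo",
    roles=[
        specs.Role(
            name="echo",
            image="alpine:latest",
            entrypoint="echo",
            args=["hello kubernetes"],
        )
    ],
)

runner = get_runner()
# cfg keys mirror the scheduler config options listed above.
app_handle = runner.run(app, scheduler="kubernetes", cfg={"queue": "test", "namespace": "default"})
print(app_handle)  # an app handle such as kubernetes://torchx/<namespace>:<job name>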
Mounts
Mounting external filesystems/volumes is supported via hostPath and PersistentVolumeClaim volumes.
hostPath volumes:
type=bind,src=<host path>,dst=<container path>[,readonly]
PersistentVolumeClaim:
type=volume,src=<claim>,dst=<container path>[,readonly]
host devices:
type=device,src=/dev/foo[,dst=<container path>][,perm=rwm]
If you specify a host device the job will run in privileged mode, since Kubernetes doesn’t expose a way to pass --device to the underlying container runtime. Users should prefer to use device plugins.
See torchx.specs.parse_mounts() for more info; a short sketch follows below.
External docs: https://kubernetes.io/docs/concepts/storage/persistent-volumes/
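As a rough sketch of how these strings map to the Python API referenced above: assuming parse_mounts accepts the comma-separated mount string split into key=value tokens (as the CLI does), the typed mounts it returns can be attached to a role. The paths and image below are made up for illustration:

from torchx import specs

# Equivalent to: type=bind,src=/mnt/datasets,dst=/data,readonly (paths are hypothetical).
mounts = specs.parse_mounts(["type=bind", "src=/mnt/datasets", "dst=/data", "readonly"])

role = specs.Role(
    name="trainer",
    image="alpine:latest",   # illustration only
    entrypoint="ls",
    args=["/data"],
    mounts=mounts,           # the scheduler renders these as hostPath / PVC volumes on the pod
)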
Resources / Allocation
To select a specific machine type you can add a capability to your resources with node.kubernetes.io/instance-type which will constrain the launched jobs to nodes of that instance type.

>>> from torchx import specs
>>> specs.Resource(
...     cpu=4,
...     memMB=16000,
...     gpu=2,
...     capabilities={
...         "node.kubernetes.io/instance-type": "<cloud instance type>",
...     },
... )
Resource(...)
Kubernetes may reserve some memory for the host. TorchX assumes you’re scheduling on whole hosts and thus will automatically reduce the resource request by a small amount to account for the node reserved CPU and memory. If you run into scheduling issues you may need to reduce the requested CPU and memory from the host values.
Compatibility
Feature                  Scheduler Support
Fetch Logs               ✔️
Distributed Jobs         ✔️
Cancel Job               ✔️
Describe Job             Partial support. KubernetesScheduler will return job and replica status but does not provide the complete original AppSpec.
Workspaces / Patching    ✔️
Mounts                   ✔️
- describe(app_id: str) → Optional[torchx.schedulers.api.DescribeAppResponse][source]¶
Describes the specified application.
- Returns
AppDef description or None if the app does not exist.
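A minimal sketch of describing a submitted job; the app id format "<namespace>:<job name>" and the id below are assumptions for illustration:

from torchx.schedulers.kubernetes_scheduler import create_scheduler

scheduler = create_scheduler("example-session")
resp = scheduler.describe("default:echo-abc123")  # hypothetical app id
if resp is None:
    print("app not found")
else:
    print(resp.state)           # e.g. AppState.RUNNING
    print(resp.roles_statuses)  # per-replica status, when available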
- log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime.datetime] = None, until: Optional[datetime.datetime] = None, should_tail: bool = False, streams: Optional[torchx.schedulers.api.Stream] = None) → Iterable[str][source]¶
Returns an iterator over the log lines of the kth replica of the role. The iterator ends once all qualifying log lines have been read.

If the scheduler supports time-based cursors for fetching log lines from custom time ranges, then the since and until fields are honored, otherwise they are ignored. Not specifying since and until is equivalent to getting all available log lines. If until is empty, then the iterator behaves like tail -f, following the log output until the job reaches a terminal state.

The exact definition of what constitutes a log is scheduler specific. Some schedulers may consider stderr or stdout as the log, others may read the logs from a log file.
Behaviors and assumptions:
1. Produces undefined behavior if called on an app that does not exist. The caller should check that the app exists using exists(app_id) prior to calling this method.
2. Is not stateful: calling this method twice with the same parameters returns a new iterator. Prior iteration progress is lost.
3. Does not always support log-tailing. Not all schedulers support live log iteration (e.g. tailing logs while the app is running). Refer to the specific scheduler’s documentation for the iterator’s behavior.
3.1. If the scheduler supports log-tailing, it should be controlled by the should_tail parameter.
4. Does not guarantee log retention. It is possible that by the time this method is called, the underlying scheduler may have purged the log records for this application. If so this method raises an arbitrary exception.
5. If should_tail is True, the method only raises a StopIteration exception when the accessible log lines have been fully exhausted and the app has reached a final state. For instance, if the app gets stuck and does not produce any log lines, then the iterator blocks until the app eventually gets killed (either via timeout or manually) at which point it raises a StopIteration. If should_tail is False, the method raises StopIteration when there are no more logs.
6. Need not be supported by all schedulers.
7. Some schedulers may support line cursors by supporting __getitem__ (e.g. iter[50] seeks to the 50th log line).
8. Whitespace is preserved; each new line should include \n. To support interactive progress bars the returned lines don’t need to include \n but should then be printed without a newline to correctly handle \r carriage returns.
- Parameters
streams – The IO output streams to select. One of: combined, stdout, stderr. If the selected stream isn’t supported by the scheduler it will throw a ValueError.
- Returns
An Iterator over log lines of the specified role replica.
- Raises
NotImplementedError – if the scheduler does not support log iteration
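A minimal sketch of tailing the logs of one replica; the app id and role name are made up for illustration:

from torchx.schedulers.kubernetes_scheduler import create_scheduler

scheduler = create_scheduler("example-session")
for line in scheduler.log_iter(
    "default:echo-abc123",  # hypothetical app id
    role_name="echo",
    k=0,
    should_tail=True,       # follow the logs until the job reaches a terminal state
):
    print(line, end="")     # lines may already include a trailing "\n"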
- run_opts() → torchx.specs.api.runopts[source]¶
Returns the run configuration options expected by the scheduler. Basically a --help for the run API.
- schedule(dryrun_info: torchx.specs.api.AppDryRunInfo[torchx.schedulers.kubernetes_scheduler.KubernetesJob]) → str[source]¶
Same as submit except that it takes an AppDryRunInfo. Implementors are encouraged to implement this method rather than directly implementing submit since submit can be trivially implemented by:

dryrun_info = self.submit_dryrun(app, cfg)
return schedule(dryrun_info)
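From a user’s perspective, the same split can be used to inspect the rendered job before submitting it. A hedged sketch, where the app, image, and queue name are illustrative only:

from torchx import specs
from torchx.schedulers.kubernetes_scheduler import create_scheduler

app = specs.AppDef(
    name="echo",
    roles=[specs.Role(name="echo", image="alpine:latest", entrypoint="echo", args=["hi"])],
)

scheduler = create_scheduler("example-session")
dryrun_info = scheduler.submit_dryrun(app, cfg={"queue": "test"})
print(dryrun_info)                        # the rendered Volcano job resource, without submitting it
app_id = scheduler.schedule(dryrun_info)  # actually submits the job to the cluster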
- class torchx.schedulers.kubernetes_scheduler.KubernetesJob(images_to_push: Dict[str, Tuple[str, str]], resource: Dict[str, object])[source]¶
Reference¶
- torchx.schedulers.kubernetes_scheduler.create_scheduler(session_name: str, **kwargs: Any) → torchx.schedulers.kubernetes_scheduler.KubernetesScheduler[source]¶
- torchx.schedulers.kubernetes_scheduler.app_to_resource(app: torchx.specs.api.AppDef, queue: str, service_account: Optional[str], priority_class: Optional[str] = None) → Dict[str, object][source]¶
app_to_resource creates a Volcano Job Kubernetes resource definition from the provided AppDef. The resource definition can be used to launch the app on Kubernetes.
To support macros we generate one task per replica instead of using the Volcano replicas field, since macros change the arguments on a per-replica basis.
Volcano has two levels of retries: one at the task level and one at the job level. When using the APPLICATION retry policy, the job level retry count is set to the minimum of the max_retries of the roles.
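A small sketch of inspecting the generated resource directly; the app and queue name are illustrative:

from torchx import specs
from torchx.schedulers.kubernetes_scheduler import app_to_resource

app = specs.AppDef(
    name="echo",
    roles=[specs.Role(name="echo", image="alpine:latest", entrypoint="echo", args=["hi"], num_replicas=2)],
)

resource = app_to_resource(app, queue="test", service_account=None)
# A Volcano Job definition with one task per replica, as described above.
print(resource)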
- torchx.schedulers.kubernetes_scheduler.cleanup_str(data: str) → str[source]¶
Invokes lower on the string and removes all characters that do not satisfy the [a-z0-9] pattern. This method is mostly used to make sure the Kubernetes scheduler gets a job name that does not violate its validation rules.
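For illustration (the output assumes only lowercase letters and digits survive, per the description above):

from torchx.schedulers.kubernetes_scheduler import cleanup_str

print(cleanup_str("TorchX Trainer_01"))  # -> torchxtrainer01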
- torchx.schedulers.kubernetes_scheduler.pod_labels(app: torchx.specs.api.AppDef, role_idx: int, role: torchx.specs.api.Role, replica_id: int) → Dict[str, str][source]¶
- torchx.schedulers.kubernetes_scheduler.role_to_pod(name: str, role: torchx.specs.api.Role, service_account: Optional[str]) → V1Pod[source]¶