torchx.specs¶
This contains the TorchX AppDef and related component definitions. These are used by components to define the apps which can then be launched via a TorchX scheduler or pipeline adapter.
AppDef¶
- class torchx.specs.AppDef(name: str, roles: List[torchx.specs.api.Role] = <factory>, metadata: Dict[str, str] = <factory>)[source]¶
Represents a distributed application made up of multiple Roles and metadata. Contains the necessary information for the driver to submit this app to the scheduler.
- Parameters
name – Name of application
roles – List of roles
metadata – metadata for the app (treatment of metadata is scheduler dependent)
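For illustration, a minimal AppDef might be constructed as follows (the app name, images, entrypoints, and metadata values are placeholders, not values prescribed by TorchX):
from torchx.specs import AppDef, Role

app = AppDef(
    name="my_app",
    roles=[
        Role(name="trainer", image="pytorch/torch:1", entrypoint="train.py", num_replicas=2),
        Role(name="ps", image="pytorch/torch:1", entrypoint="ps.py"),
    ],
    metadata={"team": "ml-infra"},  # treatment of metadata is scheduler dependent
)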
Role¶
- class torchx.specs.Role(name: str, image: str, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: torchx.specs.api.RetryPolicy = <RetryPolicy.APPLICATION: 'APPLICATION'>, resource: torchx.specs.api.Resource = Resource(cpu=-1, gpu=-1, memMB=-1, capabilities={}), port_map: Dict[str, int] = <factory>, metadata: Dict[str, Any] = <factory>)[source]¶
A set of nodes that perform a specific duty within the AppDef. Examples:
Distributed data parallel app - made up of a single role (trainer).
App with parameter server - made up of multiple roles (trainer, ps).
Note
An image is a software bundle that is installed on the container scheduled by the scheduler. The container on the scheduler dictates what an image actually is. An image could be as simple as a tar-ball or map to a docker image. The scheduler typically knows how to “pull” the image given an image name (str), which could be a simple name (e.g. docker image) or a url (e.g. s3://path/my_image.tar).
Usage:
trainer = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="my_trainer.py",
    args=["--arg", "foo"],
    env={"ENV_VAR": "FOOBAR"},
    num_replicas=4,
    resource=Resource(cpu=1, gpu=1, memMB=500),
    port_map={"tcp_store": 8080, "tensorboard": 8081},
    metadata={"local_cwd.property": "value"},
)
- Parameters
name – name of the role
image – a software bundle that is installed on a container.
entrypoint – command (within the container) to invoke the role
args – commandline arguments to the entrypoint cmd
env – environment variable mappings
num_replicas – number of container replicas to run
max_retries – max number of retries before giving up
retry_policy – retry behavior upon replica failures
resource – resource requirement for the role. The role should be scheduled by the scheduler on num_replicas containers, each of which should have at least resource guarantees.
port_map – port mapping for the role. The key is the unique identifier of the port, e.g. "tensorboard": 9090.
metadata – free form information that is associated with the role, for example scheduler specific data. The key should follow the pattern: $scheduler.$key
- pre_proc(scheduler: str, dryrun_info: torchx.specs.api.AppDryRunInfo) → torchx.specs.api.AppDryRunInfo[source]¶
Modifies the scheduler request based on the role specific configuration. The method is invoked for each role during scheduler submit_dryrun. If there are multiple roles, the method is invoked for each role in the order defined by the AppDef.roles list.
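As a sketch of how a role-specific hook could look, the subclass below overrides pre_proc to log the request before submission (LoggingRole is hypothetical; the structure of dryrun_info.request is scheduler specific):
from torchx.specs.api import AppDryRunInfo, Role

class LoggingRole(Role):
    # Hypothetical subclass: inspects the scheduler request during submit_dryrun.
    def pre_proc(self, scheduler: str, dryrun_info: AppDryRunInfo) -> AppDryRunInfo:
        # dryrun_info.request holds the scheduler-specific request object
        print(f"submitting role {self.name} via scheduler {scheduler}")
        return dryrun_info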
- class torchx.specs.RetryPolicy(value)[source]¶
Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when a role replica encounters a failure:
unsuccessful (non-zero) exit code
hardware/host crashes
preemption
eviction
Note
Not all retry policies are supported by all schedulers. However, all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies they support and behavior caveats (if any).
- REPLICA: Replaces the replica instance. Surviving replicas are untouched. Use with torch_dist_role to have torch coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.
- APPLICATION: Restarts the entire application.
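A minimal sketch of attaching a retry policy to a role (the image and entrypoint are placeholders):
from torchx.specs import RetryPolicy, Role

# Replica-level restarts: only the failed replica is replaced;
# surviving replicas keep running (subject to scheduler support).
trainer = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="my_trainer.py",
    num_replicas=4,
    max_retries=3,
    retry_policy=RetryPolicy.REPLICA,
)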
Resource¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>)[source]¶
Represents resource requirements for a Role.
- Parameters
cpu – number of logical cpu cores. The definition of a CPU core depends on the scheduler. See your scheduler documentation for how a logical CPU core maps to physical cores and threads.
gpu – number of gpus
memMB – MB of ram
capabilities – additional hardware specs (interpreted by scheduler)
Note: you should prefer to use named_resources instead of specifying the raw resource requirement directly.
- static copy(original: torchx.specs.api.Resource, **capabilities: Any) → torchx.specs.api.Resource[source]¶
Copies a resource and applies new capabilities. If a capability is present both in the original resource and as a parameter, the parameter value takes precedence.
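For example (the capability keys here are illustrative, not defined by any particular scheduler):
from torchx.specs import Resource

base = Resource(cpu=4, gpu=1, memMB=8 * 1024, capabilities={"zone": "us-east-1a"})
# "zone" appears both in the original and as a parameter,
# so the parameter value wins in the copy.
patched = Resource.copy(base, zone="us-east-1b", network="fast")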
- torchx.specs.get_named_resources(res: str) → torchx.specs.api.Resource[source]¶
Gets the resource object based on the string definition registered via entrypoints.txt.
TorchX implements a named_resource registration mechanism, which consists of the following steps:
Create a module and define your resource retrieval function:
# my_module.resources
from torchx.specs import Resource

def gpu_x_1() -> Resource:
    return Resource(cpu=2, memMB=64 * 1024, gpu=2)
Register resource retrieval in the entrypoints section:
[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1
The gpu_x_1 name can then be used as the string argument to this function:

from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]
AWS Named Resources¶
torchx.specs.named_resources_aws contains resource definitions that represent corresponding AWS instance types taken from https://aws.amazon.com/ec2/instance-types/. The resources are exposed via entrypoints after installing torchx lib. The mapping is stored in the setup.py file.
The named resources currently do not specify AWS instance type capabilities but merely represent the equivalent resource in mem, cpu and gpu numbers.
Note
These resource definitions may change in the future. Each user is expected to manage their own resources. Follow https://pytorch.org/torchx/latest/specs.html#torchx.specs.get_named_resources to set up named resources.
Usage:
from torchx.specs import named_resources

print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
Macros¶
- class torchx.specs.macros[source]¶
Defines macros that can be used with Role.entrypoint and Role.args. The macros will be substituted at runtime to their actual values.
Available macros:
img_root - root directory of the pulled container image
app_id - application id as assigned by the scheduler
replica_id - unique id for each instance of a replica of a Role; for instance, a role with 3 replicas could have 0, 1, 2 as replica ids. Note that when a container fails and is replaced, the new container will have the same replica_id as the one it is replacing. For instance, if node 1 failed and was replaced by the scheduler, the replacing node will also have replica_id=1.
Example:
# runs: hello_world.py --app_id ${app_id}
trainer = Role(
    name="trainer",
    entrypoint="hello_world.py",
    args=["--app_id", macros.app_id],
)
app = AppDef("train_app", roles=[trainer])
app_handle = session.run(app, scheduler="local", cfg={})
- class Values(img_root: str, app_id: str, replica_id: str, base_img_root: str = 'DEPRECATED')[source]¶
- apply(role: torchx.specs.api.Role) → torchx.specs.api.Role[source]¶
apply applies the values to a copy of the specified role and returns it.
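A small sketch of apply, assuming the macro values shown here would normally be supplied by the runtime rather than hard-coded:
from torchx.specs import Role, macros

role = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="hello_world.py",
    args=["--app_id", macros.app_id],
)

# Substitutes the values into a copy of the role; the original is untouched.
values = macros.Values(img_root="/tmp/img", app_id="app_123", replica_id="0")
resolved = values.apply(role)  # resolved.args should be ["--app_id", "app_123"]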
Run Configs¶
- class torchx.specs.runopts[source]¶
Holds the accepted scheduler run configuration keys, default value (if any), and help message string. These options are provided by the Scheduler and validated in Session.run against user provided run cfg. Allows None default values. Required opts must NOT have a non-None default.
Important
This class has no accessors because it is intended to be constructed and returned by Scheduler.run_config_options and printed out as a “help” tool or as part of an exception msg.
Usage:
opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type=str, default=10)

opts.check(cfg)
print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str]]] = None, required: bool = False) → None[source]¶
Adds the config option with the given help string and default value (if any). If the default is not specified then this option is a required option.
- get(name: str) → Optional[torchx.specs.api.runopt][source]¶
Returns the option if one was registered, or None otherwise.
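Continuing the usage example above:
priority = opts.get("priority")       # the registered runopt
unknown = opts.get("does_not_exist")  # None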
Run Status¶
- class torchx.specs.AppStatus(state: torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[torchx.specs.api.RoleStatus] = <factory>)[source]¶
The runtime status of the AppDef. The scheduler can return an arbitrary text message (msg field). If any error occurs, the scheduler can populate structured_error_msg with a json response.
replicas represent the statuses of the replicas in the job. If the job runs with multiple retries, the parameter will contain the statuses of the most recent retry. Note: if previous retries failed but the most recent retry succeeded or is in progress, replicas will not contain errors from the earlier retries.
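An illustrative sketch (the state and message values are placeholders):
from torchx.specs import AppState, AppStatus

status = AppStatus(state=AppState.FAILED, num_restarts=2, msg="worker exited")
if status.state == AppState.FAILED:
    print(status.msg, status.structured_error_msg)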
- class torchx.specs.AppState(value)[source]¶
State of the application. An application starts from an initial UNSUBMITTED state and moves through SUBMITTED, PENDING, and RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, or CANCELLED.
If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.
If the user stops the application, the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.
UNSUBMITTED - app has not been submitted to the scheduler yet
SUBMITTED - app has been successfully submitted to the scheduler
PENDING - app has been submitted to the scheduler pending allocation
RUNNING - app is running
SUCCEEDED - app has successfully completed
FAILED - app has unsuccessfully completed
CANCELLED - app was cancelled before completing
UNKNOWN - app state is unknown
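A hypothetical helper built from the terminal states listed above (illustrative only; torchx performs its own terminal-state checks internally):
from torchx.specs import AppState

_TERMINAL = {AppState.SUCCEEDED, AppState.FAILED, AppState.CANCELLED}

def is_done(state: AppState) -> bool:
    # True once the app can no longer change state
    return state in _TERMINAL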
- torchx.specs.ReplicaState¶
alias of torchx.specs.api.AppState
Component Linter¶
- torchx.specs.file_linter.validate(path: str, component_function: str) → List[torchx.specs.file_linter.LinterMessage][source]¶
Validates the function to make sure it complies with the component standard. validate finds the component_function and validates it according to the following rules:
The function must have google-style docs
All function parameters must be annotated
The function must return torchx.specs.api.AppDef
- Parameters
path – Path to python source file.
component_function – Name of the function to be validated.
- Returns
List of validation errors
- Return type
List[LinterMessage]
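For example, assuming a source file my_components.py defining a component function named trainer (both names are hypothetical):
from torchx.specs.file_linter import validate

errors = validate("my_components.py", component_function="trainer")
for err in errors:
    # each LinterMessage describes one rule violation
    print(err)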