torchx.specs¶
AppDef¶
- class torchx.specs.AppDef(name: str, roles: List[torchx.specs.api.Role] = <factory>, metadata: Dict[str, str] = <factory>)[source]¶
Represents a distributed application made up of multiple Roles and metadata. Contains the necessary information for the driver to submit this app to the scheduler.
- Parameters
name – name of the application
roles – list of roles
metadata – AppDef-specific configuration; in comparison, RunConfig is runtime-specific configuration.
- add_metadata(key: str, value: str) → torchx.specs.api.AppDef[source]¶
Adds metadata to the application.
Note
If the key already exists, this method overwrites the metadata value.
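The overwrite behavior described in the note can be illustrated with a minimal, self-contained sketch. This uses a stand-in dataclass, not the real torchx.specs.AppDef; the name AppDefSketch is invented for this example, and returning self for chaining is an assumption of the sketch:

```python
from dataclasses import dataclass, field
from typing import Dict

# Minimal stand-in for torchx.specs.AppDef -- illustrates only the
# add_metadata overwrite semantics described above, not the real class.
@dataclass
class AppDefSketch:
    name: str
    metadata: Dict[str, str] = field(default_factory=dict)

    def add_metadata(self, key: str, value: str) -> "AppDefSketch":
        # If the key already exists, the value is overwritten.
        self.metadata[key] = value
        return self  # returned so calls can be chained

app = AppDefSketch(name="train_app")
app.add_metadata("team", "ml-infra").add_metadata("team", "ml-platform")
print(app.metadata["team"])  # "ml-platform" -- the second write wins
```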
Role¶
- class torchx.specs.Role(name: str, image: str, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: torchx.specs.api.RetryPolicy = <RetryPolicy.APPLICATION: 'APPLICATION'>, resource: torchx.specs.api.Resource = Resource(cpu=-1, gpu=-1, memMB=-1, capabilities={}), port_map: Dict[str, int] = <factory>)[source]¶
A set of nodes that performs a specific duty within the AppDef. Examples:
Distributed data parallel app - made up of a single role (trainer).
App with parameter server - made up of multiple roles (trainer, ps).
Note
An image is a software bundle that is installed on the container scheduled by the scheduler. The container on the scheduler dictates what an image actually is. An image could be as simple as a tar-ball or map to a docker image. The scheduler typically knows how to “pull” the image given an image name (str), which could be a simple name (e.g. docker image) or a url (e.g. s3://path/my_image.tar).
Note
An optional base_image can be specified if the scheduler supports a concept of base images. For schedulers that run Docker containers the base image is not useful, since the application image itself can be built from a base image (using the FROM base/image:latest construct in the Dockerfile). However, the base image is useful for schedulers that work with simple image artifacts (e.g. *.tar.gz) that do not have a built-in concept of base images. For these schedulers, specifying a base image that includes dependencies, while the main image holds the actual application code, makes it possible to change the application code without incurring the cost of re-building the uber artifact.
Usage:
trainer = Role(name="trainer", image="pytorch/torch:1")
          .runs("my_trainer.py", "--arg", "foo", ENV_VAR="FOOBAR")
          .replicas(4)
          .require(Resource(cpu=1, gpu=1, memMB=500))
          .ports({"tcp_store": 8080, "tensorboard": 8081})

# for schedulers that support base_images
trainer = Role(name="trainer",
               image="pytorch/torch:1",
               base_image="common/ml-tools:latest")
...
- Parameters
name – name of the role
image – a software bundle that is installed on a container.
base_image – optional base image, if the scheduler supports image overlay
entrypoint – command (within the container) to invoke the role
args – commandline arguments to the entrypoint cmd
env – environment variable mappings
num_replicas – number of container replicas to run
max_retries – max number of retries before giving up
retry_policy – retry behavior upon replica failures
resource – resource requirement for the role. The scheduler should schedule num_replicas containers for the role, each with at least the resource guarantees.
port_map – port mapping for the role. The key is the unique identifier of the port, e.g. “tensorboard”: 9090
- pre_proc(scheduler: str, dryrun_info: torchx.specs.api.AppDryRunInfo) → torchx.specs.api.AppDryRunInfo[source]¶
Modifies the scheduler request based on the role-specific configuration. The method is invoked for each role during the scheduler's submit_dryrun. If there are multiple roles, the method is invoked for each role in the order defined by the AppDef.roles list.
- class torchx.specs.RetryPolicy(value)[source]¶
Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when the role replica encounters a failure:
unsuccessful (non-zero) exit code
hardware/host crashes
preemption
eviction
Note
Not all retry policies are supported by all schedulers; however, all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies they support and behavior caveats (if any).
- REPLICA: Replaces the replica instance. Surviving replicas are untouched. Use with torch_dist_role to have torch coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.
- APPLICATION: Restarts the entire application.
Resource¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>)[source]¶
Represents resource requirements for a Role.
- Parameters
cpu – number of cpu cores (note: not hyper threads)
gpu – number of gpus
memMB – MB of ram
capabilities – additional hardware specs (interpreted by scheduler)
- static copy(original: torchx.specs.api.Resource, **capabilities: Any) → torchx.specs.api.Resource[source]¶
Copies a resource and applies new capabilities. If a capability is present in both the original resource and the parameters, the value from the parameters is used.
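The parameter-wins merge behavior described above can be sketched without torchx as follows. ResourceSketch and copy_resource are illustrative names standing in for torchx.specs.Resource and Resource.copy, not the real implementation:

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict

# Minimal stand-in for torchx.specs.Resource.
@dataclass
class ResourceSketch:
    cpu: int
    gpu: int
    memMB: int
    capabilities: Dict[str, Any] = field(default_factory=dict)

def copy_resource(original: ResourceSketch, **capabilities: Any) -> ResourceSketch:
    # Start from the original capabilities, then overlay the new ones:
    # on a key conflict, the value passed as a parameter wins.
    merged = {**original.capabilities, **capabilities}
    return replace(original, capabilities=merged)

r = ResourceSketch(cpu=1, gpu=1, memMB=500,
                   capabilities={"instance_type": "p3", "zone": "us-east-1"})
r2 = copy_resource(r, instance_type="p4")
print(r2.capabilities)  # {'instance_type': 'p4', 'zone': 'us-east-1'}
print(r.capabilities)   # original resource is unchanged
```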
Macros¶
- class torchx.specs.macros[source]¶
Defines macros that can be used with Role.entrypoint and Role.args. The macros will be substituted at runtime to their actual values.
Available macros:
img_root - root directory of the pulled container.image
base_img_root - root directory of the pulled role.base_image (resolves to “<NONE>” if no base_image is set)
app_id - application id as assigned by the scheduler
replica_id - unique id for each instance of a replica of a Role. For instance, a role with 3 replicas could have 0, 1, 2 as replica ids. Note that when a container fails and is replaced, the new container will have the same replica_id as the one it is replacing. For instance, if node 1 failed and was replaced by the scheduler, the replacing node will also have replica_id=1.
Example:
# runs: hello_world.py --app_id ${app_id}
trainer = Role(name="trainer").runs("hello_world.py", "--app_id", macros.app_id)
app = AppDef("train_app").of(trainer)
app_handle = session.run(app, scheduler="local", cfg=RunConfig())
- class Values(img_root: str, app_id: str, replica_id: str, base_img_root: str = '<NONE>')[source]¶
- apply(role: torchx.specs.api.Role) → torchx.specs.api.Role[source]¶
apply applies the values to a copy of the specified role and returns it.
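The runtime substitution performed over a role's entrypoint and args can be sketched as plain string replacement. The macro table, the `${...}` placeholder values, and the substitute helper below are all illustrative assumptions, not the real torchx implementation:

```python
from typing import Dict, List, Tuple

# Illustrative macro table; the real macros are attributes of torchx.specs.macros.
MACROS: Dict[str, str] = {
    "${img_root}": "/opt/image",   # root of the pulled container image (assumed path)
    "${app_id}": "app_1234",       # id assigned by the scheduler (assumed value)
    "${replica_id}": "0",          # stable id per replica slot
    "${base_img_root}": "<NONE>",  # "<NONE>" when no base_image is set
}

def substitute(entrypoint: str, args: List[str]) -> Tuple[str, List[str]]:
    # Replace every macro occurrence in the entrypoint and in each arg.
    def sub(s: str) -> str:
        for macro, value in MACROS.items():
            s = s.replace(macro, value)
        return s
    return sub(entrypoint), [sub(a) for a in args]

ep, args = substitute("hello_world.py",
                      ["--app_id", "${app_id}", "--rank", "${replica_id}"])
print(ep, args)  # hello_world.py ['--app_id', 'app_1234', '--rank', '0']
```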
Run Configs¶
- class torchx.specs.RunConfig(cfgs: Dict[str, Optional[Union[str, int, float, bool, List[str]]]] = <factory>)[source]¶
Additional run configs for the app. These are typically scheduler runtime configs/arguments that bind to neither the AppDef nor the Scheduler. For example, a particular cluster (within the scheduler) that the application should be submitted to. Since the same app can be launched into multiple types of clusters (dev, prod), the cluster id config does not bind to the app. Neither does it bind to the scheduler, since the cluster can be partitioned by size of the instances (S, M, L) or by a preemption setting (e.g. on-demand vs spot).
Since Session allows the application to be submitted to multiple schedulers, users who want to submit the same app into multiple schedulers from the same session can union all the RunConfigs into a single object. The scheduler implementation will selectively read the configs it needs.
This class is intended to be trivially serialized and passed around or saved, hence it only allows primitives as config values. Should the scheduler need more than simple primitives (e.g. list of str), it is up to the scheduler to document a way to encode this value as a str and parse it (e.g. representing a list of str as a comma-delimited str).
Usage:
# write
config = RunConfig()
config.set("run_as_user", "prod")
config.set("priority", 10)

# read
config.get("run_as_user")  # "prod"
config.get("priority")     # 10
config.get("never_set")    # None
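The comma-delimited encoding suggested above for non-primitive values can be sketched like this. The encode/decode helpers are illustrative, not part of torchx; the exact encoding is whatever the scheduler documents:

```python
from typing import List

def encode_str_list(values: List[str]) -> str:
    # Represent a list of str as a single comma-delimited str so it
    # fits the primitive-only constraint on RunConfig values.
    return ",".join(values)

def decode_str_list(value: str) -> List[str]:
    # The scheduler-side parse of the documented encoding.
    return value.split(",") if value else []

encoded = encode_str_list(["host1", "host2", "host3"])
print(encoded)                   # host1,host2,host3
print(decode_str_list(encoded))  # ['host1', 'host2', 'host3']
```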
- class torchx.specs.runopts[source]¶
Holds the accepted scheduler run configuration keys, default values (if any), and help message strings. These options are provided by the Scheduler and validated in Session.run against the user-provided RunConfig. Allows None default values. Required opts must NOT have a non-None default.
Important
This class has no accessors because it is intended to be constructed and returned by Scheduler.run_config_options and printed out as a “help” tool or as part of an exception msg.
Usage:
opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type=str, default=10)

opts.check(RunConfig)
print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str]]] = None, required: bool = False) → None[source]¶
Adds the config option with the given help string and default value (if any). If the default is not specified, then this option is a required option.
- static is_type(obj: Optional[Union[str, int, float, bool, List[str]]], tp: Type[Optional[Union[str, int, float, bool, List[str]]]]) → bool[source]¶
Returns True if obj is of type tp. Similar to isinstance() but supports tp = List[str], thus it can be used to validate ConfigValue.
- resolve(config: torchx.specs.api.RunConfig) → torchx.specs.api.RunConfig[source]¶
Checks the given config against this runopts and sets default configs if not set.
Warning
This method mutates the provided config!
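The resolve behavior (fill in defaults, fail on missing required options, mutate the given config) can be sketched with plain dicts standing in for runopts and RunConfig. All names below are illustrative, and raising KeyError is an assumption of this sketch, not necessarily the error torchx raises:

```python
from typing import Any, Dict, NamedTuple, Optional

class OptSketch(NamedTuple):
    default: Optional[Any]
    required: bool

# Stand-in for a runopts table: key -> (default, required).
OPTS: Dict[str, OptSketch] = {
    "cluster_id": OptSketch(default=None, required=True),
    "priority": OptSketch(default=0.5, required=False),
    "preemptible": OptSketch(default=False, required=False),
}

def resolve_sketch(cfg: Dict[str, Any]) -> Dict[str, Any]:
    # NOTE: like runopts.resolve, this mutates the provided config.
    for key, opt in OPTS.items():
        if cfg.get(key) is None:
            if opt.required:
                raise KeyError(f"required option is missing: {key}")
            cfg[key] = opt.default
    return cfg

cfg = {"cluster_id": 42}
resolve_sketch(cfg)
print(cfg)  # {'cluster_id': 42, 'priority': 0.5, 'preemptible': False}
```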
Run Status¶
- class torchx.specs.AppStatus(state: torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[torchx.specs.api.RoleStatus] = <factory>)[source]¶
The runtime status of the AppDef. The scheduler can return an arbitrary text message (msg field). If any error occurs, the scheduler can populate structured_error_msg with a json response.
replicas represent the statuses of the replicas in the job. If the job runs with multiple retries, the parameter will contain the statuses of the most recent retry. Note: if the previous retries failed but the most recent retry succeeded or is in progress, replicas will not contain the errors that occurred.
- class torchx.specs.AppState(value)[source]¶
State of the application. An application starts from an initial UNSUBMITTED state and moves through SUBMITTED, PENDING, RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, CANCELLED.
If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.
If the user stops the application, then the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.
UNSUBMITTED - app has not been submitted to the scheduler yet
SUBMITTED - app has been successfully submitted to the scheduler
PENDING - app has been submitted to the scheduler pending allocation
RUNNING - app is running
SUCCEEDED - app has successfully completed
FAILED - app has unsuccessfully completed
CANCELLED - app was cancelled before completing
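The lifecycle above can be sketched as an enum with a terminal-state predicate. This is an illustration covering only the states enumerated above, not the torchx definition (AppStateSketch and is_terminal are invented names):

```python
from enum import Enum

# Stand-in enum for the states listed above.
class AppStateSketch(Enum):
    UNSUBMITTED = "UNSUBMITTED"
    SUBMITTED = "SUBMITTED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

# Terminal states: once reached, the app makes no further transitions.
TERMINAL = {AppStateSketch.SUCCEEDED, AppStateSketch.FAILED, AppStateSketch.CANCELLED}

def is_terminal(state: AppStateSketch) -> bool:
    return state in TERMINAL

print(is_terminal(AppStateSketch.RUNNING))    # False
print(is_terminal(AppStateSketch.SUCCEEDED))  # True
```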
- torchx.specs.ReplicaState¶
alias of torchx.specs.api.AppState