使用 Torchserve 为大型模型提供服务¶
本文档解释了 Torchserve 如何支持大型模型服务,这里的大型模型是指无法放入一个 gpu 的模型,因此需要将它们拆分为多个 gpu 上的多个分区。 此页面分为以下几个部分:
这个怎么运作?¶
对于较小模型的 GPU 推理, TorchServe 为每个工作程序执行一个进程,该进程被分配一个 GPU。
对于大型模型推理,需要将模型拆分到多个 GPU 上。
有不同的模式可以实现这种拆分,通常包括管道并行 (PP)、张量并行或这些模式的组合。
选择哪种模式以及如何实施拆分取决于所用框架中的实施。
TorchServe 允许用户利用任何框架进行模型部署,并尝试通过灵活的配置来满足框架的需求。
一些框架需要为每个 GPU 执行一个单独的进程(PiPPy、Deep Speed),而另一些框架需要一个分配了所有 GPU (vLLM) 的进程。
如果需要多个进程,TorchServe 利用 torchrun 为 worker 设置分布式环境。
在设置过程中,将为分配给工作程序的每个 GPU 启动一个新进程。
是否使用 torchrun 取决于参数 parallelType,该参数可以在 中设置为以下选项之一:torchrun
model-config.yaml
pp
- 用于管道并行tp
- 对于 Tensor Parallelpptp
- 对于 pipeline + Tensor parallelcustom
前三个选项使用 torchrun 设置环境,而 “custom” 选项将并行化的方式留给用户,并将分配给工作程序的 GPU 分配给单个进程。 分配的 GPU 数量由 torchrun 启动的进程数量决定,即通过 nproc-per-node 或参数 parallelLevel 配置。 这意味着如果设置了 nproc-per-node,则不应设置参数 parallelLevel,反之亦然。
默认情况下,TorchServe 使用循环算法将 GPU 分配给主机上的工作线程。 对于大型模型,分配给每个工作程序的推理 GPU 将根据 model_config.yaml 中指定的 GPU 数量自动计算。 CUDA_VISIBLE_DEVICES 是根据此数字设置的。
例如,假设一个节点上有 8 个 GPU,一个工作线程需要一个节点上的 4 个 GPU (即 nproc-per-node=4 OR parallelLevel=4)。 在这种情况下,TorchServe 会将 CUDA_VISIBLE_DEVICES=“0,1,2,3” 分配给 worker1,将 CUDA_VISIBLE_DEVICES=“4,5,6,7” 分配给 worker2。
除了此默认行为之外,TorchServe 还为用户提供了为工作程序指定 GPU 的灵活性。例如,如果用户在模型配置 YAML 文件中设置了 “deviceIds: [2,3,4,5]”,并且 nproc-per-node (OR parallelLevel) 设置为 2,则 TorchServe 会将 CUDA_VISIBLE_DEVICES=“2,3” 分配给 worker1,将 CUDA_VISIBLE_DEVICES=“4,5” 分配给 worker2。
以 Pippy 集成为例,下图说明了 TorchServe 大型模型推理的内部结构。 有关使用 vLLM 的示例,请参阅此示例。
PiPPy(用于大型模型推理的 PyTorch Native 解决方案)¶
PiPPy 提供管道并行性,用于为不适合单个 GPU 的大型模型提供服务。它采用您的模型并将其拆分为相等的大小(阶段),并按您指定的设备数量进行分区。然后使用微批处理来运行您的批处理输入以进行推理(它更适合批量大小 >1)。
如何在 Torchserve 中使用 PiPPy¶
要在 Torchserve 中使用 Pippy,我们需要使用继承自 base_pippy_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。
Torchserve 中的客户处理程序只是一个 python 脚本,用于定义特定于您的工作流程的模型加载、预处理、推理和后处理逻辑。
它如下所示:
Create 或任何其他描述性名称。custom_handler.py
#DO import the necessary packages along with following
from ts.torch_handler.distributed.base_pippy_handler import BasePippyHandler
from ts.handler_utils.distributed.pt_pippy import initialize_rpc_workers, get_pipline_driver
class ModelHandler(BasePippyHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
self.device = self.local_rank % torch.cuda.device_count()# being used to move model inputs to (self.device)
self.model = get_pipline_driver(model,self.world_size, ctx)
这是你的需求,这个配置文件非常灵活,你可以添加与 frontend、backend 和 handler 相关的设置。model-config.yaml
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "pp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
pippy:
chunks: 1 # This sets the microbatch sizes, microbatch = batch size/ chunks
input_names: ['input_ids'] # input arg names to the model, this is required for FX tracing
model_type: "HF" # set the model type to HF if you are using Huggingface model other wise leave it blank or any other model you use.
rpc_timeout: 1800
num_worker_threads: 512 #set number of threads for rpc worker init.
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
如何在处理程序中访问它?下面是一个示例:
def initialize(self, ctx):
model_type = ctx.model_yaml_config["pippy"]["model_type"]
其余的在 Torchserve 中就像往常一样,基本上是打包你的模型并启动服务器。
打包模型的命令示例,请确保将 model-config.yaml
torch-model-archiver --model-name bloom --version 1.0 --handler pippy_handler.py --extra-files $MODEL_CHECKPOINTS_PATH -r requirements.txt --config-file model-config.yaml --archive-format tgz
Tensor Parallel 支持正在进行中,并将在准备就绪后立即添加。
深度速度¶
DeepSpeed-Inference 是 MicroSoft 的一个开源项目。它提供模型并行性,用于为不适合一个 GPU 内存的大型基于 transformer 的 PyTorch 模型提供服务。
如何在 TorchServe 中使用 DeepSpeed¶
要在 TorchServe 中使用 DeepSpeed,我们需要使用继承自 base_deepspeed_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。
它如下所示:
Create 或任何其他描述性名称。custom_handler.py
#DO import the necessary packages along with following
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler
class ModelHandler(BaseDeepSpeedHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
ds_engine = get_ds_engine(self.model, ctx)
self.model = ds_engine.module
self.initialized = True
这是你的需求,这个配置文件非常灵活,你可以添加与 frontend、backend 和 handler 相关的设置。model-config.yaml
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "tp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
deepspeed:
config: ds-config.json # DeepSpeed config json filename.
# Details:https://www.deepspeed.ai/docs/config-json/
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
下面是一个ds-config.json
{
"dtype": "torch.float16",
"replace_with_kernel_inject": true,
"tensor_parallel": {
"tp_size": 2
}
}
安装 DeepSpeed
方法 1:requirements.txt
方法二:通过命令预安装(推荐加快模型加载速度)
# See https://www.deepspeed.ai/tutorials/advanced-install/
DS_BUILD_OPS=1 pip install deepspeed
其余的在 Torchserve 中就像往常一样,基本上是打包你的模型并启动服务器。
打包模型的命令示例,请确保将 model-config.yaml
# option 1: Using model_dir
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files $MODEL_CHECKPOINTS_PATH,ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format tgz
# option 2: Using HF model_name
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format
DeepSpeed MII¶
如果使用此处显示的受支持模型之一,则可以利用 Deep Speed MII。Deep Speed MII 使用 Deep Speed Inference 以及深度学习的进一步发展,以最大限度地减少延迟并最大限度地提高吞吐量。它针对特定模型类型、模型大小、批量大小和可用硬件资源执行此操作。
有关如何在支持的型号上利用 Deep Speed MII 的更多信息,请参阅此处的信息。 您还可以在此处找到如何将其应用于 TorchServe 的示例。
使用 Accelerate 为大型 Hugging Face 模型提供服务¶
如果使用大型 Hugging Face 模型但资源有限,则可以使用 accelerate 来为这些模型提供服务。为此,您需要在 setup_config.json 文件中设置和设置 'device_map=“auto”。low_cpu_mem_usage=True
有关对大型 Hugging Face 模型使用 accelerate 的更多信息,请参阅此示例。
大型模型推理技巧¶
减少模型加载延迟¶
为了减少模型延迟,我们建议:
在容器或主机上预安装模型并行库,例如 Deepspeed。
预下载模型检查点。例如,如果使用 HuggingFace,则可以通过 Download_model.py 预下载预训练模型
设置环境变量 HUGGINGFACE_HUB_CACHE 和 TRANSFORMERS_CACHE
通过工具将模型下载到 HuggingFace 缓存目录 Download_model.py
优化模型配置 YAML 文件¶
您可以通过以下方式优化模型配置 YAML 文件以获得更好的性能:
如果高模型推理延迟导致响应超时,请更新 responseTimeout。
如果高模型加载延迟导致启动超时,请更新 startupTimeout。
调整 torchrun 参数。支持的参数在此处定义。例如,默认情况下为 1。这可以在 YAML 文件中修改。
OMP_NUMBER_THREADS
#frontend settings
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
OMP_NUMBER_THREADS: 2
延迟敏感型应用程序¶
工作单¶
对于延迟敏感型推理的使用案例,建议使用作业票证功能。启用作业票证后,TorchServe 会验证模型的活动工作线程是否可用于处理客户端的请求。如果有活动工作线程可用,则会立即接受并处理请求,而无需等待作业队列或动态批处理;否则,将 503 响应发送回客户端。
此功能有助于推理延迟较高的使用案例,例如生成模型、chatGPT 等自动回归解码器模型。此功能可帮助此类应用程序采取有效的操作,例如,根据业务需求将被拒绝的请求路由到其他服务器,或扩展模型服务器容量。以下是启用作业单的示例。
minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true
在此示例中,一个模型有 2 个作业队列大小为 2 的工作线程。推理请求将由 TorchServe 立即处理,或者以响应代码 503 被拒绝。
通过 HTTP 1.1 分块编码进行流式处理响应¶
TorchServe 的推理 API 支持流式响应,以允许通过 HTTP 1.1 分块编码发送一系列推理响应。仅当完整响应的推理延迟较高且推理中间结果被发送到客户端时,才建议将此功能用于使用案例。一个例子是生成式应用程序的 LLM,其中生成“n”个令牌可能会有很高的延迟。在这种情况下,用户可以在准备就绪后接收每个生成的令牌,直到完整响应完成。为了实现流式响应,后端处理程序调用 “send_intermediate_predict_response” 向前端发送一个中间结果,并将最后一个结果作为现有样式返回。例如
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]
客户端接收分块数据。
import test_utils
def test_echo_stream_inference():
test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False)
test_utils.register_model('echo_stream',
'https://torchserve.pytorch.org/mar_files/echo_stream.mar')
response = requests.post(TF_INFERENCE_API + '/predictions/echo_stream', data="foo", stream=True)
assert response.headers['Transfer-Encoding'] == 'chunked'
prediction = []
for chunk in (response.iter_content(chunk_size=None)):
if chunk:
prediction.append(chunk.decode("utf-8"))
assert str(" ".join(prediction)) == "hello hello hello hello world "
test_utils.unregister_model('echo_stream')
GRPC 服务器端流式处理¶
TorchServe GRPC API 添加了推理 API“StreamPredictions”的服务器端流,以允许通过同一 GRPC 流发送一系列推理响应。此 API 仅建议用于完整响应的推理延迟较高且推理中间结果发送到客户端的使用案例。一个例子是生成式应用程序的 LLM,其中生成“n”个令牌可能会有很高的延迟。与 HTTP 1.1 分块编码类似,使用此功能,用户可以在准备就绪后接收每个生成的令牌,直到完整响应完成。此 API 会自动强制 batchSize 为 1。
service InferenceAPIsService {
// Check health status of the TorchServe server.
rpc Ping(google.protobuf.Empty) returns (TorchServeHealthResponse) {}
// Predictions entry point to get inference using default model version.
rpc Predictions(PredictionsRequest) returns (PredictionResponse) {}
// Streaming response for an inference request.
rpc StreamPredictions(PredictionsRequest) returns (stream PredictionResponse) {}
}
后端处理程序调用 “send_intermediate_predict_response” 将一个中间结果发送到前端,并将最后一个结果作为现有样式返回。例如
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]