TorchScript 中的动态并行处理¶
创建日期: 2020年7月28日 | 最后更新日期: 2024年12月2日 | 最后验证日期: 2024年11月5日
警告
TorchScript 已经不再处于活跃开发阶段。
在本教程中,我们将介绍如何在TorchScript中实现动态跨操作并行化。这种并行化具有以下特性:
动态——创建的并行任务数量及其工作负载可能取决于程序的控制流。
inter-op - 并行性是指在并行运行TorchScript程序片段方面的关注点。这与intra-op并行性不同,后者是指拆分单个操作符并在操作符的工作量中并行运行子集。
基本语法¶
动态并行主义的两个重要API是:
torch.jit.fork(fn : Callable[..., T], *args, **kwargs) -> torch.jit.Future[T]torch.jit.wait(fut : torch.jit.Future[T]) -> T
一个很好的方式来展示这些是如何工作的就是通过一个例子:
import torch
def foo(x):
return torch.neg(x)
@torch.jit.script
def example(x):
# Call `foo` using parallelism:
# First, we "fork" off a task. This task will run `foo` with argument `x`
future = torch.jit.fork(foo, x)
# Call `foo` normally
x_normal = foo(x)
# Second, we "wait" on the task. Since the task may be running in
# parallel, we have to "wait" for its result to become available.
# Notice that by having lines of code between the "fork()" and "wait()"
# call for a given Future, we can overlap computations so that they
# run in parallel.
x_parallel = torch.jit.wait(future)
return x_normal, x_parallel
print(example(torch.ones(1))) # (-1., -1.)
fork() 接受可调用对象 fn 和该可调用对象的参数 args
并 kwargs 创建一个异步任务来执行 fn。
fn 可以是一个函数、方法或 Module 实例。fork() 返回对该执行结果的引用,称为 Future。
由于 fork 立即创建异步任务并返回,fn 可能在调用 fork() 后的下一行代码执行时还未完成。
因此,wait() 用于等待异步任务完成并返回值。
这些构造可以用于在一个函数内部重叠语句的执行(如在示例部分所示)或与其他语言构造,例如循环,结合使用。
import torch
from typing import List
def foo(x):
return torch.neg(x)
@torch.jit.script
def example(x):
futures : List[torch.jit.Future[torch.Tensor]] = []
for _ in range(100):
futures.append(torch.jit.fork(foo, x))
results = []
for future in futures:
results.append(torch.jit.wait(future))
return torch.sum(torch.stack(results))
print(example(torch.ones([])))
注意
当我们初始化一个空的Future列表时,需要在futures处显式添加类型注解。在TorchScript中,空容器默认假设其包含Tensor值,因此我们标注列表构造函数的类型为List[torch.jit.Future[torch.Tensor]]
这个示例使用 fork() 来启动函数 foo 的 100 个实例,等待 100 个任务完成,然后汇总结果,返回 -100.0。
应用示例:双向LSTM集成¶
让我们尝试将并行处理应用到一个更现实的例子中,看看能获得什么样的性能提升。首先,我们定义基准模型:一个双向LSTM层的集成。
import torch, time
# In RNN parlance, the dimensions we care about are:
# # of time-steps (T)
# Batch size (B)
# Hidden size/number of "channels" (C)
T, B, C = 50, 50, 1024
# A module that defines a single "bidirectional LSTM". This is simply two
# LSTMs applied to the same sequence, but one in reverse
class BidirectionalRecurrentLSTM(torch.nn.Module):
def __init__(self):
super().__init__()
self.cell_f = torch.nn.LSTM(input_size=C, hidden_size=C)
self.cell_b = torch.nn.LSTM(input_size=C, hidden_size=C)
def forward(self, x : torch.Tensor) -> torch.Tensor:
# Forward layer
output_f, _ = self.cell_f(x)
# Backward layer. Flip input in the time dimension (dim 0), apply the
# layer, then flip the outputs in the time dimension
x_rev = torch.flip(x, dims=[0])
output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
output_b_rev = torch.flip(output_b, dims=[0])
return torch.cat((output_f, output_b_rev), dim=2)
# An "ensemble" of `BidirectionalRecurrentLSTM` modules. The modules in the
# ensemble are run one-by-one on the same input then their results are
# stacked and summed together, returning the combined result.
class LSTMEnsemble(torch.nn.Module):
def __init__(self, n_models):
super().__init__()
self.n_models = n_models
self.models = torch.nn.ModuleList([
BidirectionalRecurrentLSTM() for _ in range(self.n_models)])
def forward(self, x : torch.Tensor) -> torch.Tensor:
results = []
for model in self.models:
results.append(model(x))
return torch.stack(results).sum(dim=0)
# For a head-to-head comparison to what we're going to do with fork/wait, let's
# instantiate the model and compile it with TorchScript
ens = torch.jit.script(LSTMEnsemble(n_models=4))
# Normally you would pull this input out of an embedding table, but for the
# purpose of this demo let's just use random data.
x = torch.rand(T, B, C)
# Let's run the model once to warm up things like the memory allocator
ens(x)
x = torch.rand(T, B, C)
# Let's see how fast it runs!
s = time.time()
ens(x)
print('Inference took', time.time() - s, ' seconds')
在我的机器上,这个网络运行时间为 2.05 秒。我们可以做得更好!
并行化前向和反向层¶
我们可以非常简单地并行化第BidirectionalRecurrentLSTM层的前向和反向层。
对于这一点,计算结构是静态的,所以我们甚至不需要任何循环。让我们像这样重写forward层的BidirectionalRecurrentLSTM方法:
def forward(self, x : torch.Tensor) -> torch.Tensor:
# Forward layer - fork() so this can run in parallel to the backward
# layer
future_f = torch.jit.fork(self.cell_f, x)
# Backward layer. Flip input in the time dimension (dim 0), apply the
# layer, then flip the outputs in the time dimension
x_rev = torch.flip(x, dims=[0])
output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
output_b_rev = torch.flip(output_b, dims=[0])
# Retrieve the output from the forward layer. Note this needs to happen
# *after* the stuff we want to parallelize with
output_f, _ = torch.jit.wait(future_f)
return torch.cat((output_f, output_b_rev), dim=2)
在本例中,forward()将执行cell_f委托给另一个线程,
同时它继续执行cell_b。这导致两个单元格的执行相互重叠。
再次运行此简单的修改脚本,运行时间为
1.71 秒,提高了 17% 秒!
Aside: 并行性可视化¶
我们尚未完成模型的优化,但值得介绍一下我们用于可视化性能的工具。其中一个重要的工具是 PyTorch 分析器。
让我们结合 Chrome trace 导出功能使用性能分析器来可视化并行化模型的性能:
with torch.autograd.profiler.profile() as prof:
ens(x)
prof.export_chrome_trace('parallel.json')
这段代码将会写入一个名为parallel.json的文件。如果你导航到chrome://tracing,点击Load按钮,并加载那个JSON文件,你应该会看到如下所示的时间线:
时间轴的横轴表示时间,纵轴表示执行线程。如我们所见,我们同时运行了两个lstm实例。这是我们将双向层并行化的努力成果!
在集成中并行化模型¶
您可能已经注意到我们的代码中还存在进一步的并行化机会:我们也可以将LSTMEnsemble中的模型彼此并行运行。实现这一点的方法很简单,我们应该这样更改forward方法:LSTMEnsemble:
def forward(self, x : torch.Tensor) -> torch.Tensor:
# Launch tasks for each model
futures : List[torch.jit.Future[torch.Tensor]] = []
for model in self.models:
futures.append(torch.jit.fork(model, x))
# Collect the results from the launched tasks
results : List[torch.Tensor] = []
for future in futures:
results.append(torch.jit.wait(future))
return torch.stack(results).sum(dim=0)
或者,如果你更喜欢简洁的方式,我们可以使用列表推导式:
def forward(self, x : torch.Tensor) -> torch.Tensor:
futures = [torch.jit.fork(model, x) for model in self.models]
results = [torch.jit.wait(fut) for fut in futures]
return torch.stack(results).sum(dim=0)
如介绍中所述,我们使用了循环来为集成中的每个模型分叉任务。然后,我们又使用了一个循环来等待所有任务完成。这进一步提供了计算上的重叠。
通过这个小更新,脚本运行时间缩短至1.4秒,总共加速了32%倍!仅仅两行代码就取得了不错的成效。
我们也可以再次使用 Chrome Tracer 来查看发生了什么:
我们现在可以看到,所有 LSTM 个实例都完全并行运行。
结论¶
在本教程中,我们学习了fork()和wait(),这两个基本API用于实现动态和跨操作符并行计算。我们看到了这些函数的一些典型用法,用于并行化TorchScript代码中函数、方法或Modules的执行。最后,我们通过一个使用此技术优化模型的例子,并探索了PyTorch提供的性能测量和可视化工具。