多处理最佳实践¶
是 Python
模块的直接替代品。它支持完全相同的操作,
而是扩展它,以便通过 发送
的所有张量都会将其数据移动到共享
内存,并且只会将句柄发送到另一个进程。
注意
当 发送到另一个进程时,
数据将被共享。如果
是
not ,它也是共享的。在 a
没有
一个
字段被发送到另一个进程,则
创建一个特定于 Standard Process-specific
的
不会在所有进程之间自动共享,这与 数据的共享方式
不同。
None
.grad
这允许实施各种训练方法,例如 Hogwild、A3C 或任何 其他需要异步操作的。
多进程中的 CUDA¶
CUDA 运行时不支持 start 方法;或 start 方法为
在子进程中使用 CUDA 所需的。fork
spawn
forkserver
注意
可以通过使用 或 直接使用 创建上下文来设置 start 方法 。multiprocessing.get_context(...)
multiprocessing.set_start_method(...)
与 CPU 张量不同,发送过程需要保留原始张量 只要接收进程保留 Tensor 的副本即可。它已实现 在后台,但要求用户遵循该计划的最佳实践 才能正常运行。例如,只要 Consumer 进程具有对 Tensor 的引用,而 refCounting 不能 如果使用者进程通过 Fatal 信号异常退出,则 Save You。请参阅此部分。
另请参阅:使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn。DataParallel 数据并行
最佳实践和提示¶
避免和对抗死锁¶
当生成新进程时,有很多事情可能会出错,其中
死锁的最常见原因是后台线程。如果有
线程,并且被调用,则非常
子进程可能处于损坏状态并死锁,或者
以不同的方式失败。请注意,即使你不这样做,Python 内置的
库可以 - 无需再深入研究。
实际上是一个非常复杂的类,则
生成多个用于序列化、发送和接收对象的线程,它们
也可能导致上述问题。如果您发现自己处于这种情况
尝试使用 , 它不会
使用任何其他线程。
fork
multiprocessing.queues.SimpleQueue
我们正在尽最大努力让您轻松,并确保这些死锁不会 但有些事情超出了我们的控制范围。如果您有任何问题,则无法 应付一段时间,尝试在论坛上联系,我们会看看它是否是一个 我们可以修复的问题。
重用通过 Queue 传递的缓冲区¶
请记住,每次将 a 放入 时
,它都必须移动到共享内存中。
如果已经共享,则为 no-op,否则将产生额外的
内存复制可能会减慢整个过程。即使您有一个
进程将数据发送到单个 Broker 中,使其将缓冲区发送回去 - 这个
几乎是免费的,并且可以让您在发送下一批时避免复制。
异步多进程训练(例如 Hogwild)¶
使用 ,可以训练模型
异步,参数要么一直共享,要么
定期同步。在第一种情况下,我们建议将整个
model 对象,而在后者中,我们建议只发送
.
我们建议使用 for pass all kinds
的 PyTorch 对象。例如,可以继承张量
以及共享内存中已有的存储,当使用 start 方法时,
但是,它很容易出错,应谨慎使用,并且只能由 Advanced
用户。队列,即使它们有时是一个不那么优雅的解决方案,也会起作用
在所有情况下都是正确的。
fork
警告
您应该小心使用不受保护的 global 语句
替换为 .如果使用的 start 方法不同,它们将在所有子进程中执行。if __name__ == '__main__'
fork
霍格维尔德¶
具体的 Hogwild 实现可以在 examples 存储库中找到, 但为了展示代码的整体结构,还有一个 minimal 示例如下:
import torch.multiprocessing as mp
from model import MyModel
def train(model):
# Construct data_loader, optimizer, etc.
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() # This will update the shared parameters
if __name__ == '__main__':
num_processes = 4
model = MyModel()
# NOTE: this is required for the ``fork`` method to work
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()