torch.futures¶
这个包提供了一个 Future 类型,封装了异步执行和一组简化操作的实用函数
在 Future 对象上。目前,
Future 类型主要由
分布式RPC框架 使用。
- class torch.futures.Future(*, devices=None)¶
围绕一个
torch._C.Future进行封装,该torch._C.Future异步执行一个可调用对象,例如rpc_async()。它还提供了一组API来添加回调函数和设置结果。警告
对GPU的支持是一项测试功能,可能会发生变化。
- add_done_callback(callback)[source]¶
将给定的回调函数附加到此
Future,当Future完成时运行。可以向同一个Future添加多个回调函数,但它们的执行顺序不能保证。回调函数必须接受一个参数,即对此Future的引用。回调函数可以使用value()方法获取其值。注意,如果此Future已完成,则给定的回调将在行内运行。我们建议您使用
then()方法,因为它可以在您的回调完成后提供同步方式。add_done_callback如果您的回调不返回任何内容可能会更便宜。但then()和add_done_callback在底层都使用相同的回调注册API。对于GPU张量,此方法的行为与
then()相同。- Parameters
回调函数 (
Future) – 一个接受一个参数的Callable,该参数是指向此Future的引用。
注意
注意,如果回调函数抛出异常,无论是通过原始future完成时带有异常并调用
fut.wait(),还是通过回调中的其他代码,都必须小心处理错误。例如,如果这个回调稍后完成了额外的futures,这些futures不会被标记为带有错误的完成,用户需要独立处理和等待这些futures。- Example::
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[source]¶
返回
True如果此Future已完成。如果它 有结果或异常,则Future已完成。如果值包含位于GPU上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,
Future.done()仍将返回True, 因为在该阶段结果已经是可用的,只要进行适当的同步操作(参见wait())。- Return type
- set_exception(result)[source]¶
为这个
Future设置一个异常,这将把这个Future标记为带有错误的完成,并触发所有附加的回调函数。请注意,在调用此Future的 wait()/value() 时,此处设置的异常将被内联抛出。- Parameters
结果 (BaseException) – 此
Future的异常。
- Example::
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source]¶
将结果设置为此
Future,这将标记此Future为已完成,并触发所有附加的回调函数。请注意,一个Future不能被标记为已完成两次。如果结果包含驻留在GPU上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,也可以调用此方法,只要在调用此方法时将那些内核排队的流设置为当前流即可。简单来说,在不更改流的情况下,可以在启动这些内核后立即安全地调用此方法,而无需任何额外的同步。此方法将在所有相关的当前流上记录事件,并使用它们来确保对此
Future的所有消费者的正确调度。- Parameters
结果 (对象) – 本次
Future的结果对象。
- Example::
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source]¶
将给定的回调函数附加到此
Future,当Future完成时运行。可以向同一个Future添加多个回调,但它们的执行顺序不能保证(要强制执行特定顺序,请考虑链接:fut.then(cb1).then(cb2))。回调函数必须接受一个参数,该参数是指向此Future的引用。回调函数可以使用value()方法获取值。请注意,如果此Future已完成,给定的回调将立即内联运行。如果
Future的值包含位于 GPU 上的张量,那么在异步内核还在填充这些张量且尚未在设备上完成执行时,可能会调用回调函数。然而,回调函数会被调用时会使用从全局池中获取的一些专用流作为当前流,并且这些流会与那些内核同步。因此,回调函数对这些张量执行的任何操作都会在内核完成后调度到设备上执行。换句话说,只要回调函数不切换流,它就可以安全地操纵结果而无需额外的同步。这与wait()的非阻塞行为类似。同样地,如果回调函数返回一个包含在 GPU 上的张量的值, 即使生成这些张量的内核仍在设备上运行,它也可以这样做, 只要回调函数在执行期间没有更改流。如果需要更改流, 必须小心将其重新与原始流同步,即那些在回调被调用时当前的流。
- Parameters
回调函数 (
Callable) – 一个Callable,它将此Future作为唯一的参数。- Returns
一个新的
Future对象,用于保存callback的返回值,并在给定的callback完成时标记为已完成。- Return type
未来[S]
注意
注意,如果回调函数抛出异常,无论是通过原始的future由于异常完成并调用
fut.wait(),还是通过回调中的其他代码,由then返回的future都会被适当地标记为遇到的错误。然而,如果这个回调稍后完成了额外的futures,这些futures不会被标记为由于错误完成,并且用户需要独立处理/等待这些futures。- Example::
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- torch.futures.collect_all(futures)[source]¶
收集提供的
Future对象到一个单一的 组合Future,当所有的子任务完成后即完成。- Example::
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1