注意
转到末尾 以下载完整示例代码。
从流中构建张量字典¶
作者: Vincent Moens
在许多实际应用中,数据是连续生成的,并且以不同的频率生成。
例如,物联网设备的传感器读数、金融交易或社交媒体更新等,均可能产生需要实时处理与分析的数据流。
在处理此类数据流时,通常需要将传入的数据“分桶”为离散的数据块,以便高效地进行处理与分析。然而,当面对频率或格式各不相同的数流时,这一操作可能颇具挑战性。
在本教程中,我们将探讨如何使用 TensorDict 构建和操作数据流。 我们将学习如何创建张量的惰性堆叠(lazy stacks)、处理异步数据流,以及对数据进行稠密化(densify),以实现高效存储与处理。
在本教程中,您将学习:
- 如何读取数据流并在张量字典中定期写入;
- 如何构建将不同形状的内容堆叠在一起的TensorDict;
- 如果需要,如何使用nested_tensor将这些张量合并到单个存储中。
将异构张量字典堆叠在一起¶
在许多现实场景中,数据以具有不同定义频率的流形式出现。
本教程的目标是对即将处理的数据进行“分桶”(bucketize),使其能够以指定的较低频率进行读取和处理。 此场景下的挑战在于,数据可能无法表示为规则的“矩形”格式(即张量的每个维度均有明确定义),而可能出现某一数据桶中的元素数量多于另一数据桶的情况;此时,我们无法简单地将它们堆叠(stack)在一起。通常,考虑如下情形:第一个和第二个数据桶的内容分别为:
import torch
from tensordict import TensorDict
bucket0 = TensorDict(stream0=torch.randn(5), stream1=torch.randn(4))
bucket1 = TensorDict(stream0=torch.randn(4), stream1=torch.randn(5))
原则上,我们不能将这两个 tensordict 连续堆叠在内存中,因为两个流的形状不同。
幸运的是,TensorDict 提供了一个工具来将具有异构张量形状的实例组合在一起:
LazyStackedTensorDict.
要创建一个懒惰堆栈,只需调用 lazy_stack():
data = TensorDict.lazy_stack([bucket0, bucket1], dim=0)
print(data)
LazyStackedTensorDict(
fields={
stream0: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False),
stream1: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False)},
exclusive_fields={
},
batch_size=torch.Size([2]),
device=None,
is_shared=False,
stack_dim=0)
生成的数据只是两个张量字典沿维度0堆叠在一起的表示。LazyStackedTensorDict 支持 TensorDictBase 类的大多数常见操作,以下是一些示例:
data_select = data.select("stream0")
data_plus_1 = data + 1
data_apply = data.apply(lambda x: x + 1)
此外,对其进行索引将返回我们用来创建堆栈的原始数据。
assert data[0] is bucket0
尽管如此,在某些情况下,人们可能希望拥有底层数据的连续表示。
为此,TensorDictBase 提供了一个 densify() 方法,该方法将堆叠可以堆叠的张量,并尝试将剩余部分表示为 nested_tensor 实例:
data_cont = data.densify()
异步数据流¶
现在,让我们转向一个更具体的示例:创建一个函数,以指定频率流式传输数据(本例中仅为每次迭代递增 1 的整数)。
为了跨线程传递数据,该函数将使用作为输入接收的队列:
import asyncio
from typing import List
async def generate_numbers(frequency: float, queue: asyncio.Queue) -> None:
i = 0
while True:
await asyncio.sleep(1 / frequency)
await queue.put(i)
i += 1
The collect_data 函数在给定的时间内从队列中读取数据。
一旦 timeout 秒过去,函数返回:
async def collect_data(queue: asyncio.Queue, timeout: float) -> List[int]:
values = []
# We create a nested `collect` async function in order to be able to stop it as
# soon as timeout is passed (see wait_for below).
async def collect():
nonlocal values
while True:
value = await queue.get()
values.append(value)
task = asyncio.create_task(collect())
try:
await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
task.cancel()
return values
The wait7hz 函数在给定的时间内从队列中读取数据。
async def wait7hz() -> None:
queue = asyncio.Queue()
generate_task = asyncio.create_task(generate_numbers(7, queue))
collect_data_task = asyncio.create_task(collect_data(queue, timeout=1))
values = await collect_data_task
# The ``generate_task`` has not been terminated
generate_task.cancel()
print(values)
asyncio.run(wait7hz())
from typing import Callable, Dict
[0, 1, 2, 3, 4, 5]
我们现在可以设计一个继承自 LazyStackedTensorDict 的类,并读取来自不同流的数据并将它们注册到单独的 tensordicts 中。
LazyStackedTensorDict 的一个很好的特性是它也可以增量构建,这样我们只需通过扩展懒惰堆栈直到收集到足够的数据即可简单地注册新数据。
以下是这个 StreamedTensorDict 类的实现:
from tensordict import LazyStackedTensorDict, NestedKey, TensorDictBase
class StreamedTensorDict(LazyStackedTensorDict):
"""A lazy stack class that can be built from a dictionary of streams."""
@classmethod
async def from_streams(
cls,
streams: Dict[NestedKey, Callable],
timeout: float,
batch_size: int,
densify: bool = True,
) -> TensorDictBase:
td = cls(stack_dim=0)
# We construct a queue for each stream
queues = [asyncio.Queue() for _ in range(len(streams))]
tasks = []
for stream, queue in zip(streams.values(), queues):
task = asyncio.create_task(stream(queue))
tasks.append(task)
for _ in range(batch_size):
values_tasks = []
for queue in queues:
values_task = asyncio.create_task(collect_data(queue, timeout))
values_tasks.append(values_task)
values = await asyncio.gather(*values_tasks)
td.append(TensorDict(dict(zip(streams.keys(), values))))
# Cancel the generator tasks
for task in tasks:
task.cancel()
if densify:
return td.densify(layout=torch.strided)
return td
最后,main 函数将组合流式函数 stream0 和 stream1 并将它们传递给
StreamedTensorDict.from_streams 方法,该方法将收集 batch_size 批数据,每批持续 timeout=1 秒:
async def main() -> TensorDictBase:
def stream0(queue):
return generate_numbers(frequency=7, queue=queue)
def stream1(queue):
return generate_numbers(frequency=3, queue=queue)
# Running this should take about 10 seconds
return await StreamedTensorDict.from_streams(
{"bucket0": stream0, "bucket1": stream1}, timeout=1, batch_size=10
)
td = asyncio.run(main())
print("TensorDict from stream", td)
TensorDict from stream TensorDict(
fields={
bucket0: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False),
bucket1: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
- Let’s represent the data from both streams - should be equal to torch.arange() for batch_size * timeout * Hz
<=> 1 * 10 秒 * 3 或 7
print("bucket0 (7Hz, around 70 values)", td["bucket0"].values())
print("bucket1 (3Hz, around 30 values)", td["bucket1"].values())
print("shapes of bucket0 (7Hz, around 70 values)", td["bucket0"]._nested_tensor_size())
bucket0 (7Hz, around 70 values) tensor([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68])
bucket1 (3Hz, around 30 values) tensor([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28])
shapes of bucket0 (7Hz, around 70 values) tensor([[6],
[7],
[7],
[7],
[7],
[7],
[7],
[7],
[7],
[7]])
结论¶
在本教程中,我们探讨了 TensorDict 和异步数据流的基本用法。 我们学习了如何创建张量的惰性堆叠(lazy stacks)、如何使用 asyncio 处理异步数据流,以及如何对数据进行稠密化(densify),以实现高效存储与处理。
我们还看到了如何使用 TensorDict 和 LazyStackedTensorDict 来简化复杂的数据处理任务,例如对具有不同频率的数据流进行分桶。
通过利用 TensorDict 和 asyncio 的强大功能,您可以构建可扩展且高效的数据处理管道,这些管道可以处理最苛刻的现实世界应用程序。
感谢您跟随本教程学习!我们希望您觉得它有帮助且内容丰富。
脚本总运行时间: (0 分钟 11.021 秒)