目录

使用 Replay Buffers

作者Vincent Moens

重放缓冲区是任何 RL 或控制算法的核心部分。 监督学习方法通常以训练循环为特征 其中,数据从静态数据集中随机提取并连续馈送 添加到模型和损失函数中。 在 RL 中,情况通常略有不同:数据是使用 模型,然后临时存储在动态结构中(体验 replay buffer),它用作 loss 模块的 dataset。

与往常一样,使用缓冲区的上下文极大地限制了 它是如何构建的:有些人可能希望在其他人想要的时候存储轨迹 来存储单个过渡。特定的采样策略可能更可取 在上下文中:某些项可以比其他项具有更高的优先级,或者可以 重要的是有或没有更换的样品。 计算因素也可能起作用,例如缓冲区的大小 这可能会超过可用的 RAM 存储空间。

由于这些原因,TorchRL 的重放缓冲区是完全可组合的:尽管 它们带有 “附带电池”,只需最少的工作量即可构建, 它们还支持许多自定义,例如存储类型、 采样策略或数据转换。

在本教程中,您将学习:

基础知识:构建原版重放缓冲区

TorchRL 的重放缓冲区旨在优先考虑模块化、 可组合性、效率和简单性。例如,创建一个基本的 Replay Buffer 是一个简单的过程,如下所示 例:

import tempfile

from torchrl.data import ReplayBuffer

buffer = ReplayBuffer()

默认情况下,此重放缓冲区的大小为 1000。让我们检查一下 通过使用该方法填充我们的缓冲区:

print("length before adding elements:", len(buffer))

buffer.extend(range(2000))

print("length after adding elements:", len(buffer))
length before adding elements: 0
length after adding elements: 1000

我们使用了 旨在一次添加多个项目。如果传递的对象 to 具有多个维度,则其第一个维度为 被认为是要在缓冲区中拆分为单独元素的那个。extend

这实质上意味着,当添加多维张量或 tensordicts 添加到缓冲区中,缓冲区将只查看第一维 计算它保存在内存中的元素时。 如果对象传递给它 not iterable,则会引发异常。

要一次添加一个项目,该方法 应该改用。

自定义存储

我们看到缓冲区的上限已限制为前 1000 个元素,我们 传递给它。 要更改大小,我们需要自定义我们的存储。

TorchRL 提出了三种类型的存储:

  • stores 元素独立位于 列表。它支持任何数据类型,但这种灵活性是有代价的 效率;ListStorage

  • 存储张量数据 结构。 它与 (或 ) 自然合作 对象。存储在每个张量的基础上是连续的,这意味着 采样会比使用列表时更有效,但是 隐式限制是传递给它的任何数据都必须具有相同的 基本属性(如 shape 和 dtype)作为第一批数据,其中 用于实例化缓冲区。 传递不符合此要求的数据将引发 exception 或导致一些未定义的行为。LazyTensorStorageTensorDicttensorclass

  • 它的工作原理是因为它是 lazy (即 it 期望第一批数据被实例化),并且它需要 data ,则每个存储的批次的 shape 和 dtype 都匹配。是什么造就了 存储的独特之处在于它指向磁盘文件(或使用文件系统 存储),这意味着它可以支持非常大的数据集,同时仍然 以连续方式访问数据。LazyMemmapStorageLazyTensorStorage

让我们看看如何使用这些存储中的每一个:

from torchrl.data import LazyMemmapStorage, LazyTensorStorage, ListStorage

# We define the maximum size of the buffer
size = 100

具有 list storage buffer 的缓冲区可以存储任何类型的数据(但我们必须 更改 the ,因为默认值需要数字数据):collate_fn

buffer_list = ReplayBuffer(storage=ListStorage(size), collate_fn=lambda x: x)
buffer_list.extend(["a", 0, "b"])
print(buffer_list.sample(3))
[0, 'b', 'a']

因为它是假设量最低的那个,所以它是 TorchRL 中的默认存储。ListStorage

A 可以连续存储数据。 在处理复杂但 中等大小的不变数据结构:LazyTensorStorage

buffer_lazytensor = ReplayBuffer(storage=LazyTensorStorage(size))

让我们创建一批大小为 ''torch.Size([3])' 具有 2 个张量 存储在其中:

import torch
from tensordict import TensorDict

data = TensorDict(
    {
        "a": torch.arange(12).view(3, 4),
        ("b", "c"): torch.arange(15).view(3, 5),
    },
    batch_size=[3],
)
print(data)
TensorDict(
    fields={
        a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([3]),
            device=None,
            is_shared=False)},
    batch_size=torch.Size([3]),
    device=None,
    is_shared=False)

will 的第一次调用 实例化 Storage。数据的第一个维度被解绑到 单独的数据点:

buffer_lazytensor.extend(data)
print(f"The buffer has {len(buffer_lazytensor)} elements")
The buffer has 3 elements

让我们从缓冲区中采样,并打印数据:

sample = buffer_lazytensor.sample(5)
print("samples", sample["a"], sample["b", "c"])
samples tensor([[ 0,  1,  2,  3],
        [ 4,  5,  6,  7],
        [ 0,  1,  2,  3],
        [ 8,  9, 10, 11],
        [ 0,  1,  2,  3]]) tensor([[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [ 0,  1,  2,  3,  4],
        [10, 11, 12, 13, 14],
        [ 0,  1,  2,  3,  4]])

A 的创建方式相同:LazyMemmapStorage

buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazytensor.sample(5)
print("samples: a=", sample["a"], "\n('b', 'c'):", sample["b", "c"])
The buffer has 3 elements
samples: a= tensor([[ 4,  5,  6,  7],
        [ 0,  1,  2,  3],
        [ 4,  5,  6,  7],
        [ 8,  9, 10, 11],
        [ 8,  9, 10, 11]])
('b', 'c'): tensor([[ 5,  6,  7,  8,  9],
        [ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [10, 11, 12, 13, 14],
        [10, 11, 12, 13, 14]])

我们还可以自定义磁盘上的存储位置:

tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size, scratch_dir=tempdir))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
print("the 'a' tensor is stored in", buffer_lazymemmap._storage._storage["a"].filename)
print(
    "the ('b', 'c') tensor is stored in",
    buffer_lazymemmap._storage._storage["b", "c"].filename,
)
The buffer has 3 elements
the 'a' tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpcavfrtwk'>/a.memmap
the ('b', 'c') tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpcavfrtwk'>/b/c.memmap

与 TensorDict 集成

张量位置遵循与 TensorDict 相同的结构,其中 包含它们:这使得在训练期间保存和加载缓冲区变得容易。

充分利用数据载体 potential,则类可以 被使用。 它的主要优势之一是它能够处理抽样的组织 数据,以及可能需要的任何其他信息 (例如样本索引)。TensorDict

它可以按照与标准相同的方式构建,并且可以 通常可以互换使用。

from torchrl.data import TensorDictReplayBuffer

tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = TensorDictReplayBuffer(
    storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 3 elements
sample: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([12, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([12, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([12]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([12]),
    device=cpu,
    is_shared=False)

我们的示例现在有一个额外的键,用于指示哪些索引 进行了抽样。 让我们来看看这些指数:"index"

print(sample["index"])
tensor([1, 2, 0, 1, 1, 1, 2, 1, 1, 2, 1, 1])

与 tensorclass 集成

ReplayBuffer 类和关联的子类也可以与类本机一起使用,这些类可以方便地用于 以更明确的方式对数据集进行编码:tensorclass

from tensordict import tensorclass


@tensorclass
class MyData:
    images: torch.Tensor
    labels: torch.Tensor


data = MyData(
    images=torch.randint(
        255,
        (10, 64, 64, 3),
    ),
    labels=torch.randint(100, (10,)),
    batch_size=[10],
)

tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 10 elements
sample: MyData(
    images=Tensor(shape=torch.Size([12, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([12]),
    device=cpu,
    is_shared=False)

不出所料。数据具有适当的类和形状!

与其他张量结构 (PyTrees) 集成

TorchRL 的重放缓冲区也适用于任何 pytree 数据结构。 PyTree 是由字典、列表和/或 元组,其中叶子是张量。 这意味着可以在连续内存中存储任何这样的树结构! 可以使用各种存储:或者全部接受 this 类型的数据。

以下是此功能的简要演示:

from torch.utils._pytree import tree_map

让我们在磁盘上构建我们的重放缓冲区:

rb = ReplayBuffer(storage=LazyMemmapStorage(size))
data = {
    "a": torch.randn(3),
    "b": {"c": (torch.zeros(2), [torch.ones(1)])},
    30: -torch.ones(()),  # non-string keys also work
}
rb.add(data)

# The sample has a similar structure to the data (with a leading dimension of 10 for each tensor)
sample = rb.sample(10)

使用 pytrees ,任何可调用对象都可以用作转换:

def transform(x):
    # Zeros all the data in the pytree
    return tree_map(lambda y: y * 0, x)


rb.append_transform(transform)
sample = rb.sample(batch_size=12)

让我们检查一下我们的 transform 是否完成了它的工作:

def assert0(x):
    assert (x == 0).all()


tree_map(assert0, sample)
{'a': None, 'b': {'c': (None, [None])}, 30: None}

对缓冲区进行采样和迭代

Replay Buffers 支持多种采样策略:

  • 如果 batch-size 是固定的并且可以在构造时定义,则可以 作为 keyword 参数传递给缓冲区;

  • 使用固定的 batch-size,可以迭代重放缓冲区以收集 样品;

  • 如果 batch-size 是动态的,则可以将其传递给方法 即时。

可以使用多线程进行采样,但这与 last 选项(它要求缓冲区提前知道 next batch) 的

让我们看几个例子:

固定批量大小

如果在构造期间传递了 batch-size,则应在 采样:

data = MyData(
    images=torch.randint(
        255,
        (200, 64, 64, 3),
    ),
    labels=torch.randint(100, (200,)),
    batch_size=[200],
)

buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size), batch_size=128)
buffer_lazymemmap.extend(data)
buffer_lazymemmap.sample()
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)

这批数据的大小符合我们的预期 (128)。

要启用多线程采样,只需在构造期间将正整数传递给 keyword 参数即可。这应该会加快速度 采样相当多(例如,当 使用优先采样器):prefetch

buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), batch_size=128, prefetch=10
)  # creates a queue of 10 elements to be prefetched in the background
buffer_lazymemmap.extend(data)
print(buffer_lazymemmap.sample())
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)

迭代具有固定 batch-size 的缓冲区

我们也可以像使用常规 dataloader 中,只要 batch-size 是预定义的:

for i, data in enumerate(buffer_lazymemmap):
    if i == 3:
        print(data)
        break
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)

由于我们的抽样技术是完全随机的,并且不会 防止替换,则有问题的迭代器是无限的。但是,我们可以 使用 the instead ,它会将我们的缓冲区转换为有限迭代器:

from torchrl.data.replay_buffers.samplers import SamplerWithoutReplacement

buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), batch_size=32, sampler=SamplerWithoutReplacement()
)

我们创建一个足够大的数据来获取几个样本

data = TensorDict(
    {
        "a": torch.arange(64).view(16, 4),
        ("b", "c"): torch.arange(128).view(16, 8),
    },
    batch_size=[16],
)

buffer_lazymemmap.extend(data)
for _i, _ in enumerate(buffer_lazymemmap):
    continue
print(f"A total of {_i+1} batches have been collected")
A total of 1 batches have been collected

动态批量大小

与我们之前看到的相反,关键字 参数可以省略并直接传递给方法:batch_sizesample

buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), sampler=SamplerWithoutReplacement()
)
buffer_lazymemmap.extend(data)
print("sampling 3 elements:", buffer_lazymemmap.sample(3))
print("sampling 5 elements:", buffer_lazymemmap.sample(5))
sampling 3 elements: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([3, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([3]),
            device=cpu,
            is_shared=False)},
    batch_size=torch.Size([3]),
    device=cpu,
    is_shared=False)
sampling 5 elements: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)

优先重放缓冲区

TorchRL 还为优先重放缓冲区提供了一个接口。 此缓冲区类根据传递的优先级信号对数据进行采样 通过数据。

尽管此工具与非 tensordict 数据兼容,但我们鼓励 改用 TensorDict,因为它可以将元数据导入 毫不费力地从缓冲区中出来。

让我们首先看看如何在泛型 箱。和 超参数 必须手动设置:

from torchrl.data.replay_buffers.samplers import PrioritizedSampler

size = 100

rb = ReplayBuffer(
    storage=ListStorage(size),
    sampler=PrioritizedSampler(max_capacity=size, alpha=0.8, beta=1.1),
    collate_fn=lambda x: x,
)

扩展重放缓冲区将返回 items 索引,我们将需要 稍后更新优先级:

indices = rb.extend([1, "foo", None])

采样器期望每个元素都有一个优先级。添加到 buffer,则 priority 设置为默认值 1。一旦优先级具有 被计算(通常通过 loss),则必须在缓冲区中更新。

这是通过 method 完成的,该方法需要 indices 和 priority。 我们人为地为数据集中的第二个样本分配了高优先级 要观察它对采样的影响,请执行以下操作:update_priority()

rb.update_priority(index=indices, priority=torch.tensor([0, 1_000, 0.1]))

我们观察到,从缓冲区采样主要返回第二个样本 ():"foo"

sample, info = rb.sample(10, return_info=True)
print(sample)
['foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo']

该信息包含项目的相对权重以及索引。

print(info)
{'_weight': tensor([2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10,
        2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10]), 'index': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])}

我们看到,使用优先重放缓冲区需要一系列额外的 训练循环中的步骤与常规缓冲区的比较:

  • 在收集数据并扩展缓冲区后, 项目必须更新;

  • 在计算损失并从中获得“优先信号”之后,我们必须 再次更新缓冲区中项目的优先级。 这需要我们跟踪指数。

这极大地阻碍了缓冲区的可重用性:如果要编写 一个训练脚本,其中 prioritized 缓冲区和 regular 缓冲区都可以是 created,她必须添加大量的控制流来确保 在适当的位置调用适当的方法,如果和 仅当使用优先缓冲区时。

让我们看看如何使用 . 我们看到 returns 数据 使用它们的相对存储索引进行增强。我们没有提到的一个功能 是这个类还保证了优先级 信号会自动解析为优先采样器(如果存在),则 外延。TensorDict

这些功能的组合以多种方式简化了操作: - 当扩展缓冲区时,优先级信号会自动为

如果存在,则解析,并且将准确分配优先级;

  • 索引将存储在采样的 tensordict 中,因此很容易 在损失计算后更新 Priority。

  • 在计算损失时,优先信号将记录在 tensordict 传递给 loss 模块,从而可以更新 毫不费力的重量:

    >>> data = replay_buffer.sample()
    >>> loss_val = loss_module(data)
    >>> replay_buffer.update_tensordict_priority(data)
    

下面的代码说明了这些概念。我们使用 优先采样器,并在构造函数中指示其中 应该获取 priority 信号:

rb = TensorDictReplayBuffer(
    storage=ListStorage(size),
    sampler=PrioritizedSampler(size, alpha=0.8, beta=1.1),
    priority_key="td_error",
    batch_size=1024,
)

我们选择一个与存储索引成正比的优先级信号:

data["td_error"] = torch.arange(data.numel())

rb.extend(data)

sample = rb.sample()

较高的指数应该更频繁地出现:

from matplotlib import pyplot as plt

plt.hist(sample["index"].numpy())
RB 教程
(array([132.,  55., 122.,  55., 113.,  66., 124.,  66., 142., 149.]), array([ 0. ,  1.5,  3. ,  4.5,  6. ,  7.5,  9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)

处理完示例后,我们使用 方法。 为了展示它是如何工作的,让我们恢复 采样项目:torchrl.data.TensorDictReplayBuffer.update_tensordict_priority()

sample = rb.sample()
sample["td_error"] = data.numel() - sample["index"]
rb.update_tensordict_priority(sample)

现在,较高的索引出现的频率应该较低:

sample = rb.sample()
from matplotlib import pyplot as plt

plt.hist(sample["index"].numpy())
RB 教程
(array([200., 100., 160.,  82., 167.,  70., 103.,  41.,  71.,  30.]), array([ 0. ,  1.5,  3. ,  4.5,  6. ,  7.5,  9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)

使用转换

存储在重放缓冲区中的数据可能尚未准备好呈现给 loss 模块。 在某些情况下,收集器生成的数据可能太重而无法承受 按原样保存。这方面的示例包括将图像从 转换为 浮点张量,或者在使用 决策转换器。uint8

只需将 适当的转换。 以下是一些示例:

保存 Raw 图像

uint8-typed 张量的内存消耗比 我们通常提供给模型的浮点张量。因此, 保存 Raw 图像可能很有用。 以下脚本显示了如何构建一个仅返回 原始图像,但使用转换后的图像进行推理,以及这些图像如何 转换可以在 Replay Buffer 中回收:

from torchrl.collectors import SyncDataCollector
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
    Compose,
    GrayScale,
    Resize,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.envs.utils import RandomPolicy

env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True),
    Compose(
        ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
        Resize(in_keys=["pixels_trsf"], w=64, h=64),
        GrayScale(in_keys=["pixels_trsf"]),
    ),
)

让我们看一下推出:

print(env.rollout(3))
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([3]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([3]),
    device=None,
    is_shared=False)

我们刚刚创建了一个生成像素的环境。这些图像 进行处理以馈送到策略。 我们希望存储原始图像,而不是它们的转换。 为此,我们将向收集器附加一个转换以选择键 我们希望看到出现:

from torchrl.envs.transforms import ExcludeTransform

collector = SyncDataCollector(
    env,
    RandomPolicy(env.action_spec),
    frames_per_batch=10,
    total_frames=1000,
    postproc=ExcludeTransform("pixels_trsf", ("next", "pixels_trsf"), "collector"),
)

让我们看一下一批数据,并控制键是否已被丢弃:"pixels_trsf"

for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)

我们创建一个具有与环境相同转换的重放缓冲区。 然而,有一个细节需要解决:转换 在没有环境的情况下使用时,不会注意到数据结构。 将转换附加到环境时,嵌套 tensordict 中的数据首先被转换,然后在 转出执行。在处理静态数据时,情况并非如此。 尽管如此,我们的数据带有一个嵌套的 “next” tensordict,它将是 如果我们没有明确指示它处理 它。我们手动将这些键添加到转换中:"next"

t = Compose(
    ToTensorImage(
        in_keys=["pixels", ("next", "pixels")],
        out_keys=["pixels_trsf", ("next", "pixels_trsf")],
    ),
    Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
    GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(1000), transform=t, batch_size=16)
rb.extend(data)
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

我们可以检查方法是否看到转换后的图像重新出现:sample

print(rb.sample())
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([16, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        index: Tensor(shape=torch.Size([16]), device=cpu, dtype=torch.int64, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([16]),
            device=cpu,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([16]),
    device=cpu,
    is_shared=False)

更复杂的示例:使用 CatFrames

转换展开观察结果 随着时间的推移,创建过去事件的 n-back 记忆,使模型 将过去的事件考虑在内(对于 POMDP 或 循环策略,例如 Decision Transformers)。将这些串联存储 帧可能会消耗大量内存。它也可以是 当 n-back 窗口需要不同时(通常更长)时出现问题 在训练和推理期间。我们通过在两个阶段中分别执行转换来解决此问题。CatFrames

from torchrl.envs import CatFrames, UnsqueezeTransform

我们为返回基于像素的环境创建一个标准的转换列表 观察:

env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True),
    Compose(
        ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
        Resize(in_keys=["pixels_trsf"], w=64, h=64),
        GrayScale(in_keys=["pixels_trsf"]),
        UnsqueezeTransform(-4, in_keys=["pixels_trsf"]),
        CatFrames(dim=-4, N=4, in_keys=["pixels_trsf"]),
    ),
)
collector = SyncDataCollector(
    env,
    RandomPolicy(env.action_spec),
    frames_per_batch=10,
    total_frames=1000,
)
for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)

缓冲区转换看起来与环境转换非常相似,但使用 像以前一样额外的键:("next", ...)

t = Compose(
    ToTensorImage(
        in_keys=["pixels", ("next", "pixels")],
        out_keys=["pixels_trsf", ("next", "pixels_trsf")],
    ),
    Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
    GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
    UnsqueezeTransform(-4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
    CatFrames(dim=-4, N=4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(size), transform=t, batch_size=16)
data_exclude = data.exclude("pixels_trsf", ("next", "pixels_trsf"))
rb.add(data_exclude)
0

让我们从缓冲区中采样一批。变换后的形状 像素键的长度应为 4,沿第 4 个维度从 结束:

s = rb.sample(1)  # the buffer has only one element
print(s)
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1, 10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([1, 10]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        index: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([1, 10]),
            device=cpu,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([1, 10]),
    device=cpu,
    is_shared=False)

经过一些处理(不包括未使用的 key 等),我们看到 线上线下生成的数据匹配!

assert (data.exclude("collector") == s.squeeze(0).exclude("index", "collector")).all()

存储轨迹

在许多情况下,最好从缓冲区访问 trajectes 而不是 比简单的过渡。TorchRL 提供了多种实现此目的的方法。

目前首选的方法是沿第一个 维度,并使用 a 来 对这些批次的数据进行采样。此类只需要几个信息 关于您的数据结构来完成它的工作(并不是说到目前为止它只是 compatible with tensordict-structured data):切片的数量或其 length 和一些关于 episodes 中(例如,回想一下,使用 DataCollector 时,轨迹 ID 存储在 中)。在这个简单的示例中,我们构造一个数据 具有 4 个连续的短轨迹,并从中采样 4 个切片,每个切片 长度 2(因为批量大小为 8,8 个项目 // 4 个切片 = 2 个时间步)。 我们也标记了步骤。SliceSampler("collector", "traj_ids")

from torchrl.data import SliceSampler

rb = TensorDictReplayBuffer(
    storage=LazyMemmapStorage(size),
    sampler=SliceSampler(traj_key="episode", num_slices=4),
    batch_size=8,
)
episode = torch.zeros(10, dtype=torch.int)
episode[:3] = 1
episode[3:5] = 2
episode[5:7] = 3
episode[7:] = 4
steps = torch.cat([torch.arange(3), torch.arange(2), torch.arange(2), torch.arange(3)])
data = TensorDict(
    {
        "episode": episode,
        "obs": torch.randn((3, 4, 5)).expand(10, 3, 4, 5),
        "act": torch.randn((20,)).expand(10, 20),
        "other": torch.randn((20, 50)).expand(10, 20, 50),
        "steps": steps,
    },
    [10],
)
rb.extend(data)
sample = rb.sample()
print("episode are grouped", sample["episode"])
print("steps are successive", sample["steps"])
episode are grouped tensor([2, 2, 1, 1, 2, 2, 2, 2], dtype=torch.int32)
steps are successive tensor([0, 1, 1, 2, 0, 1, 0, 1])

结论

我们已经看到了如何在 TorchRL 中使用重放缓冲区,从最简单的 用于需要转换或存储数据的更高级的 特别是以特定的方式。 您现在应该能够:

  • 创建 Replay Buffer,自定义其存储、采样器和变换;

  • 为您的问题选择最佳存储类型(基于列表、内存或磁盘);

  • 最大限度地减少缓冲区的内存占用。

后续步骤

  • 查看数据 API 参考,了解 TorchRL 中的离线数据集。 它们基于我们的 Replay Buffer API;

  • 检查其他采样器,例如 、 和 或其他写入器 如。SamplerWithoutReplacementPrioritizedSliceSamplerSliceSamplerWithoutReplacementTensorDictMaxValueWriter

  • 文档中查看如何对 ReplayBuffers 进行检查点操作。

脚本总运行时间:(2 分 53.110 秒)

估计内存使用量:522 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源