注意
转到末尾下载完整的示例代码。
使用 Replay Buffers¶
作者: Vincent Moens
重放缓冲区是任何 RL 或控制算法的核心部分。 监督学习方法通常以训练循环为特征 其中,数据从静态数据集中随机提取并连续馈送 添加到模型和损失函数中。 在 RL 中,情况通常略有不同:数据是使用 模型,然后临时存储在动态结构中(体验 replay buffer),它用作 loss 模块的 dataset。
与往常一样,使用缓冲区的上下文极大地限制了 它是如何构建的:有些人可能希望在其他人想要的时候存储轨迹 来存储单个过渡。特定的采样策略可能更可取 在上下文中:某些项可以比其他项具有更高的优先级,或者可以 重要的是有或没有更换的样品。 计算因素也可能起作用,例如缓冲区的大小 这可能会超过可用的 RAM 存储空间。
由于这些原因,TorchRL 的重放缓冲区是完全可组合的:尽管 它们带有 “附带电池”,只需最少的工作量即可构建, 它们还支持许多自定义,例如存储类型、 采样策略或数据转换。
在本教程中,您将学习:
如何构建 Replay Buffer (RB) 并将其与 任何数据类型;
如何自定义缓冲区的存储;
如何将 RB 与 TensorDict 一起使用;
如何从重放缓冲区采样或迭代, 以及如何定义采样策略;
如何使用优先重放缓冲区;
如何转换传入和传出的数据 缓冲区;
如何在缓冲区中存储轨迹。
基础知识:构建原版重放缓冲区¶
TorchRL 的重放缓冲区旨在优先考虑模块化、 可组合性、效率和简单性。例如,创建一个基本的 Replay Buffer 是一个简单的过程,如下所示 例:
import tempfile
from torchrl.data import ReplayBuffer
buffer = ReplayBuffer()
默认情况下,此重放缓冲区的大小为 1000。让我们检查一下
通过使用该方法填充我们的缓冲区:
print("length before adding elements:", len(buffer))
buffer.extend(range(2000))
print("length after adding elements:", len(buffer))
length before adding elements: 0
length after adding elements: 1000
我们使用了
旨在一次添加多个项目。如果传递的对象
to 具有多个维度,则其第一个维度为
被认为是要在缓冲区中拆分为单独元素的那个。
extend
这实质上意味着,当添加多维张量或 tensordicts 添加到缓冲区中,缓冲区将只查看第一维 计算它保存在内存中的元素时。 如果对象传递给它 not iterable,则会引发异常。
自定义存储¶
我们看到缓冲区的上限已限制为前 1000 个元素,我们 传递给它。 要更改大小,我们需要自定义我们的存储。
TorchRL 提出了三种类型的存储:
stores 元素独立位于 列表。它支持任何数据类型,但这种灵活性是有代价的 效率;
ListStorage
存储张量数据 结构。 它与 (或 ) 自然合作 对象。存储在每个张量的基础上是连续的,这意味着 采样会比使用列表时更有效,但是 隐式限制是传递给它的任何数据都必须具有相同的 基本属性(如 shape 和 dtype)作为第一批数据,其中 用于实例化缓冲区。 传递不符合此要求的数据将引发 exception 或导致某些未定义的行为。
LazyTensorStorage
TensorDict
tensorclass
它的工作原理是因为它是 lazy (即 it 期望第一批数据被实例化),并且它需要 data ,则每个存储的批次的 shape 和 dtype 都匹配。是什么造就了 存储的独特之处在于它指向磁盘文件(或使用文件系统 存储),这意味着它可以支持非常大的数据集,同时仍然 以连续方式访问数据。
LazyMemmapStorage
LazyTensorStorage
让我们看看如何使用这些存储中的每一个:
from torchrl.data import LazyMemmapStorage, LazyTensorStorage, ListStorage
# We define the maximum size of the buffer
size = 100
具有 list storage buffer 的缓冲区可以存储任何类型的数据(但我们必须
更改 the ,因为默认值需要数字数据):collate_fn
buffer_list = ReplayBuffer(storage=ListStorage(size), collate_fn=lambda x: x)
buffer_list.extend(["a", 0, "b"])
print(buffer_list.sample(3))
['a', 'a', 0]
因为它是假设量最低的那个,所以它是 TorchRL 中的默认存储。ListStorage
A 可以连续存储数据。
在处理复杂但
中等大小的不变数据结构:LazyTensorStorage
buffer_lazytensor = ReplayBuffer(storage=LazyTensorStorage(size))
让我们创建一批大小为 ''torch.Size([3])' 具有 2 个张量 存储在其中:
import torch
from tensordict import TensorDict
data = TensorDict(
{
"a": torch.arange(12).view(3, 4),
("b", "c"): torch.arange(15).view(3, 5),
},
batch_size=[3],
)
print(data)
TensorDict(
fields={
a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
b: TensorDict(
fields={
c: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
对 will 的第一次调用
实例化 Storage。数据的第一个维度被解绑到
单独的数据点:
buffer_lazytensor.extend(data)
print(f"The buffer has {len(buffer_lazytensor)} elements")
The buffer has 3 elements
让我们从缓冲区中采样,并打印数据:
sample = buffer_lazytensor.sample(5)
print("samples", sample["a"], sample["b", "c"])
samples tensor([[0, 1, 2, 3],
[0, 1, 2, 3],
[4, 5, 6, 7],
[0, 1, 2, 3],
[0, 1, 2, 3]]) tensor([[0, 1, 2, 3, 4],
[0, 1, 2, 3, 4],
[5, 6, 7, 8, 9],
[0, 1, 2, 3, 4],
[0, 1, 2, 3, 4]])
A 的创建方式相同:LazyMemmapStorage
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazytensor.sample(5)
print("samples: a=", sample["a"], "\n('b', 'c'):", sample["b", "c"])
The buffer has 3 elements
samples: a= tensor([[ 0, 1, 2, 3],
[ 0, 1, 2, 3],
[ 0, 1, 2, 3],
[ 8, 9, 10, 11],
[ 8, 9, 10, 11]])
('b', 'c'): tensor([[ 0, 1, 2, 3, 4],
[ 0, 1, 2, 3, 4],
[ 0, 1, 2, 3, 4],
[10, 11, 12, 13, 14],
[10, 11, 12, 13, 14]])
我们还可以自定义磁盘上的存储位置:
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size, scratch_dir=tempdir))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
print("the 'a' tensor is stored in", buffer_lazymemmap._storage._storage["a"].filename)
print(
"the ('b', 'c') tensor is stored in",
buffer_lazymemmap._storage._storage["b", "c"].filename,
)
The buffer has 3 elements
the 'a' tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpzxmvjdim'>/a.memmap
the ('b', 'c') tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpzxmvjdim'>/b/c.memmap
与 TensorDict 集成¶
张量位置遵循与 TensorDict 相同的结构,其中 包含它们:这使得在训练期间保存和加载缓冲区变得容易。
充分利用数据载体
potential,则类可以
被使用。
它的主要优势之一是它能够处理抽样的组织
数据,以及可能需要的任何其他信息
(例如样本索引)。
TensorDict
它可以按照与标准相同的方式构建,并且可以
通常可以互换使用。
from torchrl.data import TensorDictReplayBuffer
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = TensorDictReplayBuffer(
storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 3 elements
sample: TensorDict(
fields={
a: Tensor(shape=torch.Size([12, 4]), device=cpu, dtype=torch.int64, is_shared=False),
b: TensorDict(
fields={
c: Tensor(shape=torch.Size([12, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([12]),
device=cpu,
is_shared=False),
index: Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([12]),
device=cpu,
is_shared=False)
我们的示例现在有一个额外的键,用于指示哪些索引
进行了抽样。
让我们来看看这些指数:"index"
print(sample["index"])
tensor([2, 1, 2, 1, 0, 0, 1, 2, 0, 0, 0, 2])
与 tensorclass 集成¶
ReplayBuffer 类和关联的子类也可以与类本机一起使用,这些类可以方便地用于
以更明确的方式对数据集进行编码:tensorclass
from tensordict import tensorclass
@tensorclass
class MyData:
images: torch.Tensor
labels: torch.Tensor
data = MyData(
images=torch.randint(
255,
(10, 64, 64, 3),
),
labels=torch.randint(100, (10,)),
batch_size=[10],
)
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(
storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 10 elements
sample: MyData(
images=Tensor(shape=torch.Size([12, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
labels=Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False),
batch_size=torch.Size([12]),
device=cpu,
is_shared=False)
不出所料。数据具有适当的类和形状!
与其他张量结构 (PyTrees) 集成¶
TorchRL 的重放缓冲区也适用于任何 pytree 数据结构。
PyTree 是由字典、列表和/或
元组,其中叶子是张量。
这意味着可以在连续内存中存储任何这样的树结构!
可以使用各种存储:,
或者
全部接受 this
类型的数据。
以下是此功能的简要演示:
from torch.utils._pytree import tree_map
让我们在磁盘上构建我们的重放缓冲区:
rb = ReplayBuffer(storage=LazyMemmapStorage(size))
data = {
"a": torch.randn(3),
"b": {"c": (torch.zeros(2), [torch.ones(1)])},
30: -torch.ones(()), # non-string keys also work
}
rb.add(data)
# The sample has a similar structure to the data (with a leading dimension of 10 for each tensor)
sample = rb.sample(10)
使用 pytrees ,任何可调用对象都可以用作转换:
def transform(x):
# Zeros all the data in the pytree
return tree_map(lambda y: y * 0, x)
rb.append_transform(transform)
sample = rb.sample(batch_size=12)
让我们检查一下我们的 transform 是否完成了它的工作:
def assert0(x):
assert (x == 0).all()
tree_map(assert0, sample)
{'a': None, 'b': {'c': (None, [None])}, 30: None}
对缓冲区进行采样和迭代¶
Replay Buffers 支持多种采样策略:
如果 batch-size 是固定的并且可以在构造时定义,则可以 作为 keyword 参数传递给缓冲区;
使用固定的 batch-size,可以迭代重放缓冲区以收集 样品;
可以使用多线程进行采样,但这与 last 选项(它要求缓冲区提前知道 next batch) 的
让我们看几个例子:
固定批量大小¶
如果在构造期间传递了 batch-size,则应在 采样:
data = MyData(
images=torch.randint(
255,
(200, 64, 64, 3),
),
labels=torch.randint(100, (200,)),
batch_size=[200],
)
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size), batch_size=128)
buffer_lazymemmap.extend(data)
buffer_lazymemmap.sample()
MyData(
images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
batch_size=torch.Size([128]),
device=cpu,
is_shared=False)
这批数据的大小符合我们的预期 (128)。
要启用多线程采样,只需在构造期间将正整数传递给 keyword 参数即可。这应该会加快速度
采样相当多(例如,当
使用优先采样器):prefetch
buffer_lazymemmap = ReplayBuffer(
storage=LazyMemmapStorage(size), batch_size=128, prefetch=10
) # creates a queue of 10 elements to be prefetched in the background
buffer_lazymemmap.extend(data)
print(buffer_lazymemmap.sample())
MyData(
images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
batch_size=torch.Size([128]),
device=cpu,
is_shared=False)
迭代具有固定 batch-size 的缓冲区¶
我们也可以像使用常规 dataloader 中,只要 batch-size 是预定义的:
for i, data in enumerate(buffer_lazymemmap):
if i == 3:
print(data)
break
MyData(
images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
batch_size=torch.Size([128]),
device=cpu,
is_shared=False)
由于我们的抽样技术是完全随机的,并且不会
防止替换,则有问题的迭代器是无限的。但是,我们可以
使用 the instead ,它会将我们的缓冲区转换为有限迭代器:
from torchrl.data.replay_buffers.samplers import SamplerWithoutReplacement
buffer_lazymemmap = ReplayBuffer(
storage=LazyMemmapStorage(size), batch_size=32, sampler=SamplerWithoutReplacement()
)
我们创建一个足够大的数据来获取几个样本
data = TensorDict(
{
"a": torch.arange(64).view(16, 4),
("b", "c"): torch.arange(128).view(16, 8),
},
batch_size=[16],
)
buffer_lazymemmap.extend(data)
for _i, _ in enumerate(buffer_lazymemmap):
continue
print(f"A total of {_i+1} batches have been collected")
A total of 1 batches have been collected
动态批量大小¶
与我们之前看到的相反,关键字
参数可以省略并直接传递给方法:batch_size
sample
buffer_lazymemmap = ReplayBuffer(
storage=LazyMemmapStorage(size), sampler=SamplerWithoutReplacement()
)
buffer_lazymemmap.extend(data)
print("sampling 3 elements:", buffer_lazymemmap.sample(3))
print("sampling 5 elements:", buffer_lazymemmap.sample(5))
sampling 3 elements: TensorDict(
fields={
a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
b: TensorDict(
fields={
c: Tensor(shape=torch.Size([3, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([3]),
device=cpu,
is_shared=False)},
batch_size=torch.Size([3]),
device=cpu,
is_shared=False)
sampling 5 elements: TensorDict(
fields={
a: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.int64, is_shared=False),
b: TensorDict(
fields={
c: Tensor(shape=torch.Size([5, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([5]),
device=cpu,
is_shared=False)},
batch_size=torch.Size([5]),
device=cpu,
is_shared=False)
优先重放缓冲区¶
TorchRL 还为优先重放缓冲区提供了一个接口。 此缓冲区类根据传递的优先级信号对数据进行采样 通过数据。
尽管此工具与非 tensordict 数据兼容,但我们鼓励 改用 TensorDict,因为它可以将元数据导入 毫不费力地从缓冲区中出来。
让我们首先看看如何在泛型 箱。和 超参数 必须手动设置:
from torchrl.data.replay_buffers.samplers import PrioritizedSampler
size = 100
rb = ReplayBuffer(
storage=ListStorage(size),
sampler=PrioritizedSampler(max_capacity=size, alpha=0.8, beta=1.1),
collate_fn=lambda x: x,
)
扩展重放缓冲区将返回 items 索引,我们将需要 稍后更新优先级:
indices = rb.extend([1, "foo", None])
采样器期望每个元素都有一个优先级。添加到 buffer,则 priority 设置为默认值 1。一旦优先级具有 被计算(通常通过 loss),则必须在缓冲区中更新。
这是通过 method 完成的,该方法需要 indices 和 priority。
我们人为地为数据集中的第二个样本分配了高优先级
要观察它对采样的影响,请执行以下操作:update_priority()
rb.update_priority(index=indices, priority=torch.tensor([0, 1_000, 0.1]))
我们观察到,从缓冲区采样主要返回第二个样本
():"foo"
sample, info = rb.sample(10, return_info=True)
print(sample)
['foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo']
该信息包含项目的相对权重以及索引。
print(info)
{'_weight': tensor([2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10,
2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10]), 'index': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])}
我们看到,使用优先重放缓冲区需要一系列额外的 训练循环中的步骤与常规缓冲区的比较:
在收集数据并扩展缓冲区后, 项目必须更新;
在计算损失并从中获得“优先信号”之后,我们必须 再次更新缓冲区中项目的优先级。 这需要我们跟踪指数。
这极大地阻碍了缓冲区的可重用性:如果要编写 一个训练脚本,其中 prioritized 缓冲区和 regular 缓冲区都可以是 created,她必须添加大量的控制流来确保 在适当的位置调用适当的方法,如果和 仅当使用优先缓冲区时。
让我们看看如何使用 .
我们看到 returns 数据
使用它们的相对存储索引进行增强。我们没有提到的一个功能
是这个类还保证了优先级
信号会自动解析为优先采样器(如果存在),则
外延。
TensorDict
这些功能的组合以多种方式简化了操作: - 当扩展缓冲区时,优先级信号会自动为
如果存在,则解析,并且将准确分配优先级;
索引将存储在采样的 tensordict 中,因此很容易 在损失计算后更新 Priority。
在计算损失时,优先信号将记录在 tensordict 传递给 loss 模块,从而可以更新 毫不费力的重量:
>>> data = replay_buffer.sample() >>> loss_val = loss_module(data) >>> replay_buffer.update_tensordict_priority(data)
下面的代码说明了这些概念。我们使用 优先采样器,并在构造函数中指示其中 应该获取 priority 信号:
rb = TensorDictReplayBuffer(
storage=ListStorage(size),
sampler=PrioritizedSampler(size, alpha=0.8, beta=1.1),
priority_key="td_error",
batch_size=1024,
)
我们选择一个与存储索引成正比的优先级信号:
data["td_error"] = torch.arange(data.numel())
rb.extend(data)
sample = rb.sample()
较高的指数应该更频繁地出现:
from matplotlib import pyplot as plt
plt.hist(sample["index"].numpy())
(array([160., 61., 118., 64., 129., 62., 111., 61., 139., 119.]), array([ 0. , 1.5, 3. , 4.5, 6. , 7.5, 9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)
处理完示例后,我们使用
方法。
为了展示它是如何工作的,让我们恢复
采样项目:torchrl.data.TensorDictReplayBuffer.update_tensordict_priority()
sample = rb.sample()
sample["td_error"] = data.numel() - sample["index"]
rb.update_tensordict_priority(sample)
现在,较高的索引出现的频率应该较低:
sample = rb.sample()
from matplotlib import pyplot as plt
plt.hist(sample["index"].numpy())
(array([211., 103., 178., 66., 148., 62., 119., 40., 63., 34.]), array([ 0. , 1.5, 3. , 4.5, 6. , 7.5, 9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)
使用转换¶
存储在重放缓冲区中的数据可能尚未准备好呈现给
loss 模块。
在某些情况下,收集器生成的数据可能太重而无法承受
按原样保存。这方面的示例包括将图像从 转换为
浮点张量,或者在使用
决策转换器。uint8
只需将 适当的转换。 以下是一些示例:
保存 Raw 图像¶
uint8
-typed 张量的内存消耗比
我们通常提供给模型的浮点张量。因此,
保存 Raw 图像可能很有用。
以下脚本显示了如何构建一个仅返回
原始图像,但使用转换后的图像进行推理,以及这些图像如何
转换可以在 Replay Buffer 中回收:
from torchrl.collectors import SyncDataCollector
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
Compose,
GrayScale,
Resize,
ToTensorImage,
TransformedEnv,
)
from torchrl.envs.utils import RandomPolicy
env = TransformedEnv(
GymEnv("CartPole-v1", from_pixels=True),
Compose(
ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
Resize(in_keys=["pixels_trsf"], w=64, h=64),
GrayScale(in_keys=["pixels_trsf"]),
),
)
让我们看一下推出:
print(env.rollout(3))
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False),
done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False),
pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
我们刚刚创建了一个生成像素的环境。这些图像 进行处理以馈送到策略。 我们希望存储原始图像,而不是它们的转换。 为此,我们将向收集器附加一个转换以选择键 我们希望看到出现:
from torchrl.envs.transforms import ExcludeTransform
collector = SyncDataCollector(
env,
RandomPolicy(env.action_spec),
frames_per_batch=10,
total_frames=1000,
postproc=ExcludeTransform("pixels_trsf", ("next", "pixels_trsf"), "collector"),
)
让我们看一下一批数据,并控制键是否已被丢弃:"pixels_trsf"
for data in collector:
print(data)
break
TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False),
pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
我们创建一个具有与环境相同转换的重放缓冲区。
然而,有一个细节需要解决:转换
在没有环境的情况下使用时,不会注意到数据结构。
将转换附加到环境时,嵌套 tensordict 中的数据首先被转换,然后在
转出执行。在处理静态数据时,情况并非如此。
尽管如此,我们的数据带有一个嵌套的 “next” tensordict,它将是
如果我们没有明确指示它处理
它。我们手动将这些键添加到转换中:"next"
t = Compose(
ToTensorImage(
in_keys=["pixels", ("next", "pixels")],
out_keys=["pixels_trsf", ("next", "pixels_trsf")],
),
Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(1000), transform=t, batch_size=16)
rb.extend(data)
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
我们可以检查方法是否看到转换后的图像重新出现:sample
print(rb.sample())
TensorDict(
fields={
action: Tensor(shape=torch.Size([16, 2]), device=cpu, dtype=torch.int64, is_shared=False),
done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
index: Tensor(shape=torch.Size([16]), device=cpu, dtype=torch.int64, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([16]),
device=cpu,
is_shared=False),
pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([16]),
device=cpu,
is_shared=False)
更复杂的示例:使用 CatFrames¶
转换展开观察结果
随着时间的推移,创建过去事件的 n-back 记忆,使模型
将过去的事件考虑在内(对于 POMDP 或
循环策略,例如 Decision Transformers)。将这些串联存储
帧可能会消耗大量内存。它也可以是
当 n-back 窗口需要不同时(通常更长)时出现问题
在训练和推理期间。我们通过在两个阶段中分别执行转换来解决此问题。
CatFrames
from torchrl.envs import CatFrames, UnsqueezeTransform
我们为返回基于像素的环境创建一个标准的转换列表 观察:
env = TransformedEnv(
GymEnv("CartPole-v1", from_pixels=True),
Compose(
ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
Resize(in_keys=["pixels_trsf"], w=64, h=64),
GrayScale(in_keys=["pixels_trsf"]),
UnsqueezeTransform(-4, in_keys=["pixels_trsf"]),
CatFrames(dim=-4, N=4, in_keys=["pixels_trsf"]),
),
)
collector = SyncDataCollector(
env,
RandomPolicy(env.action_spec),
frames_per_batch=10,
total_frames=1000,
)
for data in collector:
print(data)
break
TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False),
pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
缓冲区转换看起来与环境转换非常相似,但使用
像以前一样额外的键:("next", ...)
t = Compose(
ToTensorImage(
in_keys=["pixels", ("next", "pixels")],
out_keys=["pixels_trsf", ("next", "pixels_trsf")],
),
Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
UnsqueezeTransform(-4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
CatFrames(dim=-4, N=4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(size), transform=t, batch_size=16)
data_exclude = data.exclude("pixels_trsf", ("next", "pixels_trsf"))
rb.add(data_exclude)
0
让我们从缓冲区中采样一批。变换后的形状 像素键的长度应为 4,沿第 4 个维度从 结束:
s = rb.sample(1) # the buffer has only one element
print(s)
TensorDict(
fields={
action: Tensor(shape=torch.Size([1, 10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([1, 10]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
index: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([1, 10]),
device=cpu,
is_shared=False),
pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([1, 10]),
device=cpu,
is_shared=False)
经过一些处理(不包括未使用的 key 等),我们看到 线上线下生成的数据匹配!
assert (data.exclude("collector") == s.squeeze(0).exclude("index", "collector")).all()
存储轨迹¶
在许多情况下,最好从缓冲区访问 trajectes 而不是 比简单的过渡。TorchRL 提供了多种实现此目的的方法。
目前首选的方法是沿第一个
维度,并使用 a 来
对这些批次的数据进行采样。此类只需要几个信息
关于您的数据结构来完成它的工作(并不是说到目前为止它只是
compatible with tensordict-structured data):切片的数量或其
length 和一些关于
episodes 中(例如,回想一下,使用 DataCollector 时,轨迹 ID 存储在 中)。在这个简单的示例中,我们构造一个数据
具有 4 个连续的短轨迹,并从中采样 4 个切片,每个切片
长度 2(因为批量大小为 8,8 个项目 // 4 个切片 = 2 个时间步)。
我们也标记了步骤。SliceSampler
("collector", "traj_ids")
from torchrl.data import SliceSampler
rb = TensorDictReplayBuffer(
storage=LazyMemmapStorage(size),
sampler=SliceSampler(traj_key="episode", num_slices=4),
batch_size=8,
)
episode = torch.zeros(10, dtype=torch.int)
episode[:3] = 1
episode[3:5] = 2
episode[5:7] = 3
episode[7:] = 4
steps = torch.cat([torch.arange(3), torch.arange(2), torch.arange(2), torch.arange(3)])
data = TensorDict(
{
"episode": episode,
"obs": torch.randn((3, 4, 5)).expand(10, 3, 4, 5),
"act": torch.randn((20,)).expand(10, 20),
"other": torch.randn((20, 50)).expand(10, 20, 50),
"steps": steps,
},
[10],
)
rb.extend(data)
sample = rb.sample()
print("episode are grouped", sample["episode"])
print("steps are successive", sample["steps"])
episode are grouped tensor([2, 2, 3, 3, 1, 1, 3, 3], dtype=torch.int32)
steps are successive tensor([0, 1, 0, 1, 1, 2, 0, 1])
结论¶
我们已经看到了如何在 TorchRL 中使用重放缓冲区,从最简单的 用于需要转换或存储数据的更高级的 特别是以特定的方式。 您现在应该能够:
创建 Replay Buffer,自定义其存储、采样器和变换;
为您的问题选择最佳存储类型(基于列表、内存或磁盘);
最大限度地减少缓冲区的内存占用。
后续步骤¶
查看数据 API 参考,了解 TorchRL 中的离线数据集。 它们基于我们的 Replay Buffer API;
检查其他采样器,例如 、 和 或其他写入器 如。
SamplerWithoutReplacement
PrioritizedSliceSampler
SliceSamplerWithoutReplacement
TensorDictMaxValueWriter
在文档中查看如何对 ReplayBuffers 进行检查点操作。
脚本总运行时间:(2 分 55.642 秒)
估计内存使用量:491 MB