目录

开始使用数据收集和存储

作者Vincent Moens

注意

要在笔记本中运行本教程,请添加安装单元 开头包含:

!pip install tensordict
!pip install torchrl

没有数据就没有学习。在监督式学习中,用户是 习惯使用等 将数据集成到他们的训练循环中。 Dataloader 是可迭代对象,可为您提供所需的数据 用于训练您的模型。

TorchRL 以类似的方式处理数据加载问题,尽管 它在 RL 库的生态系统中出奇地独特。TorchRL 的 Dataloader 称为 。大多数时候, 数据收集并不止于原始数据的收集, 因为数据需要临时存储在缓冲区中 (或策略算法的等效结构)之前 由 loss 模块。本教程将探讨 这两个类。DataCollectors

数据收集器

这里讨论的主要数据收集器是 ,这是本文的重点 文档。从根本上讲,收集器是一个简单的 类负责在环境中执行策略, 必要时重置环境,并提供批量的 predefined size 的 Predefined size。与方法 在 env 教程中演示,收集器不会 在连续的数据批次之间重置。因此,两个连续的 批量数据可能包含来自同一轨迹的元素。

您需要传递给收集器的基本参数是 batchs()、长度(可能 infinite) 的 URLERATOR、策略和环境。为简单起见, 在此示例中,我们将使用虚拟的 random 策略。frames_per_batch

import torch

torch.manual_seed(0)

from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy

env = GymEnv("CartPole-v1")
env.set_seed(0)

policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1)

我们现在预计我们的收集器将交付大小为 no 的批次 重要的是收集过程中发生的情况。换句话说,我们可能有多个 这批的轨迹!表示 collector 应该是。值 将产生一个 never end collector 的 Collector 中。200total_frames-1

让我们遍历收集器以获得一种感觉 这些数据是什么样的:

for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([200]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([200]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([200]),
    device=None,
    is_shared=False)

如您所见,我们的数据通过一些特定于收集器的元数据进行了扩充 分组到我们在环境推出期间没有看到的 sub-tensordict 中。这对于跟踪 轨迹 ID。在下面的列表中,每个项目都标记了轨迹 number 对应的 transition 属于:"collector"

print(data["collector", "traj_ids"])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
        5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
        6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
        7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
        9, 9, 9, 9, 9, 9, 9, 9])

数据收集器在编码最先进的技术时非常有用 算法,因为性能通常由 在给定数量的交互中解决问题的特定技术 环境 (收集器中的参数)。 因此,我们示例中的大多数训练循环如下所示:total_frames

>>> for data in collector:
...     # your algorithm here

重放缓冲区

现在我们已经探索了如何收集数据,我们想知道如何 储存它。在 RL 中,典型的设置是收集、存储数据 暂时清除,过了一会儿给出了一些启发式方法: 先进先出或其他。典型的伪代码如下所示:

>>> for data in collector:
...     storage.store(data)
...     for i in range(n_optim):
...         sample = storage.sample()
...         loss_val = loss_fn(sample)
...         loss_val.backward()
...         optim.step() # etc

在 TorchRL 中存储数据的父类 称为 。TorchRL 的重播 缓冲区是可组合的:您可以编辑存储类型及其采样 技术、写作启发式或应用于它们的转换。我们会的 将花哨的东西留给专门的深入教程。通用重播 buffer 只需要知道它必须使用什么存储空间。一般来说,我们 推荐一个子类,它将起作用 在大多数情况下很好。我们将在本教程中使用它,它有两个不错的属性:首先,“懒惰”, 您无需提前明确告诉它您的数据是什么样子的。 其次,它用作后端来保存 您的数据以有效的方式存储在磁盘上。您唯一需要知道的是 您希望缓冲区有多大。TensorStorageMemoryMappedTensor

from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer

buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))

可以通过 (single element) 或 (multiple elements) 方法填充缓冲区。用 我们刚刚收集的数据,我们一次性初始化并填充缓冲区:

indices = buffer.extend(data)

我们可以检查缓冲区现在的元素数量是否与缓冲区的元素数量相同 我们从收藏家那里得到:

assert len(buffer) == collector.frames_per_batch

唯一需要了解的是如何从缓冲区收集数据。 当然,这取决于方法。因为我们没有指定必须在没有 重复,则不能保证从我们的缓冲区收集的样本 将是唯一的:

sample = buffer.sample(batch_size=30)
print(sample)
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([30]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([30]),
            device=cpu,
            is_shared=False),
        observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([30]),
    device=cpu,
    is_shared=False)

同样,我们的样本看起来与我们从 收藏家!

后续步骤

  • 你可以看看其他的 multirpocessed 收集器(如 .

  • 如果您有多个节点,TorchRL 还提供分布式收集器 用于推理。在 API 参考中查看它们。

  • 查看专用的 Replay Buffer 教程以了解 有关构建缓冲区时的选项的更多信息,或涵盖 详。重播缓冲区具有无数功能,例如多线程 采样、优先体验重播等等......

  • 我们省略了要迭代的重放缓冲区的容量 单纯。自己试一试:构建一个缓冲区并指明其 batch-size,然后尝试迭代它。这是 相当于在循环中调用rb.sample()

脚本总运行时间:(0 分 22.051 秒)

估计内存使用量:321 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源