目录

开始数据收集和存储

作者: Vincent Moens

注意

如需在笔记本中运行本教程,请在开头添加一个安装单元,内容为:

!pip install tensordict
!pip install torchrl

没有数据就没有学习。在监督学习中,用户习惯于使用 DataLoader 等来将数据集成到训练循环中。 Dataloaders 是可迭代对象,为您提供用于训练模型的数据。

TorchRL 以类似的方式处理数据加载问题,尽管它在强化学习(RL)库生态中展现出令人惊讶的独特性。TorchRL 的数据加载器被称为 DataCollectors。大多数情况下,数据收集并不仅限于原始数据的采集,因为这些数据需要先临时存储在缓冲区中(或对 on-policy 算法而言的等效结构),然后才能被 损失模块 消费。本教程将深入探讨这两类组件。

数据收集器

本文讨论的主要数据采集器是 SyncDataCollector,这也是本文档的重点内容。从基本层面来看,采集器是一个简单的类,负责在环境中执行您的策略、在必要时重置环境,并提供指定大小的数据批次。与环境教程中演示的rollout()方法不同,采集器不会在连续的数据批次之间进行重置。因此,两个连续的数据批次可能包含来自同一条轨迹的元素。

你需要传递给收集器的基本参数包括你想要收集的批次大小(frames_per_batch)、迭代器的长度(可能是无限的)、策略和环境。为了简化,我们将在这个示例中使用一个随机策略。

import torch

torch.manual_seed(0)

from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy

env = GymEnv("CartPole-v1")
env.set_seed(0)

policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1)

我们现在期望我们的收集器将提供大小为200的批次,无论在收集过程中发生什么。换句话说,这个批次中可能有多个轨迹!total_frames表示收集器应该有多长。一个值为-1将产生一个永不停止的收集器。

让我们遍历该采集器,以了解这些数据的形态:

for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([200]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([200]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([200]),
    device=None,
    is_shared=False)

如您所见,我们的数据通过一些采集器特定的元数据进行了增强,这些元数据被分组在名为 "collector" 的子张量字典(sub-tensordict)中,而该子张量字典在环境 rollout过程中并未出现。这有助于追踪轨迹 ID。在以下列表中,每一项均标记了对应转换(transition)所属的轨迹编号:

print(data["collector", "traj_ids"])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
        4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
        5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
        6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
        7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
        9, 9, 9, 9, 9, 9, 9, 9])

数据收集器在编写最先进的算法时非常有用,因为性能通常是通过特定技术在给定数量的环境交互中解决问题的能力来衡量的(收集器中的total_frames参数)。因此,我们示例中的大多数训练循环看起来像这样:

>>> for data in collector:
...     # your algorithm here

回放缓冲区

在了解了如何收集数据之后,我们接下来要探讨的是如何存储这些数据。在强化学习(RL)中,典型的做法是:数据被采集后会暂时存储,并在经过一段时间(通常依据某种启发式策略,例如先进先出或其他策略)后被清除。一个典型的伪代码如下所示:

>>> for data in collector:
...     storage.store(data)
...     for i in range(n_optim):
...         sample = storage.sample()
...         loss_val = loss_fn(sample)
...         loss_val.backward()
...         optim.step() # etc

存储数据的父类在TorchRL中被称为ReplayBuffer。TorchRL的重放缓冲区是可组合的:你可以编辑存储类型、采样技术、写入策略或应用于它们的转换。我们将把高级内容留给专门的深入教程。通用的重放缓冲区只需要知道它需要使用哪种存储。一般来说,我们推荐使用TensorStorage子类,这在大多数情况下都能很好地工作。在这个教程中,我们将使用LazyMemmapStorage,它具有两个不错的特性:首先,它是“懒惰”的,你不需要提前明确告诉它你的数据是什么样子。其次,它使用MemoryMappedTensor作为后端,以高效的方式将数据保存到磁盘上。你唯一需要知道的是你希望你的缓冲区有多大。

from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer

buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))

通过以下方法填充缓冲区:add()(单个元素)或extend()(多个元素)。使用我们刚刚收集的数据,我们可以一次性初始化并填充缓冲区:

indices = buffer.extend(data)

我们可以验证缓冲区当前的元素数量是否与采集器返回的数量一致:

assert len(buffer) == collector.frames_per_batch

唯一剩下的就是如何从缓冲区收集数据。 自然地,这依赖于sample()方法。因为我们没有指定采样必须不重复, 所以不能保证从缓冲区收集的样本是唯一的:

sample = buffer.sample(batch_size=30)
print(sample)
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([30]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([30]),
            device=cpu,
            is_shared=False),
        observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([30]),
    device=cpu,
    is_shared=False)

同样,我们的样本与采集器收集到的数据完全一致!

下一步

  • 你可以查看其他多进程收集器,例如 MultiSyncDataCollectorMultiaSyncDataCollector

  • TorchRL 还为推理任务提供了分布式采集器,适用于拥有多个计算节点的场景。您可在 API 参考文档中查看相关内容。

  • 请查阅专门的 重放缓冲区教程,了解构建缓冲区时可选用的各种选项;或参考API 参考文档,其中详细介绍了所有功能。重放缓冲区拥有众多特性,例如多线程采样、优先经验回放等……

  • 我们省略了重放缓冲区的迭代能力,以简化内容。请自行尝试:构建一个缓冲区并在构造函数中指定其批量大小,然后尝试对其进行迭代。这相当于在循环中调用rb.sample()

脚本总运行时间: (0 分钟 20.853 秒)

估计内存使用量: 318 MB

通过 Sphinx-Gallery 生成的画廊

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源