目录

循环 DQN:培训循环策略

作者Vincent Moens

您将学到什么
  • 如何在 TorchRL 的 actor 中加入 RNN

  • 如何将基于内存的策略与 replay buffer 和 loss 模块一起使用

先决条件
  • PyTorch 版本 v2.0.0

  • 健身房[Mujoco]

  • TQDM

概述

基于内存的策略不仅在观察部分时至关重要 observable 的,但当必须将时间维度考虑为 做出明智的决策。

长期以来,递归神经网络一直是基于内存的流行工具 政策。这个想法是将内存中的递归状态保持在两个 连续步骤,并将其与 目前的观察。

本教程介绍如何使用 TorchRL 将 RNN 合并到策略中。

主要学习内容:

  • 在 TorchRL 的 actor 中合并 RNN;

  • 将基于内存的策略与 replay buffer 和 loss 模块一起使用。

在 TorchRL 中使用 RNN 的核心思想是使用 TensorDict 作为数据载体 对于从一个步骤到另一个步骤的隐藏状态。我们将构建一个策略,以便 从当前 TensorDict 中读取之前的 recurrent 状态,并将 当前 recurrent 状态在下一个状态的 TensorDict 中:

使用循环策略收集数据

如图所示,我们的环境使用归零的 recurrent 填充 TensorDict state 中,这些 state 被 policy 与 observation 一起读取,以产生一个 action 和将用于下一步的 recurrent 状态。 调用该函数时,递归状态 从下一个状态带到当前 TensorDict。让我们看看这个 在实践中实现。

如果您在 Google Colab 中运行此程序,请确保安装以下依赖项:

!pip3 install torchrl
!pip3 install gym[mujoco]
!pip3 install tqdm

设置

import torch
import tqdm
from tensordict.nn import (
    TensorDictModule as Mod,
    TensorDictSequential,
    TensorDictSequential as Seq,
)
from torch import nn
from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer
from torchrl.envs import (
    Compose,
    ExplorationType,
    GrayScale,
    InitTracker,
    ObservationNorm,
    Resize,
    RewardScaling,
    set_exploration_type,
    StepCounter,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.modules import ConvNet, EGreedyModule, LSTMModule, MLP, QValueModule
from torchrl.objectives import DQNLoss, SoftUpdate

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

环境

像往常一样,第一步是构建我们的环境:它帮助我们 定义问题并相应地构建策略网络。在本教程中, 我们将运行一个基于 Pixel 的 CartPole 健身房实例 环境进行一些自定义转换:变为灰度,调整为 84x84,缩小奖励并使观察结果正常化。

注意

转换是 accessory。自 CartPole 任务目标是尽可能长地绘制轨迹,计算步数 可以帮助我们跟踪策略的执行情况。

对于本教程而言,有两个转换非常重要:

  • 将对 通过在 TensorDict 中添加布尔掩码来调用 to,该掩码将跟踪哪些步骤需要重置 的 RNN 隐藏状态。"is_init"

  • 转换要多一点 专门的。不需要使用 RNN 策略。但是,它 指示环境(以及随后的收集器)将一些额外的 键是意料之中的。添加后,将填充对 env.reset() 的调用 Primer 中用零张量指示的条目。知道 这些张量是策略所期望的,收集器会将它们传递给 在收集期间。最终,我们将把隐藏的状态存储在 replay 缓冲区,这将帮助我们引导 loss 模块中的 RNN 操作(否则将启动 替换为 0)。总之:不包括此转换不会产生太大影响 训练我们的政策,但它会让反复出现的钥匙消失 从收集的数据和重放缓冲区,这反过来又会导致 稍微不那么理想的训练。 幸运的是,我们建议的是 配备了一个 helper 方法来为我们构建该转换,因此 我们可以等到我们构建它!

env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True, device=device),
    Compose(
        ToTensorImage(),
        GrayScale(),
        Resize(84, 84),
        StepCounter(),
        InitTracker(),
        RewardScaling(loc=0.0, scale=0.1),
        ObservationNorm(standard_normal=True, in_keys=["pixels"]),
    ),
)

与往常一样,我们需要手动初始化我们的归一化常量:

env.transform[-1].init_stats(1000, reduce_dim=[0, 1, 2], cat_dim=0, keep_dims=[0])
td = env.reset()

政策

我们的策略将有 3 个组件:一个主干、一个内存层和一个浅层块,它将 LSTM 输出映射到 action 值。

卷积网络

我们构建了一个侧翼为 a 的卷积网络,它将压缩大小为 64 的向量中的输出。他们可以帮助我们:

feature = Mod(
    ConvNet(
        num_cells=[32, 32, 64],
        squeeze_output=True,
        aggregator_class=nn.AdaptiveAvgPool2d,
        aggregator_kwargs={"output_size": (1, 1)},
        device=device,
    ),
    in_keys=["pixels"],
    out_keys=["embed"],
)

我们对一批数据执行第一个模块,以收集 输出向量:

n_cells = feature(env.reset())["embed"].shape[-1]

LSTM 模块

TorchRL 提供了一个专门的类 将 LSTM 合并到您的代码库中。它是一个子类:因此,它有一组 and 表示 在 执行模块。该类带有可自定义的预定义 值来促进其构造。TensorDictModuleBasein_keysout_keys

注意

使用限制:该类支持几乎所有 LSTM 功能,例如 dropout 或多层 LSTM。 但是,为了遵守 TorchRL 的约定,此 LSTM 必须将属性设置为 PyTorch 中的默认值。然而 我们更改此默认值 行为,因此我们擅长使用 native 调用。batch_firstTrue

此外,LSTM 不能将属性设置为 这在在线设置中不可用。在这种情况下,默认值 是正确的。bidirectionalTrue

lstm = LSTMModule(
    input_size=n_cells,
    hidden_size=128,
    device=device,
    in_key="embed",
    out_key="embed",
)

让我们看看 LSTM Module 类,特别是它的 in 和 out_keys:

print("in_keys", lstm.in_keys)
print("out_keys", lstm.out_keys)
in_keys ['embed', 'recurrent_state_h', 'recurrent_state_c', 'is_init']
out_keys ['embed', ('next', 'recurrent_state_h'), ('next', 'recurrent_state_c')]

我们可以看到,这些值包含我们指示为 in_key 的键(和 out_key) 以及循环键名称。out_keys前面有 “next” 前缀 这表明它们需要写入 “next” TensorDict 中。 我们使用此约定(可以通过传递 in_keys/out_keys 参数),以确保对 将 recurrent 状态移动到根 TensorDict,使其可用于 RNN (参见简介中的图)。

如前所述,我们还有一个可选的转换要添加到我们的 environment 来确保将递归状态传递给缓冲区。 该方法执行 正是这样:

env.append_transform(lstm.make_tensordict_primer())
TransformedEnv(
    env=GymEnv(env=CartPole-v1, batch_size=torch.Size([]), device=cpu),
    transform=Compose(
            ToTensorImage(keys=['pixels']),
            GrayScale(keys=['pixels']),
            Resize(w=84, h=84, interpolation=InterpolationMode.BILINEAR, keys=['pixels']),
            StepCounter(keys=[]),
            InitTracker(keys=[]),
            RewardScaling(loc=0.0000, scale=0.1000, keys=['reward']),
            ObservationNorm(keys=['pixels']),
            TensorDictPrimer(primers=Composite(
                recurrent_state_h: UnboundedContinuous(
                    shape=torch.Size([1, 128]),
                    space=ContinuousBox(
                        low=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True),
                        high=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True)),
                    device=cpu,
                    dtype=torch.float32,
                    domain=continuous),
                recurrent_state_c: UnboundedContinuous(
                    shape=torch.Size([1, 128]),
                    space=ContinuousBox(
                        low=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True),
                        high=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True)),
                    device=cpu,
                    dtype=torch.float32,
                    domain=continuous),
                device=cpu,
                shape=torch.Size([])), default_value={'recurrent_state_h': 0.0, 'recurrent_state_c': 0.0}, random=None)))

就是这样!我们可以打印环境以检查现在一切看起来是否正常 我们添加了引物:

print(env)
TransformedEnv(
    env=GymEnv(env=CartPole-v1, batch_size=torch.Size([]), device=cpu),
    transform=Compose(
            ToTensorImage(keys=['pixels']),
            GrayScale(keys=['pixels']),
            Resize(w=84, h=84, interpolation=InterpolationMode.BILINEAR, keys=['pixels']),
            StepCounter(keys=[]),
            InitTracker(keys=[]),
            RewardScaling(loc=0.0000, scale=0.1000, keys=['reward']),
            ObservationNorm(keys=['pixels']),
            TensorDictPrimer(primers=Composite(
                recurrent_state_h: UnboundedContinuous(
                    shape=torch.Size([1, 128]),
                    space=ContinuousBox(
                        low=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True),
                        high=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True)),
                    device=cpu,
                    dtype=torch.float32,
                    domain=continuous),
                recurrent_state_c: UnboundedContinuous(
                    shape=torch.Size([1, 128]),
                    space=ContinuousBox(
                        low=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True),
                        high=Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, contiguous=True)),
                    device=cpu,
                    dtype=torch.float32,
                    domain=continuous),
                device=cpu,
                shape=torch.Size([])), default_value={'recurrent_state_h': 0.0, 'recurrent_state_c': 0.0}, random=None)))

MLP

我们使用单层 MLP 来表示我们将用于 我们的政策。

mlp = MLP(
    out_features=2,
    num_cells=[
        64,
    ],
    device=device,
)

并用零填充偏差:

mlp[-1].bias.data.fill_(0.0)
mlp = Mod(mlp, in_keys=["embed"], out_keys=["action_value"])

使用 Q 值选择操作

我们政策的最后一部分是 Q 值模块。 Q-Value 模块将读取我们的 MLP 生成的 key,并且 从中,收集具有最大值的操作。 我们唯一需要做的就是指定动作空间,这可以做到 通过传递 String 或 action-spec 来执行。这允许我们使用 分类(有时称为“稀疏”)编码或其独热版本。"action_values"

qval = QValueModule(action_space=None, spec=env.action_spec)

注意

TorchRL 还提供了一个包装器类,该类 将模块包装在 Sequential 中,就像我们在这里明确所做的那样。这样做几乎没有什么好处 并且该过程不太透明,但最终结果将类似于 我们在这里做什么。torchrl.modules.QValueActor

我们现在可以将所有内容放在TensorDictSequential

stoch_policy = Seq(feature, lstm, mlp, qval)

DQN 是一种确定性算法,探索是其中的关键部分。 我们将使用 -greedy 策略,其中 epsilon 为 0.2 衰减 逐渐变为 0。 这种衰减是通过调用 to 实现的(请参阅下面的 training loop)。

exploration_module = EGreedyModule(
    annealing_num_steps=1_000_000, spec=env.action_spec, eps_init=0.2
)
stoch_policy = TensorDictSequential(
    stoch_policy,
    exploration_module,
)

使用模型处理损失

我们构建的模型已经准备好用于顺序设置。 但是,该类可以使用 cuDNN 优化的后端 在 GPU 设备上更快地运行 RNN 序列。我们不想错过 这样一个加快我们训练循环的机会! 要使用它,我们只需要告诉 LSTM 模块在 “recurrent-mode” 上运行 当被 Loss 使用时。 由于我们通常希望拥有两个 LSTM 模块的副本,因此我们通过以下方式实现 调用一个 将返回 LSTM 的新实例(具有共享权重),该实例将 假设输入数据本质上是连续的。

policy = Seq(feature, lstm.set_recurrent_mode(True), mlp, qval)

因为我们仍然有几个未初始化的参数,所以我们应该 在创建 Optimizer 等之前初始化它们。

policy(env.reset())
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.int64, is_shared=False),
        action_value: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
        chosen_action_value: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        embed: Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.float32, is_shared=False),
        is_init: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                recurrent_state_c: Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
                recurrent_state_h: Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([1, 84, 84]), device=cpu, dtype=torch.float32, is_shared=False),
        recurrent_state_c: Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
        recurrent_state_h: Tensor(shape=torch.Size([1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
        step_count: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.int64, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

DQN 损失

DQN 丢失需要我们传递策略,同样,需要传递操作空间。 虽然这似乎是多余的,但它很重要,因为我们希望确保 和 是兼容的,但彼此之间并不强烈依赖。

要使用 Double-DQN,我们要求一个参数,该参数将 创建要使用的网络参数的不可微分副本 作为目标广告联盟。delay_value

loss_fn = DQNLoss(policy, action_space=env.action_spec, delay_value=True)

由于我们使用的是双 DQN,因此需要更新目标参数。 我们将使用一个实例来执行 这项工作。

updater = SoftUpdate(loss_fn, eps=0.95)

optim = torch.optim.Adam(policy.parameters(), lr=3e-4)

收集器和重放缓冲区

我们构建了最简单的数据收集器。我们将尝试训练我们的算法 100 万帧,一次扩展 50 帧的缓冲区。缓冲区 将设计为存储 20000 个轨迹,每个轨迹 50 步。 在每个优化步骤(每次数据收集 16 个)中,我们将收集 4 个项目 从我们的缓冲区,总共 200 次过渡。 我们将使用存储来保存数据 在磁盘上。

注意

为了提高效率,我们只运行了几千次迭代 这里。在实际设置中,帧总数应设置为 1M。

collector = SyncDataCollector(env, stoch_policy, frames_per_batch=50, total_frames=200)
rb = TensorDictReplayBuffer(
    storage=LazyMemmapStorage(20_000), batch_size=4, prefetch=10
)

训练循环

为了跟踪进度,我们将在环境中运行策略一次 每 50 次数据收集,并在训练后绘制结果。

utd = 16
pbar = tqdm.tqdm(total=collector.total_frames)
longest = 0

traj_lens = []
for i, data in enumerate(collector):
    if i == 0:
        print(
            "Let us print the first batch of data.\nPay attention to the key names "
            "which will reflect what can be found in this data structure, in particular: "
            "the output of the QValueModule (action_values, action and chosen_action_value),"
            "the 'is_init' key that will tell us if a step is initial or not, and the "
            "recurrent_state keys.\n",
            data,
        )
    pbar.update(data.numel())
    # it is important to pass data that is not flattened
    rb.extend(data.unsqueeze(0).to_tensordict().cpu())
    for _ in range(utd):
        s = rb.sample().to(device, non_blocking=True)
        loss_vals = loss_fn(s)
        loss_vals["loss"].backward()
        optim.step()
        optim.zero_grad()
    longest = max(longest, data["step_count"].max().item())
    pbar.set_description(
        f"steps: {longest}, loss_val: {loss_vals['loss'].item(): 4.4f}, action_spread: {data['action'].sum(0)}"
    )
    exploration_module.step(data.numel())
    updater.step()

    with set_exploration_type(ExplorationType.DETERMINISTIC), torch.no_grad():
        rollout = env.rollout(10000, stoch_policy)
        traj_lens.append(rollout.get(("next", "step_count")).max().item())
  0%|          | 0/200 [00:00<?, ?it/s]Let us print the first batch of data.
Pay attention to the key names which will reflect what can be found in this data structure, in particular: the output of the QValueModule (action_values, action and chosen_action_value),the 'is_init' key that will tell us if a step is initial or not, and the recurrent_state keys.
 TensorDict(
    fields={
        action: Tensor(shape=torch.Size([50, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        action_value: Tensor(shape=torch.Size([50, 2]), device=cpu, dtype=torch.float32, is_shared=False),
        chosen_action_value: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([50]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([50]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        embed: Tensor(shape=torch.Size([50, 128]), device=cpu, dtype=torch.float32, is_shared=False),
        is_init: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                is_init: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([50, 1, 84, 84]), device=cpu, dtype=torch.float32, is_shared=False),
                recurrent_state_c: Tensor(shape=torch.Size([50, 1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
                recurrent_state_h: Tensor(shape=torch.Size([50, 1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                step_count: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.int64, is_shared=False),
                terminated: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([50]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([50, 1, 84, 84]), device=cpu, dtype=torch.float32, is_shared=False),
        recurrent_state_c: Tensor(shape=torch.Size([50, 1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
        recurrent_state_h: Tensor(shape=torch.Size([50, 1, 128]), device=cpu, dtype=torch.float32, is_shared=False),
        step_count: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.int64, is_shared=False),
        terminated: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([50, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([50]),
    device=None,
    is_shared=False)

 25%|██▌       | 50/200 [00:00<00:01, 130.78it/s]
 25%|██▌       | 50/200 [00:11<00:01, 130.78it/s]
steps: 9, loss_val:  0.0006, action_spread: tensor([46,  4]):  25%|██▌       | 50/200 [00:31<00:01, 130.78it/s]
steps: 9, loss_val:  0.0006, action_spread: tensor([46,  4]):  50%|█████     | 100/200 [00:32<00:37,  2.64it/s]
steps: 11, loss_val:  0.0004, action_spread: tensor([44,  6]):  50%|█████     | 100/200 [01:03<00:37,  2.64it/s]
steps: 11, loss_val:  0.0004, action_spread: tensor([44,  6]):  75%|███████▌  | 150/200 [01:04<00:24,  2.01it/s]
steps: 17, loss_val:  0.0004, action_spread: tensor([12, 38]):  75%|███████▌  | 150/200 [01:35<00:24,  2.01it/s]
steps: 17, loss_val:  0.0004, action_spread: tensor([12, 38]): 100%|██████████| 200/200 [01:35<00:00,  1.81it/s]
steps: 17, loss_val:  0.0003, action_spread: tensor([43,  7]): 100%|██████████| 200/200 [02:07<00:00,  1.81it/s]

让我们绘制结果:

if traj_lens:
    from matplotlib import pyplot as plt

    plt.plot(traj_lens)
    plt.xlabel("Test collection")
    plt.title("Test trajectory lengths")
测试轨迹长度

结论

我们已经看到了如何将 RNN 合并到 TorchRL 的策略中。 您现在应该能够:

  • 创建一个 LSTM 模块,该模块充当TensorDictModule

  • 向 LSTM 模块指示需要通过转换进行重置

  • 将此模块合并到 policy 和 loss 模块中

  • 确保收集器知道循环状态条目 这样它们就可以与 数据

延伸阅读

脚本总运行时间:(3 分 8.564 秒)

估计内存使用量:2233 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源