目录

TorchRL 目标:编写 DDPG 损失函数

作者: Vincent Moens

概览

TorchRL 将强化学习(RL)算法的训练过程划分为多个部分,这些部分将在您的训练脚本中进行组装:环境、数据收集与存储、模型,以及最终的损失函数。

TorchRL 损失函数(或称“目标函数”)是带有状态的对象,其中包含可训练参数(策略模型和价值模型)。 本教程将引导您从零开始,使用 TorchRL 编写一个损失函数。

为此,我们将重点关注DDPG(深度确定性策略梯度),这是一种相对易于实现的算法。 深度确定性策略梯度(DDPG)是一种简单的连续控制算法。其核心在于学习一个针对“动作-观测”对的参数化价值函数,然后学习一个策略,该策略在给定特定观测时输出能够最大化该价值函数的动作。

你将学到:

  • 如何编写损失模块并自定义其价值评估器;

  • 如何在 TorchRL 中构建环境,包括变换(例如数据归一化)和并行执行;

  • 如何设计策略网络和价值网络;

  • 如何高效地从环境中收集数据并将其存储在经验回放缓冲区中;

  • 如何在你的重放缓冲区中存储轨迹(而不是转换);

  • 如何评估您的模型。

先决条件

本教程假设您已完成 PPO 教程,该教程概述了 TorchRL 的组件和依赖项(例如 tensordict.TensorDicttensordict.nn.TensorDictModules), 但即使不深入理解这些类,本教程的内容也足够清晰,易于理解。

注意

我们并非旨在提供该算法的最先进(SOTA)实现,而是 旨在高层次地展示 TorchRL 中损失函数(loss)的实现方式, 以及在该算法背景下需使用的库功能。

导入和设置

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我们将在 CUDA 上执行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列可在训练脚本中使用的损失函数。 其目标是提供易于复用/替换、且接口简洁的损失函数。

TorchRL 损失的主要特征如下:

  • 它们是具有状态的对象:它们包含可训练参数的副本 以便 loss_module.parameters() 提供训练算法所需的一切。

  • 他们遵循 TensorDict 规范:torch.nn.Module.forward() 方法将接收一个包含所有必要信息以返回损失值的 TensorDict 作为输入。

  • 他们输出一个tensordict.TensorDict实例,其中包含损失值 写在"loss_<smth>"下,其中smth是一个描述损失的字符串。TensorDict中的其他键可能是在训练期间记录的有用指标。

    注意

    我们返回独立损失的原因是为了让用户能够为不同参数集使用不同的优化器。 对损失求和可简单通过以下方式实现:

    代码块::Python

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

The __init__ 方法

所有损失的父类是 LossModule。 与库中的许多其他组件一样,其 forward() 方法期望 作为输入的是从经验重放缓冲区或任何类似数据结构中采样的 tensordict.TensorDict 实例。使用这种格式使得可以在 跨模态或多实例复杂设置中重用该模块成为可能。换句话说,它允许我们编写一个对给定的数据类型视而不见的损失模块,并且只关注运行损失函数的基本步骤。

为了使本教程尽可能具有教学性,我们将独立展示该类的每种方法, 并在后续阶段逐步填充该类的内容。

让我们从__init__()方法开始。DDPG的目标是通过简单的策略解决控制任务:训练一个策略网络以输出最大化价值网络预测值的动作。因此,我们的损失模块在其构造函数中需要接收两个网络:一个演员网络和一个价值网络。我们期望这两个都是TensorDict兼容的对象,例如tensordict.nn.TensorDictModule。我们的损失函数将需要计算目标值并使价值网络适应这个值,并生成一个动作并使策略适应以使其价值估计最大化。

LossModule.__init__() 方法的关键步骤是调用 convert_to_functional()。此方法将从模块中提取参数并将其转换为功能模块。严格来说,这并不是必需的,完全可以不使用它来编写所有损失函数。然而,我们鼓励使用它,原因如下。

TorchRL之所以这样做,是因为强化学习算法通常会使用同一模型的不同参数集,称为“可训练”和“目标”参数。 “可训练”参数是优化器需要拟合的参数。“目标”参数通常是前者的副本,并且有一些时间滞后(绝对或通过移动平均稀释)。 这些目标参数用于计算与下一个观察值相关的值。使用一组与当前配置不完全匹配的目标参数来计算价值函数的一个优点是,它们提供了所计算的价值函数的悲观边界。 请注意下面的create_target_params关键字参数:此参数告诉convert_to_functional()方法在损失模块中创建一组目标参数,用于目标值计算。如果将其设置为False(请参见演员网络),则target_actor_network_params属性仍然可以访问,但这将返回演员参数的分离版本。

稍后,我们将看到在 TorchRL 中目标参数应该如何更新。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

价值估计器损失方法

在许多强化学习(RL)算法中,价值网络(或Q值网络)基于经验价值估计进行训练。该估计可采用自举法(如TD(0),方差低、偏差高),即目标值仅通过下一个即时奖励计算得出;也可采用蒙特卡洛估计法(如TD(1)),此时将使用后续整个奖励序列(方差高、偏差低)。此外,还可采用一种折中方案的中间估计器(如TD(\(\lambda\))),以平衡偏差与方差。 TorchRL 通过 ValueEstimators 枚举类,轻松支持上述任一估计器,该枚举类包含了所有已实现价值估计器的引用。下面我们在此定义默认价值函数:首先采用最简单的版本(TD(0)),随后再展示如何更改此设置。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我们还需要根据用户查询,向 DDPG 提供一些关于如何构建价值估计器的指令。 根据所提供的估计器,我们将构建相应的模块,用于训练阶段:

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

The make_value_estimator 方法可以但不需要被调用:如果不调用,LossModule 将使用其默认估计器查询此方法。

演员损失方法

强化学习(RL)算法的核心在于策略网络(actor)的训练损失函数。 以深度确定性策略梯度(DDPG)为例,该损失函数非常简单:我们只需计算由策略网络生成的动作所对应的价值,并优化策略网络的权重,以最大化该价值。

在计算这个值时,我们必须确保将值参数从图中取出,否则演员和价值损失会混淆。 为此,可以使用hold_out_params()函数。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

价值损失方法

现在我们需要优化值网络的参数。 为此,我们将依赖我们类中的值估计器:

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

在前向调用中将所有内容组合起来

唯一缺少的部分是前向方法,它将价值和演员损失粘合在一起,收集成本值并将其写入一个TensorDict中,然后传递给用户。

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

现在我们已获得损失函数,可以利用它来训练策略,以解决控制任务。

环境

在大多数算法中,首先需要关注的是环境的构建,因为环境会制约训练脚本其余部分的执行。

对于这个示例,我们将使用"cheetah"任务。目标是让半猎豹跑得尽可能快。

在TorchRL中,可以通过依赖dm_controlgym来创建此类任务:

env = GymEnv("HalfCheetah-v4")

or

env = DMControlEnv("cheetah", "run")

默认情况下,这些环境禁用渲染。从状态进行训练通常比从图像进行训练更容易。为了简化问题,我们只关注从状态学习。要将tensordicts收集的像素传递给env.step(),只需将from_pixels=True参数传递给构造函数:

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我们编写了一个make_env()辅助函数,该函数将创建一个环境 具有上述两种后端之一(dm-controlgym)。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

变换

现在我们有了一个基础环境,我们可能希望修改其表示方式以使其更符合策略。在TorchRL中,转换被附加到基础环境中一个专门的torchr.envs.TransformedEnv类。

  • 在 DDPG 中,通常会使用某种启发式值对奖励进行重新缩放。在本示例中,我们将奖励乘以 5。

  • 如果我们使用 dm_control,那么在模拟器(它使用双精度数字)和我们的脚本(它可能使用单精度数字)之间构建一个接口也很重要。这种转换是双向的:当调用 env.step() 时,我们的动作需要以双精度表示,而输出则需要转换为单精度。 DoubleToFloat 转换正是这样做的:in_keys 列表指的是需要从双精度转换为浮点数的键,而 in_keys_inv 指的是在传递给环境之前需要转换为双精度的键。

  • 我们将状态键拼接在一起,使用CatTensors变换。

  • 最后,我们还保留对状态进行归一化的可能性:稍后我们将负责计算归一化常数。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

并行执行

以下辅助函数允许我们并行运行环境。 并行运行环境可显著提升数据收集的吞吐量。 在使用变换后的环境时,我们需要选择是为每个环境单独执行变换, 还是集中数据后以批处理方式执行变换。两种方法均易于实现:

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

为了利用 PyTorch 的向量化能力,我们采用第一种方法:

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 批处理多个步骤并用一个动作完成 如果 > 1,其他帧计数(例如,frames_per_batch、total_frames) 需要进行调整以确保在实验中收集的总帧数一致。这一点很重要,因为提高帧跳过率但保持总帧数不变可能会看起来像是作弊:所有事物相比, 一个使用帧跳过率为2的数据集与另一个使用帧跳过率为1的数据集实际上与环境的交互比例为2:1!简而言之,在处理帧跳过时应谨慎对待训练脚本中的帧计数,因为这可能导致对不同训练策略的偏见比较。

缩放奖励值有助于我们控制信号幅度,从而实现更高效的学习。

reward_scaling = 5.0

我们还定义了轨迹何时会被截断。对于猎豹任务,使用一千步(若帧跳跃数 frame-skip = 2,则为五百步)是一个合适的选择:

max_frames_per_traj = 500

观察值的归一化

为了计算归一化统计量,我们在环境中运行任意数量的随机步骤,并计算收集到的观测值的均值和标准差。可以使用ObservationNorm.init_stats()方法来实现这一目的。为了获取汇总统计信息,我们创建一个虚拟环境并运行指定数量的步骤,收集给定数量步骤的数据并计算其汇总统计信息。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

归一化统计

用于统计计算的随机步骤数量为ObservationNorm

init_env_steps = 5000

transform_state_dict = get_env_stats()

每个数据收集器中的环境数量

env_per_collector = 4

我们将之前计算的统计量传递给环境,以对其输出进行归一化:

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import Composite

构建模型

接下来,我们来搭建模型。如前所述,DDPG 需要一个价值网络(value network),用于估计状态-动作对的价值;同时还需要一个参数化的执行器(actor),用于学习如何选择能够最大化该价值的动作。

请记住,构建一个 TorchRL 模块需要两个步骤:

  • 编写将用作网络的 torch.nn.Module

  • 将网络封装在一个tensordict.nn.TensorDictModule中,其中数据流通过指定输入和输出键来处理。

在更复杂的情况下,tensordict.nn.TensorDictSequential 也可以被使用。

Q-值网络被封装在一个ValueOperator 中,该自动将out_keys设置为"state_action_value用于q-值 网络,并将state_value用于其他值网络。

TorchRL 提供了原论文中所述的 DDPG 网络的内置版本。这些可以在 DdpgMlpActorDdpgMlpQNet 中找到。

由于我们使用了延迟初始化模块(lazy modules),因此在将策略(policy)从一个设备迁移至另一个设备并执行其他操作之前,必须先对这些延迟初始化模块进行实例化。 因此,使用一小批样本数据运行这些模块是一种良好的实践。为此,我们根据环境规范(environment specs)生成模拟数据。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=Composite(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

探索

该策略被传递到一个OrnsteinUhlenbeckProcessModule探索模块中,正如原始论文中所建议的那样。 让我们定义在OU噪声达到其最小值之前的帧数

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

数据收集器

TorchRL 提供了专门的类,帮助您通过在环境中执行策略来收集数据。这些“数据收集器”会迭代地计算在特定时刻需要执行的动作,然后在环境中执行一步操作,并在需要时重置环境。 数据收集器的设计旨在帮助开发者精确控制每批数据所含的帧数、数据收集过程的(异)步特性,以及分配给数据收集任务的资源(例如 GPU、工作进程数量等)。

在这里我们将使用 SyncDataCollector,一个简单的单进程数据收集器。TorchRL提供了其他收集器,例如 MultiaSyncDataCollector,它以异步方式执行回滚(例如,在策略被优化的同时收集数据,从而解耦训练和数据收集)。

需要指定的参数是:

  • 一个环境工厂或一个环境,

  • 政策,

  • 在收集器被认为为空之前总帧数,

  • 每个轨迹的最大帧数(对于非终止环境,如dm_control类环境非常有用)。

    注意

    传递给收集器的 max_frames_per_traj 将具有在用于推理的环境中注册一个新的 StepCounter 转换的效果。我们可以通过手动实现相同的结果,就像我们在本脚本中所做的那样。

还应传递:

  • 每个批次收集的帧数

  • 独立于策略执行的随机步骤数量

  • 用于策略执行的设备

  • 用于在数据传递给主进程之前存储数据的设备。

我们在训练过程中将使用的总帧数应约为100万。

total_frames = 10_000  # 1_000_000

外部循环每次迭代中,采集器返回的帧数等于每条子轨迹的长度乘以每个采集器中并行运行的环境数量。

换句话说,我们期望从收集器获得的批次具有[env_per_collector, traj_len]维形状, 其中traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

Evaluator: 构建您的记录器对象

由于训练数据是通过某种探索策略获得的,因此需要在确定性模式下评估我们算法的真实性能。我们通过一个专用类LogValidationReward来实现这一点,该类以给定的频率在环境中执行策略,并返回从这些模拟中获得的一些统计信息。

以下辅助函数构建这个对象:

from torchrl.trainers import LogValidationReward


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = LogValidationReward(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我们将每收集10个批次就记录一次性能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

回放缓冲区

经验回放缓冲区分为两种类型:优先级回放(使用某种误差信号,使某些样本被采样的概率高于其他样本)和常规的循环式经验回放。

TorchRL 的经验回放缓冲区具有可组合性:用户可以自由选择存储、采样和写入策略。此外,还可以使用内存映射数组将张量存储在物理内存中。以下函数负责使用指定的超参数创建经验回放缓冲区:

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我们将把重放缓冲区存储在磁盘上的一个临时目录中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

回放缓冲区存储和批量大小

TorchRL 回放缓冲区计算第一个维度上的元素数量。 由于我们将轨迹馈送到缓冲区,我们需要通过将缓冲区大小除以数据收集器生成的子轨迹长度来调整缓冲区大小。 关于批量大小,我们的采样策略将包括在选择子轨迹之前采样长度为 traj_len=200 的轨迹, 或选择长度为 random_crop_len=25 的子轨迹,在这些子轨迹上计算损失。 这种策略平衡了存储特定长度的完整轨迹的选择与向我们的损失提供足够异质性样本的需求。 下图显示了从一个每次批量获取 8 帧且并行运行 2 个环境的数据收集器, 将其馈送到包含 1000 条轨迹的回放缓冲区,并采样子轨迹每 2 个时间步的情况。

Storing trajectories in the replay buffer

让我们从缓冲区中存储的帧数开始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

优先级重放缓冲区默认是禁用的

prb = False

我们还需要定义每批数据收集后进行多少次更新。这被称为更新与数据比率或 UTD 比率:

update_to_data = 64

我们将用长度为25的轨迹来计算损失:

random_crop_len = 25

在原始论文中,作者对每个采集到的帧使用大小为 64 的批量进行一次更新。 此处,我们复现了相同的比例,但在每次批量采集后执行多次更新。 我们调整了批量大小,以实现相同的“每帧更新次数”比例:

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

损失模块构建

我们使用刚刚创建的演员和qnet构建我们的损失模块。 因为我们有目标参数需要更新,所以我们_必须_创建一个目标网络更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

让我们使用TD(lambda)估计器吧!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

Off-policy 通常使用 TD(0) 估计器。在这里,我们使用 TD(\(\lambda\)) 估计器,这将引入一些偏差,因为跟随某个状态的轨迹是使用过时的策略收集的。 这个技巧,以及可以在数据收集期间使用的多步技巧,都是我们在实践中发现效果良好的“黑客”方法的替代版本,尽管它们在回报估计中引入了一些偏差。

目标网络更新器

目标网络是离策略强化学习(RL)算法的关键组成部分。 借助 HardUpdateSoftUpdate 类,目标网络参数的更新变得十分简便。 这些类以损失模块作为参数构建,并在训练循环中的适当位置调用 updater.step() 来完成更新。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

优化器

最后,我们将使用Adam优化器来优化策略网络和价值网络:

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

是时候训练策略了

现在,我们已经构建了所需的所有模块,训练循环就非常直接了。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|▊         | 800/10000 [00:00<00:03, 2637.60it/s]
 16%|█▌        | 1600/10000 [00:02<00:13, 641.71it/s]
 24%|██▍       | 2400/10000 [00:02<00:07, 978.24it/s]
 32%|███▏      | 3200/10000 [00:02<00:05, 1299.00it/s]
 40%|████      | 4000/10000 [00:03<00:03, 1585.24it/s]
 48%|████▊     | 4800/10000 [00:03<00:02, 1832.78it/s]
 56%|█████▌    | 5600/10000 [00:03<00:02, 2034.12it/s]
reward: -2.29 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.04/6.78, grad norm= 68.11, loss_value= 465.99, loss_actor= 13.91, target value: -13.13:  56%|█████▌    | 5600/10000 [00:05<00:02, 2034.12it/s]
reward: -2.29 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.04/6.78, grad norm= 68.11, loss_value= 465.99, loss_actor= 13.91, target value: -13.13:  64%|██████▍   | 6400/10000 [00:06<00:05, 710.74it/s]
reward: -2.37 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.65/5.69, grad norm= 49.61, loss_value= 211.72, loss_actor= 14.42, target value: -16.98:  64%|██████▍   | 6400/10000 [00:08<00:05, 710.74it/s]
reward: -2.37 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.65/5.69, grad norm= 49.61, loss_value= 211.72, loss_actor= 14.42, target value: -16.98:  72%|███████▏  | 7200/10000 [00:09<00:05, 496.69it/s]
reward: -5.23 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.82/6.44, grad norm= 128.96, loss_value= 357.72, loss_actor= 15.46, target value: -18.87:  72%|███████▏  | 7200/10000 [00:11<00:05, 496.69it/s]
reward: -5.23 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.82/6.44, grad norm= 128.96, loss_value= 357.72, loss_actor= 15.46, target value: -18.87:  80%|████████  | 8000/10000 [00:11<00:04, 415.06it/s]
reward: -4.89 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.95/5.74, grad norm= 59.11, loss_value= 246.36, loss_actor= 18.35, target value: -19.09:  80%|████████  | 8000/10000 [00:13<00:04, 415.06it/s]
reward: -4.89 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.95/5.74, grad norm= 59.11, loss_value= 246.36, loss_actor= 18.35, target value: -19.09:  88%|████████▊ | 8800/10000 [00:14<00:03, 371.73it/s]
reward: -5.27 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.32/5.64, grad norm= 130.20, loss_value= 308.06, loss_actor= 22.70, target value: -23.36:  88%|████████▊ | 8800/10000 [00:18<00:03, 371.73it/s]
reward: -5.27 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.32/5.64, grad norm= 130.20, loss_value= 308.06, loss_actor= 22.70, target value: -23.36:  96%|█████████▌| 9600/10000 [00:18<00:01, 287.09it/s]
reward: -3.88 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.69/5.15, grad norm= 104.31, loss_value= 211.37, loss_actor= 23.34, target value: -26.08:  96%|█████████▌| 9600/10000 [00:20<00:01, 287.09it/s]
reward: -3.88 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.69/5.15, grad norm= 104.31, loss_value= 211.37, loss_actor= 23.34, target value: -26.08: : 10400it [00:21, 271.50it/s]
reward: -4.61 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-2.93/5.39, grad norm= 125.93, loss_value= 236.82, loss_actor= 21.76, target value: -20.33: : 10400it [00:23, 271.50it/s]

实验结果

我们绘制了训练期间平均奖励的简单曲线图。可以观察到, 我们的策略已较好地学会了完成该任务。

注意

如上所述,为了获得更合理的性能, 请使用更大的值作为 total_frames 的值,例如 1M。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
coding ddpg

结论

在本教程中,我们学习了如何基于 DDPG 的具体示例,在 TorchRL 中编写损失模块。

关键要点如下:

  • 如何使用 LossModule 类来编码一个新的损失组件;

  • 如何使用(或不使用)目标网络,以及如何更新其参数;

  • 如何创建与损失模块关联的优化器。

下一步

要进一步迭代这个损失模块,我们可以考虑:

脚本总运行时间: (2 分钟 0.389 秒)

估计内存使用量: 322 MB

通过 Sphinx-Gallery 生成的画廊

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源