目录

TorchRL 目标:编码 DDPG 损失

作者Vincent Moens

概述

TorchRL 将 RL 算法的训练分为各个部分,这些部分将 在您的训练脚本中组装:环境、数据收集和 storage、模型,最后是损失函数。

TorchRL 损失(或“目标”)是有状态对象,其中包含 可训练参数(策略和价值模型)。 本教程将指导您完成从头开始编写亏损代码的步骤 使用 TorchRL。

为此,我们将专注于 DDPG,这是一个相对简单的 算法进行编码。深度确定性策略梯度 (DDPG) 是一种简单的连续控制算法。它包括学习 参数值函数,以及 然后学习一个策略,该策略输出可最大化此价值的操作 函数给定一定的观察值。

您将学到什么:

  • 如何编写 loss 模块并自定义其 Value Estimator;

  • 如何在 TorchRL 中构建环境,包括转换 (例如,数据规范化)和并行执行;

  • 如何设计政策和价值网络;

  • 如何有效地从环境中收集数据并存储数据 在 replay 缓冲区中;

  • 如何在重放缓冲区中存储轨迹(而不是过渡));

  • 如何评估您的模型。

先决条件

本教程假定您已完成 PPO 教程,该教程提供 TorchRL 组件和依赖项概述,例如 和 , 虽然它应该是 足够透明,无需深入了解即可理解 这些类。tensordict.TensorDicttensordict.nn.TensorDictModules

注意

我们的目的不是给出算法的 SOTA 实现,而是 提供 TorchRL 的 loss 实现的高级说明 以及要在 这个算法。

导入和设置

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我们将在 CUDA 上执行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列可在训练脚本中使用的损失。 目标是获得易于重用/可交换的损失,并且具有 一个简单的签名。

TorchRL 损失的主要特点是:

  • 它们是有状态对象:它们包含可训练参数的副本 这样就可以提供训练 算法。loss_module.parameters()

  • 它们遵循约定:该方法将接收一个 TensorDict 作为 Input,其中包含所有必要的 信息返回损失值。TensorDict

  • 它们输出一个实例,其中包含 loss 值 写入其中是一个描述 损失。中的其他键可能是记录期间的有用指标 训练时间。tensordict.TensorDict"loss_<smth>"smthTensorDict

    注意

    我们之所以返回独立亏损,是为了让用户使用不同的 optimizer 来获取不同的参数集。汇总损失 可以简单地通过

    ..代码 - block::P ython

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

方法__init__

所有损失的父类是 。 与该库的许多其他组件一样,其方法期望 作为输入,从体验中采样的实例 replay buffer 或任何类似的数据结构。使用这种格式可以 可以跨 模态,或者在模型需要读取多个 条目。换句话说,它允许我们编写一个 loss 模块,该 忽略了被赋予 is 的数据类型,并且它专注于 运行 loss 函数的基本步骤,并且只运行这些步骤。tensordict.TensorDict

为了使本教程尽可能具有教学性,我们将展示每种方法 的 API 中,我们将在稍后的 阶段。

让我们从方法开始。DDPG 旨在通过简单的策略解决控制任务: 训练策略以输出操作,这些操作使预测的值最大化 价值网络。因此,我们的 loss 模块需要在其 constructor:一个 Actor 和一个 Value Networks。我们预计这两者都是 与 TensorDict 兼容的对象,例如 . 我们的损失函数需要计算一个目标值并拟合该值 network 添加到 this 中,并生成一个操作并适合策略,使其 value estimate 最大化。__init__()tensordict.nn.TensorDictModule

该方法的关键步骤是调用 .此方法将提取 模块中的参数并将其转换为功能模块。 严格来说,这不是必需的,一个人可以完美地编码所有 没有它的损失。但是,我们鼓励将其用于以下用途 原因。LossModule.__init__()convert_to_functional()

TorchRL 这样做的原因是 RL 算法通常执行相同的操作 具有不同参数集的模型,称为 “trainable” 和 “target” 参数。 “可训练”参数是优化器需要拟合的参数。这 “target” 参数通常是前者的副本,但有一些时间滞后 (绝对或通过移动平均线稀释)。 这些目标参数用于计算与 下一次观察。使用一组目标参数的优点 对于不完全匹配的值模型,当前配置为 它们为正在计算的值函数提供了一个悲观的边界。 注意下面的 keyword 参数:this argument 告诉该方法在 loss 模块中创建一组要使用的目标参数 用于目标值计算。如果将其设置为 (请参阅 actor 网络 例如),该属性仍将为 accessible 的 URL,但这只会返回 actor 参数。create_target_paramsFalsetarget_actor_network_params

稍后,我们将看到如何在 TorchRL 中更新目标参数。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

value estimator loss 方法

在许多 RL 算法中,价值网络(或 Q 值网络)是基于训练的 根据经验价值估计。这可以是自举的 (TD(0),低 方差、高偏差)、含义 目标值是使用 next reward 获得的,而不是其他任何奖励,或者 可以获得蒙特卡洛估计值 (TD(1)),在这种情况下,整个 将使用即将到来的奖励的顺序(高变异性、低偏差)。一 中间估计器 (TD()) 也可用于妥协 偏差和方差。 TorchRL 可以通过 Enum 类轻松使用一个或另一个估计器,该类包含 指向所有已实现的值估计器的指针。我们来定义默认的 value 函数。我们将采用最简单的版本 (TD(0)),稍后显示 关于如何改变这一点。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我们还需要向 DDPG 提供一些有关如何构建价值的说明 estimator 的 intent 值,具体取决于用户查询。根据提供的估算器, 我们将构建相应的模块,以便在 Train 时使用:

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

方法可以但不需要调用:如果 not 时,将使用 它的默认 estimator 来估计。make_value_estimator

actor 损失方法

RL 算法的核心部分是 actor 的训练损失。 在 DDPG 的情况下,这个函数非常简单:我们只需要计算 与使用 Policy and optimize 计算的操作关联的值 Actor 权重以最大化此值。

在计算这个值时,我们必须确保去掉 value 参数 ,否则 actor 和 value loss 会混淆。 为此,函数 可以使用。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

value loss 方法

我们现在需要优化我们的价值网络参数。 为此,我们将依赖我们类的值估计器:

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

在前瞻呼叫中将所有内容放在一起

唯一缺少的部分是 forward 方法,它将 value 和 actor loss 中,收集 cost 值并将其写入 delivered to the user.TensorDict

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

现在我们有了损失,我们可以使用它来训练策略来解决 控制任务。

环境

在大多数算法中,首先需要注意的是 环境的构造,因为它限制了 training 脚本。

在此示例中,我们将使用 task.目标是使 半只猎豹尽可能快地跑。"cheetah"

在 TorchRL 中,可以通过依赖 或 来创建这样的任务:dm_controlgym

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

默认情况下,这些环境将禁用渲染。来自各州的培训是 通常比从图像训练更容易。为了简单起见,我们专注于 仅向各州学习。要将像素传递给 被 收集,只需将参数传递给构造函数即可:tensordictsenv.step()from_pixels=True

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我们编写一个 helper 函数来创建一个环境 上面考虑了两个后端中的任何一个 ( 或 )。make_env()dm-controlgym

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

变换

现在我们有一个基本环境,我们可能想要修改它的表示 使其对策略更友好。在 TorchRL 中,转换被附加到 base 环境。torchr.envs.TransformedEnv

  • 在 DDPG 中,使用一些启发式值重新缩放奖励是很常见的。我们 将奖励乘以 5。

  • 如果我们使用 ,那么构建一个接口也很重要 在处理双精度数字的模拟器和我们的 script 的 intent 调用的 intent 的 1 个实例。这种转变是 两种方式:调用 时,我们的 action 都需要是 以双精度表示,并且输出需要转换 设置为单精度。 转换正是这样做的:列表引用需要转换的键 double 设置为 float,而 the 则是指那些需要 在传递到环境之前转换为 double。dm_controlenv.step()DoubleToFloatin_keysin_keys_inv

  • 我们使用 transform 将 state key 连接在一起。CatTensors

  • 最后,我们还保留了使状态正常化的可能性:我们将 稍后负责计算归一化常量。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

并行执行

以下 helper 函数允许我们并行运行环境。 并行运行环境可以显著加快收集速度 吞吐量。当使用转换后的环境时,我们需要选择是否 想要为每个环境单独执行转换,或者 集中数据并批量转换。两种方法都很容易 法典:

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

为了利用 PyTorch 的矢量化功能,我们采用了 第一种方法:

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip将多个步骤与单个操作一起批处理 如果> 1,则其他帧计数(例如,frames_per_batch、total_frames) 需要进行调整,以便收集的帧总数一致 跨实验。这很重要,因为可以提高跳帧,但保持 未更改的帧总数可能看起来像作弊:所有事物都比较, 一个包含 10M 个元素的数据集,其中跳帧为 2,另一个数据集的跳帧率为 2 跳帧 1 实际上具有与环境交互的比率 2:1!简而言之,应该谨慎对待 training 脚本,因为这可能会导致 训练策略之间的偏倚比较。

缩放奖励有助于我们控制信号幅度,以获得更多 高效学习。

reward_scaling = 5.0

我们还定义了何时截断轨迹。一千步(如果为 500 frame-skip = 2) 是用于 Cheetah 任务的好数字:

max_frames_per_traj = 500

观测值的标准化

为了计算归一化统计数据,我们运行任意数量的随机 步骤,并计算 收集的观察结果。该方法 用于此目的。为了获得汇总统计数据,我们创建了一个虚拟 环境并运行给定数量的步骤,在给定的 Number of steps 并计算其摘要统计信息。ObservationNorm.init_stats()

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

标准化统计

用于统计计算的随机步数ObservationNorm

init_env_steps = 5000

transform_state_dict = get_env_stats()

每个数据收集器中的环境数

env_per_collector = 4

我们传递之前计算的统计数据,以规范化 环境:

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import Composite

构建模型

现在我们来看看模型的设置。正如我们所看到的,DDPG 需要一个 value 网络,经过训练以估计状态-操作对的值,以及 参数化角色,学习如何选择最大化此值的动作。

回想一下,构建 TorchRL 模块需要两个步骤:

  • 编写将用作网络的

  • 将网络包装在 where 通过指定 input 和 output 键来处理数据流。tensordict.nn.TensorDictModule

在更复杂的场景中,可以 也被使用。tensordict.nn.TensorDictSequential

Q 值网络包装在 a 中,该网络会自动将 q 值的 to 设置为 网络和其他价值网络。ValueOperatorout_keys"state_action_valuestate_value

TorchRL 提供了 DDPG 网络的内置版本,如 原始论文。这些可以在 和 找到。

由于我们使用了惰性模块,因此有必要将惰性模块具体化 在能够将策略从一个设备移动到另一个设备并实现其他 操作。因此,最好使用小型 数据样本。为此,我们从 环境规范。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=Composite(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

勘探

正如原始论文中所建议的那样,该策略被传递到 exploration 模块中。 让我们定义 OU 噪声达到其最小值之前的帧数

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

数据收集器

TorchRL 提供了专门的类来帮助您通过执行 环境中的策略。这些 “数据收集器” 迭代计算 要在给定时间执行的操作,然后执行 环境并在需要时重置它。 数据收集器旨在帮助开发人员进行严格控制 关于每批数据的帧数,关于这个的 (a)sync 性质 集合和分配给数据收集的资源(例如 GPU、工作线程数量等)。

这里我们将使用 ,一个简单的单进程 数据收集器。TorchRL 提供了其他收集器,例如 ,它执行了 以异步方式推出(例如,在 策略正在优化,从而将训练和 数据收集)。

要指定的参数包括:

  • 环境工厂或环境,

  • 策略、

  • 收集器被视为空之前的帧总数,

  • 每个轨迹的最大帧数(对于非终止 环境,例如 Environments)。dm_control

    注意

    传递给 collector 将产生 注册新转换 替换为用于推理的环境。我们可以达到同样的效果 manually,就像我们在这个脚本中所做的那样。max_frames_per_trajStepCounter

还应传递:

  • 收集的每个批次中的帧数,

  • 独立于策略执行的随机步骤数,

  • 用于策略执行的设备

  • 在将数据传递到 main 之前用于存储数据的设备 过程。

我们在训练期间将使用的总帧数应在 1M 左右。

total_frames = 10_000  # 1_000_000

收集器在外部的每次迭代中返回的帧数 loop 等于每个子轨迹的长度乘以 环境在每个收集器中并行运行。

换句话说,我们希望来自 collector 的 batchs 具有一个形状,其中:[env_per_collector, traj_len]traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

Evaluator:构建记录器对象

由于训练数据是使用某种探索策略获得的,因此 true 我们的算法的性能需要在确定性模式下进行评估。我们 使用专用类 来执行此操作,该类在 环境,并返回一些获得的统计信息 从这些模拟中。LogValidationReward

以下帮助程序函数生成此对象:

from torchrl.trainers import LogValidationReward


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = LogValidationReward(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我们将记录每 10 个收集批次的性能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

重放缓冲区

重放缓冲区有两种风格:prioritized(其中一些错误信号 用于为某些项目提供比其他项目更高的抽样可能性) 以及定期的循环体验重播。

TorchRL 重放缓冲区是可组合的:可以获取存储、采样 和写作策略。也可以 使用内存映射数组将张量存储在物理内存上。以下内容 函数负责创建具有所需 超参数:

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我们将重放缓冲区存储在磁盘上的临时目录中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

重放缓冲区存储和批量大小

TorchRL 重放缓冲区计算沿第一维的元素数量。 由于我们将向缓冲区提供轨迹,因此我们需要调整缓冲区 size 的值除以我们的 数据收集器。 关于批次大小,我们的抽样策略将包括抽样 选择子轨迹之前的长度轨迹 或计算损失的时长。 这种策略平衡了存储某个 长度,需要提供具有足够异质性的样品 让我们蒙受损失。下图显示了来自收集器的数据流 每个批次获得 8 帧,其中 2 个环境并行运行, 将它们馈送到包含 1000 个轨迹的重放缓冲区,并且 采样每个子轨迹 2 个时间步长。traj_len=200random_crop_len=25

将轨迹存储在重放缓冲区中

让我们从缓冲区中存储的帧数开始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

默认情况下,优先重放缓冲区处于禁用状态

prb = False

我们还需要定义每批数据将执行多少次更新 收集。这称为更新与数据或比率:UTD

update_to_data = 64

我们将用长度为 25 的轨迹来喂养损失:

random_crop_len = 25

在原始论文中,作者对一批 64 个进行了一次更新 元素。在这里,我们再现相同的比率 但是,同时在每个批次集合中实现多次更新。我们 调整我们的 batch-size 以实现相同的每帧更新数比率:

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

损耗模块结构

我们使用 actor 构建了 loss 模块,并且刚刚创建了。 因为我们有目标参数要更新,所以我们_必须_创建一个目标网络 更新。qnet

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

让我们使用 TD(lambda) 估算器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

Off-policy 通常指示 TD(0) 估计器。在这里,我们使用 TD() estimator 的 Timator 进行 Tim 操作,这将引入一些偏差作为随后的轨迹 某个状态是使用过时的策略收集的。 这个技巧,作为数据收集过程中可以使用的多步骤技巧, 是我们通常发现效果很好的 “hacks” 的替代版本 尽管他们在回报中引入了一些偏见,但还是进行了实践 估计。

Target Network Updater (目标网络更新程序)

目标网络是非策略 RL 算法的关键部分。 多亏了 and 类,更新目标网络参数变得容易。它们是以 loss 模块作为参数构建的,更新是 通过在 training 循环。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

优化

最后,我们将对策略和价值网络使用 Adam 优化器:

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

是时候训练策略了

训练循环非常简单,因为我们已经构建了所有 模块。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|▊         | 800/10000 [00:00<00:03, 2637.60it/s]
 16%|█▌        | 1600/10000 [00:02<00:13, 641.71it/s]
 24%|██▍       | 2400/10000 [00:02<00:07, 978.24it/s]
 32%|███▏      | 3200/10000 [00:02<00:05, 1299.00it/s]
 40%|████      | 4000/10000 [00:03<00:03, 1585.24it/s]
 48%|████▊     | 4800/10000 [00:03<00:02, 1832.78it/s]
 56%|█████▌    | 5600/10000 [00:03<00:02, 2034.12it/s]
reward: -2.29 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.04/6.78, grad norm= 68.11, loss_value= 465.99, loss_actor= 13.91, target value: -13.13:  56%|█████▌    | 5600/10000 [00:05<00:02, 2034.12it/s]
reward: -2.29 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.04/6.78, grad norm= 68.11, loss_value= 465.99, loss_actor= 13.91, target value: -13.13:  64%|██████▍   | 6400/10000 [00:06<00:05, 710.74it/s]
reward: -2.37 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.65/5.69, grad norm= 49.61, loss_value= 211.72, loss_actor= 14.42, target value: -16.98:  64%|██████▍   | 6400/10000 [00:08<00:05, 710.74it/s]
reward: -2.37 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.65/5.69, grad norm= 49.61, loss_value= 211.72, loss_actor= 14.42, target value: -16.98:  72%|███████▏  | 7200/10000 [00:09<00:05, 496.69it/s]
reward: -5.23 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.82/6.44, grad norm= 128.96, loss_value= 357.72, loss_actor= 15.46, target value: -18.87:  72%|███████▏  | 7200/10000 [00:11<00:05, 496.69it/s]
reward: -5.23 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.82/6.44, grad norm= 128.96, loss_value= 357.72, loss_actor= 15.46, target value: -18.87:  80%|████████  | 8000/10000 [00:11<00:04, 415.06it/s]
reward: -4.89 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.95/5.74, grad norm= 59.11, loss_value= 246.36, loss_actor= 18.35, target value: -19.09:  80%|████████  | 8000/10000 [00:13<00:04, 415.06it/s]
reward: -4.89 (r0 = -1.97), reward eval: reward: -0.00, reward normalized=-2.95/5.74, grad norm= 59.11, loss_value= 246.36, loss_actor= 18.35, target value: -19.09:  88%|████████▊ | 8800/10000 [00:14<00:03, 371.73it/s]
reward: -5.27 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.32/5.64, grad norm= 130.20, loss_value= 308.06, loss_actor= 22.70, target value: -23.36:  88%|████████▊ | 8800/10000 [00:18<00:03, 371.73it/s]
reward: -5.27 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.32/5.64, grad norm= 130.20, loss_value= 308.06, loss_actor= 22.70, target value: -23.36:  96%|█████████▌| 9600/10000 [00:18<00:01, 287.09it/s]
reward: -3.88 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.69/5.15, grad norm= 104.31, loss_value= 211.37, loss_actor= 23.34, target value: -26.08:  96%|█████████▌| 9600/10000 [00:20<00:01, 287.09it/s]
reward: -3.88 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-3.69/5.15, grad norm= 104.31, loss_value= 211.37, loss_actor= 23.34, target value: -26.08: : 10400it [00:21, 271.50it/s]
reward: -4.61 (r0 = -1.97), reward eval: reward: -5.03, reward normalized=-2.93/5.39, grad norm= 125.93, loss_value= 236.82, loss_actor= 21.76, target value: -20.33: : 10400it [00:23, 271.50it/s]

实验结果

我们制作了训练期间平均奖励的简单图。我们可以观察 我们的政策在解决任务方面学得相当好。

注意

如上所述,为了获得更合理的性能, 使用更大的值,例如 1M。total_frames

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
编码 DDPG

结论

在本教程中,我们学习了如何在 TorchRL 中编写一个 loss 模块。 DDPG 的具体例子。

关键要点是:

  • 如何使用类编写新的 损失成分;

  • 如何使用(或不使用)目标网络,以及如何更新其参数;

  • 如何创建与 loss 模块关联的优化器。

后续步骤

为了进一步迭代这个 loss 模块,我们可以考虑:

脚本总运行时间:(2 分 0.389 秒)

估计内存使用量:322 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源