目录

TorchRL 目标:编码 DDPG 损失

作者Vincent Moens

概述

TorchRL 将 RL sota-implementations 的训练分为各个部分,这些部分将是 在您的训练脚本中组装:环境、数据收集和 storage、模型,最后是损失函数。

TorchRL 损失(或“目标”)是有状态对象,其中包含 可训练参数(策略和价值模型)。 本教程将指导您完成从头开始编写亏损代码的步骤 使用 TorchRL。

为此,我们将专注于 DDPG,这是一个相对简单的 算法进行编码。深度确定性策略梯度 (DDPG) 是一种简单的连续控制算法。它包括学习 参数值函数,以及 然后学习一个策略,该策略输出可最大化此价值的操作 函数给定一定的观察值。

您将学到什么:

  • 如何编写 loss 模块并自定义其 Value Estimator;

  • 如何在 TorchRL 中构建环境,包括转换 (例如,数据规范化)和并行执行;

  • 如何设计政策和价值网络;

  • 如何有效地从环境中收集数据并存储数据 在 replay 缓冲区中;

  • 如何在重放缓冲区中存储轨迹(而不是过渡));

  • 如何评估您的模型。

先决条件

本教程假定您已完成 PPO 教程,该教程提供 TorchRL 组件和依赖项概述,例如 和 , 虽然它应该是 足够透明,无需深入了解即可理解 这些类。tensordict.TensorDicttensordict.nn.TensorDictModules

注意

我们的目的不是给出算法的 SOTA 实现,而是 提供 TorchRL 的 loss 实现的高级说明 以及要在 这个算法。

导入和设置

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我们将在 CUDA 上执行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列可在训练脚本中使用的损失。 目标是获得易于重用/可交换的损失,并且具有 一个简单的签名。

TorchRL 损失的主要特点是:

  • 它们是有状态对象:它们包含可训练参数的副本 这样就可以提供训练 算法。loss_module.parameters()

  • 它们遵循约定:该方法将接收一个 TensorDict 作为 Input,其中包含所有必要的 信息返回损失值。TensorDict

    >>> data = replay_buffer.sample()
    >>> loss_dict = loss_module(data)
    
  • 它们输出一个实例,其中包含 loss 值 写入其中是一个描述 损失。中的其他键可能是记录期间的有用指标 训练时间。tensordict.TensorDict"loss_<smth>"smthTensorDict

    注意

    我们之所以返回独立亏损,是为了让用户使用不同的 optimizer 来获取不同的参数集。汇总损失 可以简单地通过

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

方法__init__

所有损失的父类是 。 与该库的许多其他组件一样,其方法期望 作为输入,从体验中采样的实例 replay buffer 或任何类似的数据结构。使用这种格式可以 可以跨 模态,或者在模型需要读取多个 条目。换句话说,它允许我们编写一个 loss 模块,该 忽略了被赋予 is 的数据类型,并且它专注于 运行 loss 函数的基本步骤,并且只运行这些步骤。tensordict.TensorDict

为了使本教程尽可能具有教学性,我们将展示每种方法 的 API 中,我们将在稍后的 阶段。

让我们从方法开始。DDPG 旨在通过简单的策略解决控制任务: 训练策略以输出操作,这些操作使预测的值最大化 价值网络。因此,我们的 loss 模块需要在其 constructor:一个 Actor 和一个 Value Networks。我们预计这两者都是 与 TensorDict 兼容的对象,例如 . 我们的损失函数需要计算一个目标值并拟合该值 network 添加到 this 中,并生成一个操作并适合策略,使其 value estimate 最大化。__init__()tensordict.nn.TensorDictModule

该方法的关键步骤是调用 .此方法将提取 模块中的参数并将其转换为功能模块。 严格来说,这不是必需的,一个人可以完美地编码所有 没有它的损失。但是,我们鼓励将其用于以下用途 原因。LossModule.__init__()convert_to_functional()

TorchRL 这样做的原因是 RL sota-implementations 通常执行相同的操作 具有不同参数集的模型,称为 “trainable” 和 “target” 参数。 “可训练”参数是优化器需要拟合的参数。这 “target” 参数通常是前者的副本,但有一些时间滞后 (绝对或通过移动平均线稀释)。 这些目标参数用于计算与 下一次观察。使用一组目标参数的优点 对于不完全匹配的值模型,当前配置为 它们为正在计算的值函数提供了一个悲观的边界。 注意下面的 keyword 参数:this argument 告诉该方法在 loss 模块中创建一组要使用的目标参数 用于目标值计算。如果将其设置为 (请参阅 actor 网络 例如),该属性仍将为 accessible 的 URL,但这只会返回 actor 参数。create_target_paramsFalsetarget_actor_network_params

稍后,我们将看到如何在 TorchRL 中更新目标参数。

from tensordict.nn import TensorDictModule


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

value estimator loss 方法

在许多 RL 算法中,价值网络(或 Q 值网络)是基于训练的 根据经验价值估计。这可以是自举的 (TD(0),低 方差、高偏差)、含义 目标值是使用 next reward 获得的,而不是其他任何奖励,或者 可以获得蒙特卡洛估计值 (TD(1)),在这种情况下,整个 将使用即将到来的奖励的顺序(高变异性、低偏差)。一 中间估计器 (TD()) 也可用于妥协 偏差和方差。 TorchRL 可以通过 Enum 类轻松使用一个或另一个估计器,该类包含 指向所有已实现的值估计器的指针。我们来定义默认的 value 函数。我们将采用最简单的版本 (TD(0)),稍后显示 关于如何改变这一点。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我们还需要向 DDPG 提供一些有关如何构建价值的说明 estimator 的 intent 值,具体取决于用户查询。根据提供的估算器, 我们将构建相应的模块,以便在 Train 时使用:

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

方法可以但不需要调用:ifgg not 时,将使用 它的默认 estimator 来估计。make_value_estimator

actor 损失方法

RL 算法的核心部分是 actor 的训练损失。 在 DDPG 的情况下,这个函数非常简单:我们只需要计算 与使用 Policy and optimize 计算的操作关联的值 Actor 权重以最大化此值。

在计算这个值时,我们必须确保去掉 value 参数 ,否则 actor 和 value loss 会混淆。 为此,函数 可以使用。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

value loss 方法

我们现在需要优化我们的价值网络参数。 为此,我们将依赖我们类的值估计器:

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

在前瞻呼叫中将所有内容放在一起

唯一缺少的部分是 forward 方法,它将 value 和 actor loss 中,收集 cost 值并将其写入 delivered to the user.TensorDict

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

现在我们有了损失,我们可以使用它来训练策略来解决 控制任务。

环境

在大多数 sota-implementations 中,首先需要注意的是 环境的构造,因为它限制了 training 脚本。

在此示例中,我们将使用 task.目标是使 半只猎豹尽可能快地跑。"cheetah"

在 TorchRL 中,可以通过依赖 或 来创建这样的任务:dm_controlgym

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

默认情况下,这些环境将禁用渲染。来自各州的培训是 通常比从图像训练更容易。为了简单起见,我们专注于 仅向各州学习。要将像素传递给 被 收集,只需将参数传递给构造函数即可:tensordictsenv.step()from_pixels=True

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我们编写一个 helper 函数来创建一个环境 上面考虑了两个后端中的任何一个 ( 或 )。make_env()dm-controlgym

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

变换

现在我们有一个基本环境,我们可能想要修改它的表示 使其对策略更友好。在 TorchRL 中,转换被附加到 base 环境。torchr.envs.TransformedEnv

  • 在 DDPG 中,使用一些启发式值重新缩放奖励是很常见的。我们 将奖励乘以 5。

  • 如果我们使用 ,那么构建一个接口也很重要 在处理双精度数字的模拟器和我们的 script 的 intent 调用的 intent 的 1 个实例。这种转变是 两种方式:调用 时,我们的 action 都需要是 以双精度表示,并且输出需要转换 设置为单精度。 转换正是这样做的:列表引用需要转换的键 double 设置为 float,而 the 则是指那些需要 在传递到环境之前转换为 double。dm_controlenv.step()DoubleToFloatin_keysin_keys_inv

  • 我们使用 transform 将 state key 连接在一起。CatTensors

  • 最后,我们还保留了使状态正常化的可能性:我们将 稍后负责计算归一化常量。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

并行执行

以下 helper 函数允许我们并行运行环境。 并行运行环境可以显著加快收集速度 吞吐量。当使用转换后的环境时,我们需要选择是否 想要为每个环境单独执行转换,或者 集中数据并批量转换。两种方法都很容易 法典:

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

为了利用 PyTorch 的矢量化功能,我们采用了 第一种方法:

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip将多个步骤与单个操作一起批处理 如果> 1,则其他帧计数(例如,frames_per_batch、total_frames) 需要进行调整,以便收集的帧总数一致 跨实验。这很重要,因为可以提高跳帧,但保持 未更改的帧总数可能看起来像作弊:所有事物都比较, 一个包含 10M 个元素的数据集,其中跳帧为 2,另一个数据集的跳帧率为 2 跳帧 1 实际上具有与环境交互的比率 2:1!简而言之,应该谨慎对待 training 脚本,因为这可能会导致 训练策略之间的偏倚比较。

缩放奖励有助于我们控制信号幅度,以获得更多 高效学习。

reward_scaling = 5.0

我们还定义了何时截断轨迹。一千步(如果为 500 frame-skip = 2) 是用于 Cheetah 任务的好数字:

max_frames_per_traj = 500

观测值的标准化

为了计算归一化统计数据,我们运行任意数量的随机 步骤,并计算 收集的观察结果。该方法 用于此目的。为了获得汇总统计数据,我们创建了一个虚拟 环境并运行给定数量的步骤,在给定的 Number of steps 并计算其摘要统计信息。ObservationNorm.init_stats()

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

标准化统计

用于统计计算的随机步数ObservationNorm

init_env_steps = 5000

transform_state_dict = get_env_stats()

每个数据收集器中的环境数

env_per_collector = 4

我们传递之前计算的统计数据,以规范化 环境:

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import CompositeSpec

构建模型

现在我们来看看模型的设置。正如我们所看到的,DDPG 需要一个 value 网络,经过训练以估计状态-操作对的值,以及 参数化角色,学习如何选择最大化此值的动作。

回想一下,构建 TorchRL 模块需要两个步骤:

  • 编写将用作网络的

  • 将网络包装在 where 通过指定 input 和 output 键来处理数据流。tensordict.nn.TensorDictModule

在更复杂的场景中,可以 也被使用。tensordict.nn.TensorDictSequential

Q 值网络包装在 a 中,该网络会自动将 q 值的 to 设置为 网络和其他价值网络。ValueOperatorout_keys"state_action_valuestate_value

TorchRL 提供了 DDPG 网络的内置版本,如 原始论文。这些可以在 和 找到。

由于我们使用了惰性模块,因此有必要将惰性模块具体化 在能够将策略从一个设备移动到另一个设备并实现其他 操作。因此,最好使用小型 数据样本。为此,我们从 环境规范。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessWrapper,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=CompositeSpec(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

勘探

正如原始论文中所建议的那样,该策略包装在一个 exploration 模块中。 让我们定义 OU 噪声达到其最小值之前的帧数OrnsteinUhlenbeckProcessWrapper

annealing_frames = 1_000_000

actor_model_explore = OrnsteinUhlenbeckProcessWrapper(
    actor,
    annealing_num_steps=annealing_frames,
).to(device)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

数据收集器

TorchRL 提供了专门的类来帮助您通过执行 环境中的策略。这些 “数据收集器” 迭代计算 要在给定时间执行的操作,然后执行 环境并在需要时重置它。 数据收集器旨在帮助开发人员进行严格控制 关于每批数据的帧数,关于这个的 (a)sync 性质 集合和分配给数据收集的资源(例如 GPU、工作线程数量等)。

这里我们将使用 ,一个简单的单进程 数据收集器。TorchRL 提供了其他收集器,例如 ,它执行了 以异步方式推出(例如,在 策略正在优化,从而将训练和 数据收集)。

要指定的参数包括:

  • 环境工厂或环境,

  • 策略、

  • 收集器被视为空之前的帧总数,

  • 每个轨迹的最大帧数(对于非终止 环境,例如 Environments)。dm_control

    注意

    传递给 collector 将产生 注册新转换 替换为用于推理的环境。我们可以达到同样的效果 manually,就像我们在这个脚本中所做的那样。max_frames_per_trajStepCounter

还应传递:

  • 收集的每个批次中的帧数,

  • 独立于策略执行的随机步骤数,

  • 用于策略执行的设备

  • 在将数据传递到 main 之前用于存储数据的设备 过程。

我们在训练期间将使用的总帧数应在 1M 左右。

total_frames = 10_000  # 1_000_000

收集器在外部的每次迭代中返回的帧数 loop 等于每个子轨迹的长度乘以 环境在每个收集器中并行运行。

换句话说,我们希望来自 collector 的 batchs 具有一个形状,其中:[env_per_collector, traj_len]traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

Evaluator:构建记录器对象

由于训练数据是使用某种探索策略获得的,因此 true 我们的算法的性能需要在确定性模式下进行评估。我们 使用专用类 来执行此操作,该类在 环境,并返回一些获得的统计信息 从这些模拟中。Recorder

以下帮助程序函数生成此对象:

from torchrl.trainers import Recorder


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = Recorder(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.MEAN,
        record_interval=record_interval,
    )
    return recorder_obj

我们将记录每 10 个收集批次的性能

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

重放缓冲区

重放缓冲区有两种风格:prioritized(其中一些错误信号 用于为某些项目提供比其他项目更高的抽样可能性) 以及定期的循环体验重播。

TorchRL 重放缓冲区是可组合的:可以获取存储、采样 和写作策略。也可以 使用内存映射数组将张量存储在物理内存上。以下内容 函数负责创建具有所需 超参数:

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我们将重放缓冲区存储在磁盘上的临时目录中

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

重放缓冲区存储和批量大小

TorchRL 重放缓冲区计算沿第一维的元素数量。 由于我们将向缓冲区提供轨迹,因此我们需要调整缓冲区 size 的值除以我们的 数据收集器。 关于批次大小,我们的抽样策略将包括抽样 选择子轨迹之前的长度轨迹 或计算损失的时长。 这种策略平衡了存储某个 长度,需要提供具有足够异质性的样品 让我们蒙受损失。下图显示了来自收集器的数据流 每个批次获得 8 帧,其中 2 个环境并行运行, 将它们馈送到包含 1000 个轨迹的重放缓冲区,并且 采样每个子轨迹 2 个时间步长。traj_len=200random_crop_len=25

将轨迹存储在重放缓冲区中

让我们从缓冲区中存储的帧数开始

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

默认情况下,优先重放缓冲区处于禁用状态

prb = False

我们还需要定义每批数据将执行多少次更新 收集。这称为更新与数据或比率:UTD

update_to_data = 64

我们将用长度为 25 的轨迹来喂养损失:

random_crop_len = 25

在原始论文中,作者对一批 64 个进行了一次更新 元素。在这里,我们再现相同的比率 但是,同时在每个批次集合中实现多次更新。我们 调整我们的 batch-size 以实现相同的每帧更新数比率:

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

损耗模块结构

我们使用 actor 构建了 loss 模块,并且刚刚创建了。 因为我们有目标参数要更新,所以我们_必须_创建一个目标网络 更新。qnet

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

让我们使用 TD(lambda) 估算器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

Off-policy 通常指示 TD(0) 估计器。在这里,我们使用 TD() estimator 的 Timator 进行 Tim 操作,这将引入一些偏差作为随后的轨迹 某个状态是使用过时的策略收集的。 这个技巧,作为数据收集过程中可以使用的多步骤技巧, 是我们通常发现效果很好的 “hacks” 的替代版本 尽管他们在回报中引入了一些偏见,但还是进行了实践 估计。

Target Network Updater (目标网络更新程序)

目标网络是不符合策略的 RL sota-implementation 的关键部分。 多亏了 and 类,更新目标网络参数变得容易。它们是以 loss 模块作为参数构建的,更新是 通过在 training 循环。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

优化

最后,我们将对策略和价值网络使用 Adam 优化器:

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

是时候训练策略了

训练循环非常简单,因为我们已经构建了所有 模块。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore.step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|▊         | 800/10000 [00:00<00:02, 3341.44it/s]
 16%|█▌        | 1600/10000 [00:01<00:08, 972.21it/s]
 24%|██▍       | 2400/10000 [00:01<00:05, 1450.21it/s]
 32%|███▏      | 3200/10000 [00:01<00:03, 1892.96it/s]
 40%|████      | 4000/10000 [00:02<00:02, 2269.56it/s]
 48%|████▊     | 4800/10000 [00:02<00:02, 2582.90it/s]
 56%|█████▌    | 5600/10000 [00:02<00:01, 2831.95it/s]
reward: -2.79 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.48/5.85, grad norm= 55.29, loss_value= 313.46, loss_actor= 16.18, target value: -17.03:  56%|█████▌    | 5600/10000 [00:04<00:01, 2831.95it/s]
reward: -2.79 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.48/5.85, grad norm= 55.29, loss_value= 313.46, loss_actor= 16.18, target value: -17.03:  64%|██████▍   | 6400/10000 [00:04<00:04, 853.52it/s]
reward: -1.87 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.85, grad norm= 202.05, loss_value= 304.94, loss_actor= 13.16, target value: -17.24:  64%|██████▍   | 6400/10000 [00:06<00:04, 853.52it/s]
reward: -1.87 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.85, grad norm= 202.05, loss_value= 304.94, loss_actor= 13.16, target value: -17.24:  72%|███████▏  | 7200/10000 [00:07<00:05, 535.98it/s]
reward: -5.27 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.96/5.83, grad norm= 215.11, loss_value= 276.88, loss_actor= 17.72, target value: -20.46:  72%|███████▏  | 7200/10000 [00:09<00:05, 535.98it/s]
reward: -5.27 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.96/5.83, grad norm= 215.11, loss_value= 276.88, loss_actor= 17.72, target value: -20.46:  80%|████████  | 8000/10000 [00:10<00:04, 429.53it/s]
reward: -5.04 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.49, grad norm= 94.22, loss_value= 189.84, loss_actor= 15.74, target value: -17.99:  80%|████████  | 8000/10000 [00:11<00:04, 429.53it/s]
reward: -5.04 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.49, grad norm= 94.22, loss_value= 189.84, loss_actor= 15.74, target value: -17.99:  88%|████████▊ | 8800/10000 [00:12<00:03, 388.35it/s]
reward: -1.05 (r0 = -1.96), reward eval: reward:  2.85, reward normalized=-2.68/4.91, grad norm= 88.90, loss_value= 153.33, loss_actor= 14.20, target value: -19.37:  88%|████████▊ | 8800/10000 [00:15<00:03, 388.35it/s]
reward: -1.05 (r0 = -1.96), reward eval: reward:  2.85, reward normalized=-2.68/4.91, grad norm= 88.90, loss_value= 153.33, loss_actor= 14.20, target value: -19.37:  96%|█████████▌| 9600/10000 [00:16<00:01, 313.88it/s]
reward:  0.95 (r0 = -1.96), reward eval: reward:  2.85, reward normalized=-2.82/5.52, grad norm= 122.12, loss_value= 198.87, loss_actor= 14.87, target value: -19.42:  96%|█████████▌| 9600/10000 [00:18<00:01, 313.88it/s]
reward:  0.95 (r0 = -1.96), reward eval: reward:  2.85, reward normalized=-2.82/5.52, grad norm= 122.12, loss_value= 198.87, loss_actor= 14.87, target value: -19.42: : 10400it [00:19, 296.66it/s]
reward: -5.00 (r0 = -1.96), reward eval: reward:  2.85, reward normalized=-2.38/4.70, grad norm= 135.29, loss_value= 136.95, loss_actor= 19.89, target value: -17.72: : 10400it [00:21, 296.66it/s]

实验结果

我们制作了训练期间平均奖励的简单图。我们可以观察 我们的政策在解决任务方面学得相当好。

注意

如上所述,为了获得更合理的性能, 使用更大的值,例如 1M。total_frames

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
编码 DDPG

结论

在本教程中,我们学习了如何在 TorchRL 中编写一个 loss 模块。 DDPG 的具体例子。

关键要点是:

  • 如何使用类编写新的 损失成分;

  • 如何使用(或不使用)目标网络,以及如何更新其参数;

  • 如何创建与 loss 模块关联的优化器。

后续步骤

为了进一步迭代这个 loss 模块,我们可以考虑:

脚本总运行时间:(1 分 50.926 秒)

估计内存使用量:18 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源