注意
转到末尾下载完整的示例代码。
TorchRL 目标:编码 DDPG 损失¶
作者: Vincent Moens
概述¶
TorchRL 将 RL sota-implementations 的训练分为各个部分,这些部分将是 在您的训练脚本中组装:环境、数据收集和 storage、模型,最后是损失函数。
TorchRL 损失(或“目标”)是有状态对象,其中包含 可训练参数(策略和价值模型)。 本教程将指导您完成从头开始编写亏损代码的步骤 使用 TorchRL。
为此,我们将专注于 DDPG,这是一个相对简单的 算法进行编码。深度确定性策略梯度 (DDPG) 是一种简单的连续控制算法。它包括学习 参数值函数,以及 然后学习一个策略,该策略输出可最大化此价值的操作 函数给定一定的观察值。
您将学到什么:
如何编写 loss 模块并自定义其 Value Estimator;
如何在 TorchRL 中构建环境,包括转换 (例如,数据规范化)和并行执行;
如何设计政策和价值网络;
如何有效地从环境中收集数据并存储数据 在 replay 缓冲区中;
如何在重放缓冲区中存储轨迹(而不是过渡));
如何评估您的模型。
先决条件¶
本教程假定您已完成 PPO 教程,该教程提供
TorchRL 组件和依赖项概述,例如 和 ,
虽然它应该是
足够透明,无需深入了解即可理解
这些类。tensordict.TensorDict
tensordict.nn.TensorDictModules
注意
我们的目的不是给出算法的 SOTA 实现,而是 提供 TorchRL 的 loss 实现的高级说明 以及要在 这个算法。
导入和设置¶
%%bash pip3 install torchrl mujoco glfw
import torch
import tqdm
如果可用,我们将在 CUDA 上执行策略
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
collector_device = torch.device("cpu") # Change the device to ``cuda`` to use CUDA
TorchRL LossModule
¶
TorchRL 提供了一系列可在训练脚本中使用的损失。 目标是获得易于重用/可交换的损失,并且具有 一个简单的签名。
TorchRL 损失的主要特点是:
它们是有状态对象:它们包含可训练参数的副本 这样就可以提供训练 算法。
loss_module.parameters()
它们遵循约定:该方法
将接收一个 TensorDict 作为 Input,其中包含所有必要的 信息返回损失值。
TensorDict
>>> data = replay_buffer.sample() >>> loss_dict = loss_module(data)
它们输出一个实例,其中包含 loss 值 写入其中是一个描述 损失。中的其他键可能是记录期间的有用指标 训练时间。
tensordict.TensorDict
"loss_<smth>"
smth
TensorDict
注意
我们之所以返回独立亏损,是为了让用户使用不同的 optimizer 来获取不同的参数集。汇总损失 可以简单地通过
>>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
方法__init__
¶
所有损失的父类是 。
与该库的许多其他组件一样,其
方法期望
作为输入,从体验中采样的实例
replay buffer 或任何类似的数据结构。使用这种格式可以
可以跨
模态,或者在模型需要读取多个
条目。换句话说,它允许我们编写一个 loss 模块,该
忽略了被赋予 is 的数据类型,并且它专注于
运行 loss 函数的基本步骤,并且只运行这些步骤。
tensordict.TensorDict
为了使本教程尽可能具有教学性,我们将展示每种方法 的 API 中,我们将在稍后的 阶段。
让我们从方法开始。DDPG 旨在通过简单的策略解决控制任务:
训练策略以输出操作,这些操作使预测的值最大化
价值网络。因此,我们的 loss 模块需要在其
constructor:一个 Actor 和一个 Value Networks。我们预计这两者都是
与 TensorDict 兼容的对象,例如 .
我们的损失函数需要计算一个目标值并拟合该值
network 添加到 this 中,并生成一个操作并适合策略,使其
value estimate 最大化。__init__()
tensordict.nn.TensorDictModule
该方法的关键步骤是调用 .此方法将提取
模块中的参数并将其转换为功能模块。
严格来说,这不是必需的,一个人可以完美地编码所有
没有它的损失。但是,我们鼓励将其用于以下用途
原因。LossModule.__init__()
convert_to_functional()
TorchRL 这样做的原因是 RL sota-implementations 通常执行相同的操作
具有不同参数集的模型,称为 “trainable” 和 “target”
参数。
“可训练”参数是优化器需要拟合的参数。这
“target” 参数通常是前者的副本,但有一些时间滞后
(绝对或通过移动平均线稀释)。
这些目标参数用于计算与
下一次观察。使用一组目标参数的优点
对于不完全匹配的值模型,当前配置为
它们为正在计算的值函数提供了一个悲观的边界。
注意下面的 keyword 参数:this
argument 告诉该方法在 loss 模块中创建一组要使用的目标参数
用于目标值计算。如果将其设置为 (请参阅 actor 网络
例如),该属性仍将为
accessible 的 URL,但这只会返回
actor 参数。
create_target_params
False
target_actor_network_params
稍后,我们将看到如何在 TorchRL 中更新目标参数。
from tensordict.nn import TensorDictModule
def _init(
self,
actor_network: TensorDictModule,
value_network: TensorDictModule,
) -> None:
super(type(self), self).__init__()
self.convert_to_functional(
actor_network,
"actor_network",
create_target_params=True,
)
self.convert_to_functional(
value_network,
"value_network",
create_target_params=True,
compare_against=list(actor_network.parameters()),
)
self.actor_in_keys = actor_network.in_keys
# Since the value we'll be using is based on the actor and value network,
# we put them together in a single actor-critic container.
actor_critic = ActorCriticWrapper(actor_network, value_network)
self.actor_critic = actor_critic
self.loss_function = "l2"
value estimator loss 方法¶
在许多 RL 算法中,价值网络(或 Q 值网络)是基于训练的
根据经验价值估计。这可以是自举的 (TD(0),低
方差、高偏差)、含义
目标值是使用 next reward 获得的,而不是其他任何奖励,或者
可以获得蒙特卡洛估计值 (TD(1)),在这种情况下,整个
将使用即将到来的奖励的顺序(高变异性、低偏差)。一
中间估计器 (TD()) 也可用于妥协
偏差和方差。
TorchRL 可以通过 Enum 类轻松使用一个或另一个估计器,该类包含
指向所有已实现的值估计器的指针。我们来定义默认的
value 函数。我们将采用最简单的版本 (TD(0)),稍后显示
关于如何改变这一点。
from torchrl.objectives.utils import ValueEstimators
default_value_estimator = ValueEstimators.TD0
我们还需要向 DDPG 提供一些有关如何构建价值的说明 estimator 的 intent 值,具体取决于用户查询。根据提供的估算器, 我们将构建相应的模块,以便在 Train 时使用:
from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator
def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
hp = dict(default_value_kwargs(value_type))
if hasattr(self, "gamma"):
hp["gamma"] = self.gamma
hp.update(hyperparams)
value_key = "state_action_value"
if value_type == ValueEstimators.TD1:
self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.TD0:
self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.GAE:
raise NotImplementedError(
f"Value type {value_type} it not implemented for loss {type(self)}."
)
elif value_type == ValueEstimators.TDLambda:
self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
else:
raise NotImplementedError(f"Unknown value type {value_type}")
self._value_estimator.set_keys(value=value_key)
方法可以但不需要调用:ifgg
not 时,将使用
它的默认 estimator 来估计。
make_value_estimator
actor 损失方法¶
RL 算法的核心部分是 actor 的训练损失。 在 DDPG 的情况下,这个函数非常简单:我们只需要计算 与使用 Policy and optimize 计算的操作关联的值 Actor 权重以最大化此值。
在计算这个值时,我们必须确保去掉 value 参数
,否则 actor 和 value loss 会混淆。
为此,函数
可以使用。
def _loss_actor(
self,
tensordict,
) -> torch.Tensor:
td_copy = tensordict.select(*self.actor_in_keys)
# Get an action from the actor network: since we made it functional, we need to pass the params
with self.actor_network_params.to_module(self.actor_network):
td_copy = self.actor_network(td_copy)
# get the value associated with that action
with self.value_network_params.detach().to_module(self.value_network):
td_copy = self.value_network(td_copy)
return -td_copy.get("state_action_value")
value loss 方法¶
我们现在需要优化我们的价值网络参数。 为此,我们将依赖我们类的值估计器:
from torchrl.objectives.utils import distance_loss
def _loss_value(
self,
tensordict,
):
td_copy = tensordict.clone()
# V(s, a)
with self.value_network_params.to_module(self.value_network):
self.value_network(td_copy)
pred_val = td_copy.get("state_action_value").squeeze(-1)
# we manually reconstruct the parameters of the actor-critic, where the first
# set of parameters belongs to the actor and the second to the value function.
target_params = TensorDict(
{
"module": {
"0": self.target_actor_network_params,
"1": self.target_value_network_params,
}
},
batch_size=self.target_actor_network_params.batch_size,
device=self.target_actor_network_params.device,
)
with target_params.to_module(self.actor_critic):
target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)
# Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
td_error = (pred_val - target_value).pow(2)
return loss_value, td_error, pred_val, target_value
在前瞻呼叫中将所有内容放在一起¶
唯一缺少的部分是 forward 方法,它将
value 和 actor loss 中,收集 cost 值并将其写入 delivered to the user.TensorDict
from tensordict import TensorDict, TensorDictBase
def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
loss_value, td_error, pred_val, target_value = self.loss_value(
input_tensordict,
)
td_error = td_error.detach()
td_error = td_error.unsqueeze(input_tensordict.ndimension())
if input_tensordict.device is not None:
td_error = td_error.to(input_tensordict.device)
input_tensordict.set(
"td_error",
td_error,
inplace=True,
)
loss_actor = self.loss_actor(input_tensordict)
return TensorDict(
source={
"loss_actor": loss_actor.mean(),
"loss_value": loss_value.mean(),
"pred_value": pred_val.mean().detach(),
"target_value": target_value.mean().detach(),
"pred_value_max": pred_val.max().detach(),
"target_value_max": target_value.max().detach(),
},
batch_size=[],
)
from torchrl.objectives import LossModule
class DDPGLoss(LossModule):
default_value_estimator = default_value_estimator
make_value_estimator = make_value_estimator
__init__ = _init
forward = _forward
loss_value = _loss_value
loss_actor = _loss_actor
现在我们有了损失,我们可以使用它来训练策略来解决 控制任务。
环境¶
在大多数 sota-implementations 中,首先需要注意的是 环境的构造,因为它限制了 training 脚本。
在此示例中,我们将使用 task.目标是使
半只猎豹尽可能快地跑。"cheetah"
在 TorchRL 中,可以通过依赖 或 来创建这样的任务:dm_control
gym
env = GymEnv("HalfCheetah-v4")
或
env = DMControlEnv("cheetah", "run")
默认情况下,这些环境将禁用渲染。来自各州的培训是
通常比从图像训练更容易。为了简单起见,我们专注于
仅向各州学习。要将像素传递给
被 收集,只需将参数传递给构造函数即可:tensordicts
env.step()
from_pixels=True
env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)
我们编写一个 helper 函数来创建一个环境
上面考虑了两个后端中的任何一个 ( 或 )。make_env()
dm-control
gym
from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv
env_library = None
env_name = None
def make_env(from_pixels=False):
"""Create a base ``env``."""
global env_library
global env_name
if backend == "dm_control":
env_name = "cheetah"
env_task = "run"
env_args = (env_name, env_task)
env_library = DMControlEnv
elif backend == "gym":
env_name = "HalfCheetah-v4"
env_args = (env_name,)
env_library = GymEnv
else:
raise NotImplementedError
env_kwargs = {
"device": device,
"from_pixels": from_pixels,
"pixels_only": from_pixels,
"frame_skip": 2,
}
env = env_library(*env_args, **env_kwargs)
return env
变换¶
现在我们有一个基本环境,我们可能想要修改它的表示
使其对策略更友好。在 TorchRL 中,转换被附加到
base 环境。torchr.envs.TransformedEnv
在 DDPG 中,使用一些启发式值重新缩放奖励是很常见的。我们 将奖励乘以 5。
如果我们使用 ,那么构建一个接口也很重要 在处理双精度数字的模拟器和我们的 script 的 intent 调用的 intent 的 1 个实例。这种转变是 两种方式:调用 时,我们的 action 都需要是 以双精度表示,并且输出需要转换 设置为单精度。 转换正是这样做的:列表引用需要转换的键 double 设置为 float,而 the 则是指那些需要 在传递到环境之前转换为 double。
dm_control
env.step()
DoubleToFloat
in_keys
in_keys_inv
我们使用 transform 将 state key 连接在一起。
CatTensors
最后,我们还保留了使状态正常化的可能性:我们将 稍后负责计算归一化常量。
from torchrl.envs import (
CatTensors,
DoubleToFloat,
EnvCreator,
InitTracker,
ObservationNorm,
ParallelEnv,
RewardScaling,
StepCounter,
TransformedEnv,
)
def make_transformed_env(
env,
):
"""Apply transforms to the ``env`` (such as reward scaling and state normalization)."""
env = TransformedEnv(env)
# we append transforms one by one, although we might as well create the
# transformed environment using the `env = TransformedEnv(base_env, transforms)`
# syntax.
env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))
# We concatenate all states into a single "observation_vector"
# even if there is a single tensor, it'll be renamed in "observation_vector".
# This facilitates the downstream operations as we know the name of the
# output tensor.
# In some environments (not half-cheetah), there may be more than one
# observation vector: in this case this code snippet will concatenate them
# all.
selected_keys = list(env.observation_spec.keys())
out_key = "observation_vector"
env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))
# we normalize the states, but for now let's just instantiate a stateless
# version of the transform
env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))
env.append_transform(DoubleToFloat())
env.append_transform(StepCounter(max_frames_per_traj))
# We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
# exploration:
env.append_transform(InitTracker())
return env
并行执行¶
以下 helper 函数允许我们并行运行环境。 并行运行环境可以显著加快收集速度 吞吐量。当使用转换后的环境时,我们需要选择是否 想要为每个环境单独执行转换,或者 集中数据并批量转换。两种方法都很容易 法典:
env = ParallelEnv(
lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
num_workers=4
)
env = TransformedEnv(
ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
transforms
)
为了利用 PyTorch 的矢量化功能,我们采用了 第一种方法:
def parallel_env_constructor(
env_per_collector,
transform_state_dict,
):
if env_per_collector == 1:
def make_t_env():
env = make_transformed_env(make_env())
env.transform[2].init_stats(3)
env.transform[2].loc.copy_(transform_state_dict["loc"])
env.transform[2].scale.copy_(transform_state_dict["scale"])
return env
env_creator = EnvCreator(make_t_env)
return env_creator
parallel_env = ParallelEnv(
num_workers=env_per_collector,
create_env_fn=EnvCreator(lambda: make_env()),
create_env_kwargs=None,
pin_memory=False,
)
env = make_transformed_env(parallel_env)
# we call `init_stats` for a limited number of steps, just to instantiate
# the lazy buffers.
env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
env.transform[2].load_state_dict(transform_state_dict)
return env
# The backend can be ``gym`` or ``dm_control``
backend = "gym"
注意
frame_skip
将多个步骤与单个操作一起批处理
如果> 1,则其他帧计数(例如,frames_per_batch、total_frames)
需要进行调整,以便收集的帧总数一致
跨实验。这很重要,因为可以提高跳帧,但保持
未更改的帧总数可能看起来像作弊:所有事物都比较,
一个包含 10M 个元素的数据集,其中跳帧为 2,另一个数据集的跳帧率为 2
跳帧 1 实际上具有与环境交互的比率
2:1!简而言之,应该谨慎对待
training 脚本,因为这可能会导致
训练策略之间的偏倚比较。
缩放奖励有助于我们控制信号幅度,以获得更多 高效学习。
reward_scaling = 5.0
我们还定义了何时截断轨迹。一千步(如果为 500 frame-skip = 2) 是用于 Cheetah 任务的好数字:
max_frames_per_traj = 500
观测值的标准化¶
为了计算归一化统计数据,我们运行任意数量的随机
步骤,并计算
收集的观察结果。该方法
用于此目的。为了获得汇总统计数据,我们创建了一个虚拟
环境并运行给定数量的步骤,在给定的
Number of steps 并计算其摘要统计信息。ObservationNorm.init_stats()
def get_env_stats():
"""Gets the stats of an environment."""
proof_env = make_transformed_env(make_env())
t = proof_env.transform[2]
t.init_stats(init_env_steps)
transform_state_dict = t.state_dict()
proof_env.close()
return transform_state_dict
标准化统计¶
用于统计计算的随机步数ObservationNorm
init_env_steps = 5000
transform_state_dict = get_env_stats()
每个数据收集器中的环境数
env_per_collector = 4
我们传递之前计算的统计数据,以规范化 环境:
parallel_env = parallel_env_constructor(
env_per_collector=env_per_collector,
transform_state_dict=transform_state_dict,
)
from torchrl.data import CompositeSpec
构建模型¶
现在我们来看看模型的设置。正如我们所看到的,DDPG 需要一个 value 网络,经过训练以估计状态-操作对的值,以及 参数化角色,学习如何选择最大化此值的动作。
回想一下,构建 TorchRL 模块需要两个步骤:
编写将用作网络的 ,
将网络包装在 where 通过指定 input 和 output 键来处理数据流。
tensordict.nn.TensorDictModule
在更复杂的场景中,可以
也被使用。tensordict.nn.TensorDictSequential
Q 值网络包装在 a 中,该网络会自动将 q 值的 to 设置为
网络和其他价值网络。ValueOperator
out_keys
"state_action_value
state_value
TorchRL 提供了 DDPG 网络的内置版本,如
原始论文。这些可以在 和 下
找到。
由于我们使用了惰性模块,因此有必要将惰性模块具体化 在能够将策略从一个设备移动到另一个设备并实现其他 操作。因此,最好使用小型 数据样本。为此,我们从 环境规范。
from torchrl.modules import (
ActorCriticWrapper,
DdpgMlpActor,
DdpgMlpQNet,
OrnsteinUhlenbeckProcessWrapper,
ProbabilisticActor,
TanhDelta,
ValueOperator,
)
def make_ddpg_actor(
transform_state_dict,
device="cpu",
):
proof_environment = make_transformed_env(make_env())
proof_environment.transform[2].init_stats(3)
proof_environment.transform[2].load_state_dict(transform_state_dict)
out_features = proof_environment.action_spec.shape[-1]
actor_net = DdpgMlpActor(
action_dim=out_features,
)
in_keys = ["observation_vector"]
out_keys = ["param"]
actor = TensorDictModule(
actor_net,
in_keys=in_keys,
out_keys=out_keys,
)
actor = ProbabilisticActor(
actor,
distribution_class=TanhDelta,
in_keys=["param"],
spec=CompositeSpec(action=proof_environment.action_spec),
).to(device)
q_net = DdpgMlpQNet()
in_keys = in_keys + ["action"]
qnet = ValueOperator(
in_keys=in_keys,
module=q_net,
).to(device)
# initialize lazy modules
qnet(actor(proof_environment.reset().to(device)))
return actor, qnet
actor, qnet = make_ddpg_actor(
transform_state_dict=transform_state_dict,
device=device,
)
勘探¶
正如原始论文中所建议的那样,该策略包装在一个 exploration 模块中。
让我们定义 OU 噪声达到其最小值之前的帧数OrnsteinUhlenbeckProcessWrapper
annealing_frames = 1_000_000
actor_model_explore = OrnsteinUhlenbeckProcessWrapper(
actor,
annealing_num_steps=annealing_frames,
).to(device)
if device == torch.device("cpu"):
actor_model_explore.share_memory()
数据收集器¶
TorchRL 提供了专门的类来帮助您通过执行 环境中的策略。这些 “数据收集器” 迭代计算 要在给定时间执行的操作,然后执行 环境并在需要时重置它。 数据收集器旨在帮助开发人员进行严格控制 关于每批数据的帧数,关于这个的 (a)sync 性质 集合和分配给数据收集的资源(例如 GPU、工作线程数量等)。
这里我们将使用 ,一个简单的单进程
数据收集器。TorchRL 提供了其他收集器,例如
,它执行了
以异步方式推出(例如,在
策略正在优化,从而将训练和
数据收集)。
要指定的参数包括:
环境工厂或环境,
策略、
收集器被视为空之前的帧总数,
每个轨迹的最大帧数(对于非终止 环境,例如 Environments)。
dm_control
注意
传递给 collector 将产生 注册新转换 替换为用于推理的环境。我们可以达到同样的效果 manually,就像我们在这个脚本中所做的那样。
max_frames_per_traj
StepCounter
还应传递:
收集的每个批次中的帧数,
独立于策略执行的随机步骤数,
用于策略执行的设备
在将数据传递到 main 之前用于存储数据的设备 过程。
我们在训练期间将使用的总帧数应在 1M 左右。
total_frames = 10_000 # 1_000_000
收集器在外部的每次迭代中返回的帧数 loop 等于每个子轨迹的长度乘以 环境在每个收集器中并行运行。
换句话说,我们希望来自 collector 的 batchs 具有一个形状,其中:[env_per_collector, traj_len]
traj_len=frames_per_batch/env_per_collector
traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2
from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType
collector = SyncDataCollector(
parallel_env,
policy=actor_model_explore,
total_frames=total_frames,
frames_per_batch=frames_per_batch,
init_random_frames=init_random_frames,
reset_at_each_iter=False,
split_trajs=False,
device=collector_device,
exploration_type=ExplorationType.RANDOM,
)
Evaluator:构建记录器对象¶
由于训练数据是使用某种探索策略获得的,因此 true
我们的算法的性能需要在确定性模式下进行评估。我们
使用专用类 来执行此操作,该类在
环境,并返回一些获得的统计信息
从这些模拟中。Recorder
以下帮助程序函数生成此对象:
from torchrl.trainers import Recorder
def make_recorder(actor_model_explore, transform_state_dict, record_interval):
base_env = make_env()
environment = make_transformed_env(base_env)
environment.transform[2].init_stats(
3
) # must be instantiated to load the state dict
environment.transform[2].load_state_dict(transform_state_dict)
recorder_obj = Recorder(
record_frames=1000,
policy_exploration=actor_model_explore,
environment=environment,
exploration_type=ExplorationType.MEAN,
record_interval=record_interval,
)
return recorder_obj
我们将记录每 10 个收集批次的性能
record_interval = 10
recorder = make_recorder(
actor_model_explore, transform_state_dict, record_interval=record_interval
)
from torchrl.data.replay_buffers import (
LazyMemmapStorage,
PrioritizedSampler,
RandomSampler,
TensorDictReplayBuffer,
)
重放缓冲区¶
重放缓冲区有两种风格:prioritized(其中一些错误信号 用于为某些项目提供比其他项目更高的抽样可能性) 以及定期的循环体验重播。
TorchRL 重放缓冲区是可组合的:可以获取存储、采样 和写作策略。也可以 使用内存映射数组将张量存储在物理内存上。以下内容 函数负责创建具有所需 超参数:
from torchrl.envs import RandomCropTensorDict
def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
if prb:
sampler = PrioritizedSampler(
max_capacity=buffer_size,
alpha=0.7,
beta=0.5,
)
else:
sampler = RandomSampler()
replay_buffer = TensorDictReplayBuffer(
storage=LazyMemmapStorage(
buffer_size,
scratch_dir=buffer_scratch_dir,
),
batch_size=batch_size,
sampler=sampler,
pin_memory=False,
prefetch=prefetch,
transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
)
return replay_buffer
我们将重放缓冲区存储在磁盘上的临时目录中
import tempfile
tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name
重放缓冲区存储和批量大小¶
TorchRL 重放缓冲区计算沿第一维的元素数量。
由于我们将向缓冲区提供轨迹,因此我们需要调整缓冲区
size 的值除以我们的
数据收集器。
关于批次大小,我们的抽样策略将包括抽样
选择子轨迹之前的长度轨迹
或计算损失的时长。
这种策略平衡了存储某个
长度,需要提供具有足够异质性的样品
让我们蒙受损失。下图显示了来自收集器的数据流
每个批次获得 8 帧,其中 2 个环境并行运行,
将它们馈送到包含 1000 个轨迹的重放缓冲区,并且
采样每个子轨迹 2 个时间步长。traj_len=200
random_crop_len=25
![将轨迹存储在重放缓冲区中](https://pytorch.org/rl/0.4/_images/replaybuffer_traj.png)
让我们从缓冲区中存储的帧数开始
def ceil_div(x, y):
return -x // (-y)
buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)
默认情况下,优先重放缓冲区处于禁用状态
prb = False
我们还需要定义每批数据将执行多少次更新
收集。这称为更新与数据或比率:UTD
update_to_data = 64
我们将用长度为 25 的轨迹来喂养损失:
random_crop_len = 25
在原始论文中,作者对一批 64 个进行了一次更新 元素。在这里,我们再现相同的比率 但是,同时在每个批次集合中实现多次更新。我们 调整我们的 batch-size 以实现相同的每帧更新数比率:
batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)
replay_buffer = make_replay_buffer(
buffer_size=buffer_size,
batch_size=batch_size,
random_crop_len=random_crop_len,
prefetch=3,
prb=prb,
)
损耗模块结构¶
我们使用 actor 构建了 loss 模块,并且刚刚创建了。
因为我们有目标参数要更新,所以我们_必须_创建一个目标网络
更新。qnet
gamma = 0.99
lmbda = 0.9
tau = 0.001 # Decay factor for the target network
loss_module = DDPGLoss(actor, qnet)
让我们使用 TD(lambda) 估算器!
loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)
注意
Off-policy 通常指示 TD(0) 估计器。在这里,我们使用 TD() estimator 的 Timator 进行 Tim 操作,这将引入一些偏差作为随后的轨迹 某个状态是使用过时的策略收集的。 这个技巧,作为数据收集过程中可以使用的多步骤技巧, 是我们通常发现效果很好的 “hacks” 的替代版本 尽管他们在回报中引入了一些偏见,但还是进行了实践 估计。
Target Network Updater (目标网络更新程序)¶
目标网络是不符合策略的 RL sota-implementation 的关键部分。
多亏了 and
类,更新目标网络参数变得容易。它们是以 loss 模块作为参数构建的,更新是
通过在
training 循环。
from torchrl.objectives.utils import SoftUpdate
target_net_updater = SoftUpdate(loss_module, eps=1 - tau)
优化¶
最后,我们将对策略和价值网络使用 Adam 优化器:
from torch import optim
optimizer_actor = optim.Adam(
loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch
是时候训练策略了¶
训练循环非常简单,因为我们已经构建了所有 模块。
rewards = []
rewards_eval = []
# Main loop
collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):
# update weights of the inference policy
collector.update_policy_weights_()
if r0 is None:
r0 = tensordict["next", "reward"].mean().item()
pbar.update(tensordict.numel())
# extend the replay buffer with the new data
current_frames = tensordict.numel()
collected_frames += current_frames
replay_buffer.extend(tensordict.cpu())
# optimization steps
if collected_frames >= init_random_frames:
for _ in range(update_to_data):
# sample from replay buffer
sampled_tensordict = replay_buffer.sample().to(device)
# Compute loss
loss_dict = loss_module(sampled_tensordict)
# optimize
loss_dict["loss_actor"].backward()
gn1 = torch.nn.utils.clip_grad_norm_(
loss_module.actor_network_params.values(True, True), 10.0
)
optimizer_actor.step()
optimizer_actor.zero_grad()
loss_dict["loss_value"].backward()
gn2 = torch.nn.utils.clip_grad_norm_(
loss_module.value_network_params.values(True, True), 10.0
)
optimizer_value.step()
optimizer_value.zero_grad()
gn = (gn1**2 + gn2**2) ** 0.5
# update priority
if prb:
replay_buffer.update_tensordict_priority(sampled_tensordict)
# update target network
target_net_updater.step()
rewards.append(
(
i,
tensordict["next", "reward"].mean().item(),
)
)
td_record = recorder(None)
if td_record is not None:
rewards_eval.append((i, td_record["r_evaluation"].item()))
if len(rewards_eval) and collected_frames >= init_random_frames:
target_value = loss_dict["target_value"].item()
loss_value = loss_dict["loss_value"].item()
loss_actor = loss_dict["loss_actor"].item()
rn = sampled_tensordict["next", "reward"].mean().item()
rs = sampled_tensordict["next", "reward"].std().item()
pbar.set_description(
f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
f"reward normalized={rn :4.2f}/{rs :4.2f}, "
f"grad norm={gn: 4.2f}, "
f"loss_value={loss_value: 4.2f}, "
f"loss_actor={loss_actor: 4.2f}, "
f"target value: {target_value: 4.2f}"
)
# update the exploration strategy
actor_model_explore.step(current_frames)
collector.shutdown()
del collector
0%| | 0/10000 [00:00<?, ?it/s]
8%|▊ | 800/10000 [00:00<00:02, 3341.44it/s]
16%|█▌ | 1600/10000 [00:01<00:08, 972.21it/s]
24%|██▍ | 2400/10000 [00:01<00:05, 1450.21it/s]
32%|███▏ | 3200/10000 [00:01<00:03, 1892.96it/s]
40%|████ | 4000/10000 [00:02<00:02, 2269.56it/s]
48%|████▊ | 4800/10000 [00:02<00:02, 2582.90it/s]
56%|█████▌ | 5600/10000 [00:02<00:01, 2831.95it/s]
reward: -2.79 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.48/5.85, grad norm= 55.29, loss_value= 313.46, loss_actor= 16.18, target value: -17.03: 56%|█████▌ | 5600/10000 [00:04<00:01, 2831.95it/s]
reward: -2.79 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.48/5.85, grad norm= 55.29, loss_value= 313.46, loss_actor= 16.18, target value: -17.03: 64%|██████▍ | 6400/10000 [00:04<00:04, 853.52it/s]
reward: -1.87 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.85, grad norm= 202.05, loss_value= 304.94, loss_actor= 13.16, target value: -17.24: 64%|██████▍ | 6400/10000 [00:06<00:04, 853.52it/s]
reward: -1.87 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.85, grad norm= 202.05, loss_value= 304.94, loss_actor= 13.16, target value: -17.24: 72%|███████▏ | 7200/10000 [00:07<00:05, 535.98it/s]
reward: -5.27 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.96/5.83, grad norm= 215.11, loss_value= 276.88, loss_actor= 17.72, target value: -20.46: 72%|███████▏ | 7200/10000 [00:09<00:05, 535.98it/s]
reward: -5.27 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.96/5.83, grad norm= 215.11, loss_value= 276.88, loss_actor= 17.72, target value: -20.46: 80%|████████ | 8000/10000 [00:10<00:04, 429.53it/s]
reward: -5.04 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.49, grad norm= 94.22, loss_value= 189.84, loss_actor= 15.74, target value: -17.99: 80%|████████ | 8000/10000 [00:11<00:04, 429.53it/s]
reward: -5.04 (r0 = -1.96), reward eval: reward: -0.00, reward normalized=-2.68/5.49, grad norm= 94.22, loss_value= 189.84, loss_actor= 15.74, target value: -17.99: 88%|████████▊ | 8800/10000 [00:12<00:03, 388.35it/s]
reward: -1.05 (r0 = -1.96), reward eval: reward: 2.85, reward normalized=-2.68/4.91, grad norm= 88.90, loss_value= 153.33, loss_actor= 14.20, target value: -19.37: 88%|████████▊ | 8800/10000 [00:15<00:03, 388.35it/s]
reward: -1.05 (r0 = -1.96), reward eval: reward: 2.85, reward normalized=-2.68/4.91, grad norm= 88.90, loss_value= 153.33, loss_actor= 14.20, target value: -19.37: 96%|█████████▌| 9600/10000 [00:16<00:01, 313.88it/s]
reward: 0.95 (r0 = -1.96), reward eval: reward: 2.85, reward normalized=-2.82/5.52, grad norm= 122.12, loss_value= 198.87, loss_actor= 14.87, target value: -19.42: 96%|█████████▌| 9600/10000 [00:18<00:01, 313.88it/s]
reward: 0.95 (r0 = -1.96), reward eval: reward: 2.85, reward normalized=-2.82/5.52, grad norm= 122.12, loss_value= 198.87, loss_actor= 14.87, target value: -19.42: : 10400it [00:19, 296.66it/s]
reward: -5.00 (r0 = -1.96), reward eval: reward: 2.85, reward normalized=-2.38/4.70, grad norm= 135.29, loss_value= 136.95, loss_actor= 19.89, target value: -17.72: : 10400it [00:21, 296.66it/s]
实验结果¶
我们制作了训练期间平均奖励的简单图。我们可以观察 我们的政策在解决任务方面学得相当好。
注意
如上所述,为了获得更合理的性能,
使用更大的值,例如 1M。total_frames
from matplotlib import pyplot as plt
plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
![编码 DDPG](https://pytorch.org/rl/0.4/_images/sphx_glr_coding_ddpg_001.png)
结论¶
在本教程中,我们学习了如何在 TorchRL 中编写一个 loss 模块。 DDPG 的具体例子。
关键要点是:
后续步骤¶
为了进一步迭代这个 loss 模块,我们可以考虑:
使用 @dispatch(参见 [功能] Distpatch IQL 损失模块。
允许灵活的 TensorDict 键。
脚本总运行时间:(1 分 50.926 秒)
估计内存使用量:18 MB