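Introduction to TorchRL
This demo was presented at ICML 2022 on the industry demo day.
It gives a broad overview of TorchRL's functionalities. Feel free to reach out to vmoens@fb.com or submit an issue if you have questions or comments.
TorchRL is an open-source Reinforcement Learning (RL) library for PyTorch.
The PyTorch ecosystem team (Meta) has decided to invest in this library to provide a leading platform for developing RL solutions in research settings.
It provides PyTorch-first and Python-first, low-level and high-level abstractions for RL that are intended to be efficient, documented and properly tested. The code is aimed at supporting research in RL. Most of it is written in Python in a highly modular way, so that researchers can easily swap components, transform them or write new ones with little effort.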
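This repo attempts to align with the existing PyTorch ecosystem libraries in that it has a dataset pillar (torchrl/envs), transforms, models, data utilities (e.g. collectors and containers), etc. TorchRL aims at having as few dependencies as possible (Python standard library, NumPy and PyTorch). Common environment libraries (e.g. OpenAI Gym) are only optional.
Unlike other domains, RL is less about media than about algorithms. As such, it is harder to make truly independent components.
What TorchRL is not:
A collection of algorithms: we do not intend to provide state-of-the-art (SOTA) implementations of RL algorithms; these algorithms are provided only as examples of how to use the library.
A research framework: modularity in TorchRL comes in two flavours. First, we try to build re-usable components that can easily be swapped with each other. Second, we do our best to make sure that components can be used independently of the rest of the library.
TorchRL has very few core dependencies, predominantly PyTorch and NumPy. All other dependencies (gym, torchvision, wandb / tensorboard) are optional.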
Data
TensorDict
import torch
from tensordict import TensorDict
Let's create a TensorDict. The constructor accepts several formats, such as passing a dict or using keyword arguments:
batch_size = 5
data = TensorDict(
key1=torch.zeros(batch_size, 3),
key2=torch.zeros(batch_size, 5, 6, dtype=torch.bool),
batch_size=[batch_size],
)
print(data)
TensorDict(
fields={
key1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
You can index a TensorDict along its batch_size, as well as query its keys.
print(data[2])
print(data["key1"] is data.get("key1"))
TensorDict(
fields={
key1: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
True
The following shows how to stack multiple TensorDicts. This is particularly useful when writing rollout loops!
data1 = TensorDict(
{
"key1": torch.zeros(batch_size, 1),
"key2": torch.zeros(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
data2 = TensorDict(
{
"key1": torch.ones(batch_size, 1),
"key2": torch.ones(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
data = torch.stack([data1, data2], 0)
data.batch_size, data["key1"]
(torch.Size([2, 5]), tensor([[[0.],
[0.],
[0.],
[0.],
[0.]],
[[1.],
[1.],
[1.],
[1.],
[1.]]]))
Here are some other functionalities of TensorDict: viewing, permuting, sharing memory or expanding.
print(
"view(-1): ",
data.view(-1).batch_size,
data.view(-1).get("key1").shape,
)
print("to device: ", data.to("cpu"))
# print("pin_memory: ", data.pin_memory())
print("share memory: ", data.share_memory_())
print(
"permute(1, 0): ",
data.permute(1, 0).batch_size,
data.permute(1, 0).get("key1").shape,
)
print(
"expand: ",
data.expand(3, *data.batch_size).batch_size,
data.expand(3, *data.batch_size).get("key1").shape,
)
view(-1): torch.Size([10]) torch.Size([10, 1])
to device: TensorDict(
fields={
key1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 5]),
device=cpu,
is_shared=False)
share memory: TensorDict(
fields={
key1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=True),
key2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([2, 5]),
device=None,
is_shared=True)
permute(1, 0): torch.Size([5, 2]) torch.Size([5, 2, 1])
expand: torch.Size([3, 2, 5]) torch.Size([3, 2, 5, 1])
You can create nested data as well.
data = TensorDict(
source={
"key1": torch.zeros(batch_size, 3),
"key2": TensorDict(
source={"sub_key1": torch.zeros(batch_size, 2, 1)},
batch_size=[batch_size, 2],
),
},
batch_size=[batch_size],
)
data
TensorDict(
fields={
key1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: TensorDict(
fields={
sub_key1: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([5, 2]),
device=None,
is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
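Replay buffers
Replay buffers are a crucial component of many RL algorithms. TorchRL provides several replay buffer implementations. Most basic features work with any data structure (lists, tuples, dicts), but to use the replay buffers to their full extent and with fast read and write access, the TensorDict API should be preferred.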
from torchrl.data import PrioritizedReplayBuffer, ReplayBuffer
rb = ReplayBuffer(collate_fn=lambda x: x)
Items can be added with add() (n=1) or extend() (n>1).
rb.add(1)
rb.sample(1)
rb.extend([2, 3])
rb.sample(3)
[2, 1, 3]
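To make the add() / extend() / sample() contract above concrete, here is a minimal pure-Python sketch of a fixed-capacity buffer with uniform sampling. It is an illustration only and not TorchRL's implementation: the class name `TinyReplayBuffer` and its `capacity` argument are hypothetical. Sampling is done with replacement, which is why a batch can be larger than the number of stored items.

```python
import random


class TinyReplayBuffer:
    """Hypothetical fixed-capacity buffer with uniform sampling (illustration only)."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._storage = []
        self._cursor = 0  # next write position once the buffer is full

    def add(self, item):
        # n=1: store a single item, overwriting the oldest one when full
        if len(self._storage) < self.capacity:
            self._storage.append(item)
        else:
            self._storage[self._cursor] = item
        self._cursor = (self._cursor + 1) % self.capacity

    def extend(self, items):
        # n>1: store several items one after the other
        for item in items:
            self.add(item)

    def sample(self, batch_size):
        # uniform sampling with replacement
        return [random.choice(self._storage) for _ in range(batch_size)]

    def __len__(self):
        return len(self._storage)


buffer = TinyReplayBuffer(capacity=3)
buffer.add(1)
buffer.extend([2, 3, 4])  # capacity is 3, so the oldest item (1) is overwritten
batch = buffer.sample(5)  # 5 draws from 3 stored items
```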
Prioritized replay buffers can also be used:
rb = PrioritizedReplayBuffer(alpha=0.7, beta=1.1, collate_fn=lambda x: x)
rb.add(1)
rb.sample(1)
rb.update_priority(1, 0.5)
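The roles of `alpha` and `beta` can be illustrated with the standard prioritized experience replay formulas: an item with priority p_i is sampled with probability proportional to p_i ** alpha, and its importance weight is (N * P(i)) ** (-beta), normalized by the largest weight. The sketch below assumes this textbook formulation; TorchRL's exact normalization may differ, and the helper names are hypothetical.

```python
import random


def sampling_probabilities(priorities, alpha):
    # alpha = 0 gives uniform sampling; larger alpha favours high-priority items
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]


def importance_weights(probabilities, beta):
    # weights compensate for the non-uniform sampling; normalized so the max is 1
    n = len(probabilities)
    weights = [(n * p) ** (-beta) for p in probabilities]
    max_weight = max(weights)
    return [w / max_weight for w in weights]


priorities = [1.0, 0.5, 2.0]
probs = sampling_probabilities(priorities, alpha=0.7)
weights = importance_weights(probs, beta=1.1)

random.seed(0)
indices = random.choices(range(len(priorities)), weights=probs, k=4)
```

The item with the highest priority is sampled most often and receives the smallest importance weight.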
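Here are examples of using a replay buffer with TensorDict data (stacked into batches with torch.stack as the collate function). Storing TensorDicts makes it easy to abstract away the behavior of the replay buffer for multiple use cases.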
collate_fn = torch.stack
rb = ReplayBuffer(collate_fn=collate_fn)
rb.add(TensorDict({"a": torch.randn(3)}, batch_size=[]))
len(rb)
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
print(len(rb))
print(rb.sample(10))
print(rb.sample(2).contiguous())
torch.manual_seed(0)
from torchrl.data import TensorDictPrioritizedReplayBuffer
rb = TensorDictPrioritizedReplayBuffer(alpha=0.7, beta=1.1, priority_key="td_error")
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
data_sample = rb.sample(2).contiguous()
print(data_sample)
print(data_sample["index"])
data_sample["td_error"] = torch.rand(2)
rb.update_tensordict_priority(data_sample)
for i, val in enumerate(rb._sampler._sum_tree):
    print(i, val)
    if i == len(rb):
        break
3
TensorDict(
fields={
a: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
TensorDict(
fields={
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
TensorDict(
fields={
_weight: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
index: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
tensor([1, 1])
0 1.0
1 0.28791671991348267
2 0.0
Envs
TorchRL provides a range of environment wrappers and utilities.
Gym environment
try:
    import gymnasium as gym
except ModuleNotFoundError:
    import gym
from torchrl.envs.libs.gym import GymEnv, GymWrapper, set_gym_backend
gym_env = gym.make("Pendulum-v1")
env = GymWrapper(gym_env)
env = GymEnv("Pendulum-v1")
data = env.reset()
env.rand_step(data)
TensorDict(
fields={
action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Changing the environment config
env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env.reset()
env.close()
del env
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
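The `frame_skip=3` argument used above repeats each action for several simulator steps. The sketch below illustrates that idea on a toy environment, assuming the rewards are summed over the skipped frames and the last observation is returned; this reflects the usual frame-skip convention and may differ in detail from TorchRL's implementation. `CountingEnv` and `step_with_frame_skip` are hypothetical names.

```python
class CountingEnv:
    """Hypothetical toy environment: the observation is a step counter, reward is 1 per step."""

    def __init__(self, horizon=10):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        self.t += 1
        done = self.t >= self.horizon
        return self.t, 1.0, done


def step_with_frame_skip(env, action, frame_skip):
    # repeat the same action, accumulate rewards, stop early if the episode ends
    total_reward = 0.0
    for _ in range(frame_skip):
        obs, reward, done = env.step(action)
        total_reward += reward
        if done:
            break
    return obs, total_reward, done


env_sketch = CountingEnv(horizon=10)
env_sketch.reset()
obs, reward, done = step_with_frame_skip(env_sketch, action=0, frame_skip=3)
```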
Environment transforms
Transforms act like Gym wrappers, but with an API closer to that of torchvision's or torch.distributions' transforms.
There is a wide range of transforms to choose from.
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
StepCounter,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("HalfCheetah-v4", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env = env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
env.reset()
print("env: ", env)
print("last transform parent: ", env.transform[2].parent)
env: TransformedEnv(
env=GymEnv(env=HalfCheetah-v4, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
last transform parent: TransformedEnv(
env=GymEnv(env=HalfCheetah-v4, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels'])))
Vectorized environments
Vectorized / parallel environments can provide significant speed-ups.
from torchrl.envs import ParallelEnv
def make_env():
    # You can control whether to use gym or gymnasium for your env
    with set_gym_backend("gym"):
        return GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
base_env = ParallelEnv(
4,
make_env,
mp_start_method="fork", # This will break on Windows machines! Remove and decorate with if __name__ == "__main__"
)
env = TransformedEnv(
base_env, Compose(StepCounter(), ToTensorImage())
) # applies transforms on batch of envs
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
env.reset()
print(env.action_spec)
env.close()
del env
BoundedContinuous(
shape=torch.Size([4, 1]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous)
Modules
Multiple modules (utils, models and wrappers) can be found in the library.
Models
Example of an MLP model:
from torch import nn
from torchrl.modules import ConvNet, MLP
from torchrl.modules.models.utils import SquashDims
net = MLP(num_cells=[32, 64], out_features=4, activation_class=nn.ELU)
print(net)
print(net(torch.randn(10, 3)).shape)
MLP(
(0): LazyLinear(in_features=0, out_features=32, bias=True)
(1): ELU(alpha=1.0)
(2): Linear(in_features=32, out_features=64, bias=True)
(3): ELU(alpha=1.0)
(4): Linear(in_features=64, out_features=4, bias=True)
)
torch.Size([10, 4])
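The `in_features=0` shown for the first layer in the printout above comes from PyTorch's lazy modules: the input size is only inferred during the first forward pass. A small sketch with plain `torch.nn.LazyLinear` (independent of TorchRL's MLP class) shows the mechanism:

```python
import torch
from torch import nn

layer = nn.LazyLinear(32)
# before the first call, the weight has no concrete shape yet
is_lazy_before = layer.has_uninitialized_params()

out = layer(torch.randn(10, 3))  # the first forward pass infers in_features=3

print(is_lazy_before, layer.in_features, tuple(layer.weight.shape))
```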
Example of a CNN model:
cnn = ConvNet(
num_cells=[32, 64],
kernel_sizes=[8, 4],
strides=[2, 1],
aggregator_class=SquashDims,
)
print(cnn)
print(cnn(torch.randn(10, 3, 32, 32)).shape) # last tensor is squashed
ConvNet(
(0): LazyConv2d(0, 32, kernel_size=(8, 8), stride=(2, 2))
(1): ELU(alpha=1.0)
(2): Conv2d(32, 64, kernel_size=(4, 4), stride=(1, 1))
(3): ELU(alpha=1.0)
(4): SquashDims()
)
torch.Size([10, 6400])
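The flattened size of 6400 can be checked by hand with the usual convolution output-size formula. The sketch below assumes no padding and no dilation, which matches the layer printout above; `conv_output_size` is a hypothetical helper.

```python
def conv_output_size(size, kernel_size, stride, padding=0):
    # standard formula for one spatial dimension of a convolution (dilation = 1)
    return (size + 2 * padding - kernel_size) // stride + 1


size = 32  # the input images are 32 x 32
for kernel_size, stride in zip([8, 4], [2, 1]):
    size = conv_output_size(size, kernel_size, stride)

# 64 output channels, flattened together with both spatial dimensions
flattened = 64 * size * size
print(size, flattened)
```

The spatial size goes from 32 to 13 to 10, so the squashed output has 64 * 10 * 10 = 6400 features.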
TensorDictModules
Some modules are specifically designed to work with tensordict inputs.
from tensordict.nn import TensorDictModule
data = TensorDict({"key1": torch.randn(10, 3)}, batch_size=[10])
module = nn.Linear(3, 4)
td_module = TensorDictModule(module, in_keys=["key1"], out_keys=["key2"])
td_module(data)
print(data)
TensorDict(
fields={
key1: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([10, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
Sequences of modules
Building sequences of modules is made easy by TensorDictSequential:
from tensordict.nn import TensorDictSequential
backbone_module = nn.Linear(5, 3)
backbone = TensorDictModule(
backbone_module, in_keys=["observation"], out_keys=["hidden"]
)
actor_module = nn.Linear(3, 4)
actor = TensorDictModule(actor_module, in_keys=["hidden"], out_keys=["action"])
value_module = MLP(out_features=1, num_cells=[4, 5])
value = TensorDictModule(value_module, in_keys=["hidden", "action"], out_keys=["value"])
sequence = TensorDictSequential(backbone, actor, value)
print(sequence)
print(sequence.in_keys, sequence.out_keys)
data = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
backbone(data)
actor(data)
value(data)
data = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
sequence(data)
print(data)
TensorDictSequential(
module=ModuleList(
(0): TensorDictModule(
module=Linear(in_features=5, out_features=3, bias=True),
device=cpu,
in_keys=['observation'],
out_keys=['hidden'])
(1): TensorDictModule(
module=Linear(in_features=3, out_features=4, bias=True),
device=cpu,
in_keys=['hidden'],
out_keys=['action'])
(2): TensorDictModule(
module=MLP(
(0): LazyLinear(in_features=0, out_features=4, bias=True)
(1): Tanh()
(2): Linear(in_features=4, out_features=5, bias=True)
(3): Tanh()
(4): Linear(in_features=5, out_features=1, bias=True)
),
device=cpu,
in_keys=['hidden', 'action'],
out_keys=['value'])
),
device=cpu,
in_keys=['observation'],
out_keys=['hidden', 'action', 'value'])
['observation'] ['hidden', 'action', 'value']
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
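The printed `in_keys` and `out_keys` of the sequence can be understood as simple bookkeeping: a key is an input of the whole sequence only if no earlier module produces it. The following pure-Python sketch reproduces that logic for the three modules above. It is an illustration of the idea, not the library's code, and `sequence_keys` is a hypothetical helper.

```python
def sequence_keys(modules):
    """modules: list of (in_keys, out_keys) pairs, in execution order."""
    in_keys, out_keys = [], []
    for module_in, module_out in modules:
        for key in module_in:
            # a key produced by an earlier module is not an external input
            if key not in out_keys and key not in in_keys:
                in_keys.append(key)
        for key in module_out:
            if key not in out_keys:
                out_keys.append(key)
    return in_keys, out_keys


specs = [
    (["observation"], ["hidden"]),       # backbone
    (["hidden"], ["action"]),            # actor
    (["hidden", "action"], ["value"]),   # value
]
seq_in_keys, seq_out_keys = sequence_keys(specs)
print(seq_in_keys, seq_out_keys)
```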
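Functional programming (ensembling / meta-RL)
Functional calls have never been easier. Extract the parameters with from_module(), and swap them back into the module with to_module():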
from tensordict import from_module
params = from_module(sequence)
print("extracted params", params)
extracted params TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
1: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 7]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
bias: Parameter(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
4: TensorDict(
fields={
bias: Parameter(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([1, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Functional call using tensordict:
with params.to_module(sequence):
    data = sequence(data)
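For readers more familiar with plain PyTorch, the same idea (running a module with an externally supplied set of parameters) can be sketched with `torch.func.functional_call`. This is PyTorch's own utility, shown here as an analogy only; it is not the tensordict API used above.

```python
import torch
from torch import nn
from torch.func import functional_call

module = nn.Linear(3, 4)
x = torch.randn(2, 3)

# a replacement set of parameters, keyed like module.named_parameters()
zero_params = {name: torch.zeros_like(p) for name, p in module.named_parameters()}

out_zero = functional_call(module, zero_params, (x,))  # uses the zero parameters
out_regular = module(x)  # the module's own parameters are left untouched
```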
VMAP
Fast execution of multiple copies of similar architectures is key to training your models quickly.
vmap() is tailored to do just that:
TensorDict(
fields={
action: Tensor(shape=torch.Size([4, 3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([4, 3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([4, 3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([4, 3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([4, 3]),
device=None,
is_shared=False)
Specialized classes
TorchRL also provides some specialized modules that run checks on the output values.
torch.manual_seed(0)
from torchrl.data import Bounded
from torchrl.modules import SafeModule
spec = Bounded(-torch.ones(3), torch.ones(3))
base_module = nn.Linear(5, 3)
module = SafeModule(
module=base_module, spec=spec, in_keys=["obs"], out_keys=["action"], safe=True
)
data = TensorDict({"obs": torch.randn(5)}, batch_size=[])
module(data)["action"]
data = TensorDict({"obs": torch.randn(5) * 100}, batch_size=[])
module(data)["action"] # safe=True projects the result within the set
tensor([-1., 1., -1.], grad_fn=<AsStridedBackward0>)
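For a box-shaped (bounded) spec, projecting an out-of-range value back into the set amounts to clamping each coordinate to its bounds. A minimal sketch in plain PyTorch, assuming that this is what the projection reduces to for the spec used above (TorchRL's own implementation may differ):

```python
import torch

low, high = -torch.ones(3), torch.ones(3)  # same bounds as the spec above
raw_action = torch.tensor([-250.0, 0.3, 120.0])  # made-up out-of-range output

# element-wise clamp: values inside the box are left unchanged
projected = torch.clamp(raw_action, min=low, max=high)
print(projected)
```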
The Actor class has a predefined output key ("action"):
from torchrl.modules import Actor
base_module = nn.Linear(5, 3)
actor = Actor(base_module, in_keys=["obs"])
data = TensorDict({"obs": torch.randn(5)}, batch_size=[])
actor(data) # action is the default value
from tensordict.nn import (
ProbabilisticTensorDictModule,
ProbabilisticTensorDictSequential,
)
Working with probabilistic models is also made easy by the tensordict.nn API:
from torchrl.modules import NormalParamExtractor, TanhNormal
td = TensorDict({"input": torch.randn(3, 5)}, [3])
net = nn.Sequential(
nn.Linear(5, 4), NormalParamExtractor()
) # splits the output in loc and scale
module = TensorDictModule(net, in_keys=["input"], out_keys=["loc", "scale"])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=False,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
# returning the log-probability
td = TensorDict({"input": torch.randn(3, 5)}, [3])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=True,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
sample_log_prob: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
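What the probabilistic module does can be approximated in plain PyTorch: the network output is split into a location and a positive scale, a tanh-squashed normal distribution is built from them, and an action is sampled along with its log-probability. The sketch below is an approximation under stated assumptions: it uses a softplus to keep the scale positive and `torch.distributions` instead of TorchRL's `NormalParamExtractor` and `TanhNormal`, whose exact parameterization may differ.

```python
import torch
from torch import nn
from torch.distributions import Independent, Normal, TransformedDistribution
from torch.distributions.transforms import TanhTransform

torch.manual_seed(0)
net = nn.Linear(5, 4)
x = torch.randn(3, 5)

# split the 4 outputs into loc (2) and scale (2); softplus keeps the scale positive
loc, raw_scale = net(x).chunk(2, dim=-1)
scale = nn.functional.softplus(raw_scale) + 1e-4

# treat the last dim as the event dim so that log_prob has one value per sample
base = Independent(Normal(loc, scale), 1)
dist = TransformedDistribution(base, TanhTransform(cache_size=1))

action = dist.rsample()           # shape [3, 2], squashed into (-1, 1)
log_prob = dist.log_prob(action)  # shape [3]
```

The shapes match the printouts above: one 2-dimensional action and one scalar log-probability per batch element.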
Randomness and sampling strategies are controlled through a context manager,
set_exploration_type:
from torchrl.envs.utils import ExplorationType, set_exploration_type
td = TensorDict({"input": torch.randn(3, 5)}, [3])
torch.manual_seed(0)
with set_exploration_type(ExplorationType.RANDOM):
    td_module(td)
    print("random:", td["action"])
with set_exploration_type(ExplorationType.DETERMINISTIC):
    td_module(td)
    print("mode:", td["action"])
random: tensor([[ 0.8728, -0.1334],
[-0.9833, 0.3494],
[-0.6887, -0.6402]], grad_fn=<_SafeTanhNoEpsBackward>)
mode: tensor([[-0.1132, 0.1762],
[-0.3430, -0.2668],
[ 0.2918, 0.6239]], grad_fn=<_SafeTanhNoEpsBackward>)
Using environments and modules
Let us see how environments and modules can be combined:
from torchrl.envs.utils import step_mdp
env = GymEnv("Pendulum-v1")
action_spec = env.action_spec
actor_module = nn.Linear(3, 1)
actor = SafeModule(
actor_module, spec=action_spec, in_keys=["observation"], out_keys=["action"]
)
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
data = env.reset()
data_stack = TensorDict(batch_size=[max_steps])
for i in range(max_steps):
    actor(data)
    data_stack[i] = env.step(data)
    if data["done"].any():
        break
    data = step_mdp(data)  # roughly equivalent to obs = next_obs
tensordicts_prealloc = data_stack.clone()
print("total steps:", i)
print(data_stack)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
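The step_mdp call in the loop above moves the content of the "next" entry to the root, so that the following iteration starts from the new observation. A rough analogue with plain dicts is sketched below. It assumes the default behavior drops the reward and the previous action; the real function has more options and handles nested TensorDicts. `step_mdp_sketch` is a hypothetical name.

```python
def step_mdp_sketch(transition, exclude=("reward",)):
    """Hypothetical dict-based analogue of step_mdp (illustration only)."""
    next_entries = transition["next"]
    # promote the "next" entries to the root; the old root entries are dropped
    return {key: value for key, value in next_entries.items() if key not in exclude}


transition = {
    "observation": [0.0, 1.0, 0.0],
    "action": [0.5],
    "done": False,
    "next": {"observation": [0.1, 0.9, 0.2], "reward": [-1.0], "done": False},
}
new_root = step_mdp_sketch(transition)
print(new_root)
```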
# equivalent
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
data = env.reset()
data_stack = []
for i in range(max_steps):
    actor(data)
    data_stack.append(env.step(data))
    if data["done"].any():
        break
    data = step_mdp(data)  # roughly equivalent to obs = next_obs
tensordicts_stack = torch.stack(data_stack, 0)
print("total steps:", i)
print(tensordicts_stack)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
(tensordicts_stack == tensordicts_prealloc).all()
True
torch.manual_seed(0)
env.set_seed(0)
tensordict_rollout = env.rollout(policy=actor, max_steps=max_steps)
tensordict_rollout
(tensordict_rollout == tensordicts_prealloc).all()
from tensordict.nn import TensorDictModule
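Collectors
We also provide a set of data collectors that automatically gather as many frames per batch as required. They work in settings ranging from single-node, single-worker to multi-node, multi-worker.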
from torchrl.collectors import MultiaSyncDataCollector, MultiSyncDataCollector
from torchrl.envs import EnvCreator, SerialEnv
from torchrl.envs.libs.gym import GymEnv
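EnvCreator makes sure that we can send a lambda function from one process to another.
We use a SerialEnv (single worker) for simplicity, but for larger jobs
a ParallelEnv (multiple workers) would be better suited.
Note
Multiprocessed environments and multiprocessed collectors can be combined!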
parallel_env = SerialEnv(
3,
EnvCreator(lambda: GymEnv("Pendulum-v1")),
)
create_env_fn = [parallel_env, parallel_env]
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
Sync multiprocessed data collector
devices = ["cpu", "cpu"]
collector = MultiSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # trajectories are split automatically in [6 workers x 10 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
TensorDict(
fields={
action: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([2, 3, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False)
3
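Async multiprocessed data collector
This class allows you to collect data while the model is training. This is particularly useful in off-policy settings, as it decouples inference from model training. Data is delivered on a first-ready, first-served basis (workers queue their results):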
collector = MultiaSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # each batch comes from a single sub-collector: [3 envs x 20 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
del create_env_fn
del parallel_env
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([3, 20]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False)
3
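The batch shapes and the final value of `i` printed in the two collector examples follow from simple arithmetic on the settings used above (two sub-collectors, each wrapping a SerialEnv of 3 environments). The sketch below only re-derives those numbers; the variable names are illustrative.

```python
total_frames = 240
frames_per_batch = 60
num_sub_collectors = 2      # len(create_env_fn)
envs_per_sub_collector = 3  # SerialEnv(3, ...)

num_batches = total_frames // frames_per_batch
last_index = num_batches - 1  # value of i after the loop

# sync: every batch gathers frames from all sub-collectors
sync_steps = frames_per_batch // (num_sub_collectors * envs_per_sub_collector)
sync_shape = (num_sub_collectors, envs_per_sub_collector, sync_steps)

# async: every batch comes from whichever single sub-collector is ready first
async_steps = frames_per_batch // envs_per_sub_collector
async_shape = (envs_per_sub_collector, async_steps)

print(sync_shape, async_shape, last_index)
```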
Objectives
Objectives are the main entry points when coding up a new algorithm.
from torchrl.objectives import DDPGLoss
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
class ConcatModule(nn.Linear):
    def forward(self, obs, action):
        return super().forward(torch.cat([obs, action], -1))
value_module = ConcatModule(4, 1)
value = TensorDictModule(
value_module, in_keys=["observation", "action"], out_keys=["state_action_value"]
)
loss_fn = DDPGLoss(actor, value)
loss_fn.make_value_estimator(loss_fn.default_value_estimator, gamma=0.99)
data = TensorDict(
{
"observation": torch.randn(10, 3),
"next": {
"observation": torch.randn(10, 3),
"reward": torch.randn(10, 1),
"done": torch.zeros(10, 1, dtype=torch.bool),
},
"action": torch.randn(10, 1),
},
batch_size=[10],
device="cpu",
)
loss_td = loss_fn(data)
print(loss_td)
print(data)
TensorDict(
fields={
loss_actor: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
loss_value: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
target_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
target_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
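To give an idea of what the loss entries above represent, here is a simplified DDPG-style computation in plain PyTorch. It is a sketch under stated assumptions and not the DDPGLoss implementation: it uses a one-step TD target, a squared error for the value loss, and no target networks, whereas the library may use different defaults.

```python
import torch
from torch import nn

torch.manual_seed(0)
gamma = 0.99
actor_net = nn.Linear(3, 1)   # observation -> action
critic_net = nn.Linear(4, 1)  # (observation, action) -> state-action value

obs = torch.randn(10, 3)
action = torch.randn(10, 1)
next_obs = torch.randn(10, 3)
reward = torch.randn(10, 1)
done = torch.zeros(10, 1, dtype=torch.bool)


def q_value(o, a):
    return critic_net(torch.cat([o, a], dim=-1))


# one-step TD target: r + gamma * Q(s', pi(s')), with no bootstrap after a terminal step
with torch.no_grad():
    next_q = q_value(next_obs, actor_net(next_obs))
    target_value = reward + gamma * (~done).float() * next_q

pred_value = q_value(obs, action)
td_error = (pred_value - target_value).squeeze(-1)
loss_value = td_error.pow(2).mean()

# the actor is trained to maximize the value of its own actions
loss_actor = -q_value(obs, actor_net(obs)).mean()
```

In a full implementation the critic parameters would be held fixed while computing the actor loss, and target networks would typically be used for the TD target.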
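Installing the library
The library is on PyPI: pip install torchrl. See the README for more information.
Contributing
We are actively looking for contributors and early users. If you are working in RL (or are just curious), try it out! Give us feedback: what will make the success of TorchRL is how well it covers researchers' needs. To do that, we need your input! Since the library is still young, it is a great time for you to shape it the way you want!
See the contributing guide for more information.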