目录

使用 TorchRL 的竞争性多智能体强化学习 (DDPG) 教程

作者Matteo Bettini

另请参阅

BenchMARL 库提供最先进的 使用 TorchRL 的 MARL 算法的实现。

本教程演示如何使用 PyTorch 和 TorchRL 来 解决竞争性多智能体强化学习 (MARL) 问题。

为了便于使用,本教程将遵循已经可用的 Multi-Agent Reinforcement Learning (PPO) with TorchRL 教程的一般结构。

在本教程中,我们将使用 MADDPG 论文中的 simple_tag 环境。此环境是 Part 论文中介绍的名为 MultiAgentParticleEnvironments (MPE) 的集合。

目前有多个模拟器提供 MPE 环境。 在本教程中,我们将展示如何使用以下任一方式在 TorchRL 中训练此环境:

  • PettingZoo,在传统 CPU 版本的环境中;

  • VMAS 在 PyTorch 中提供矢量化实现, 能够在 GPU 上模拟多个环境以加快计算速度。

Simple tag

多代理simple_tag场景

主要学习内容:

  • 如何在 TorchRL 中使用有竞争力的多代理环境,它们的规格如何工作,以及它们如何与库集成;

  • 如何在 TorchRL 中将 Parallel PettingZoo 和 VMAS 环境与多个代理组一起使用;

  • 如何在 TorchRL 中创建不同的多智能体网络架构(例如,使用参数共享、集中式批评)

  • 我们如何用于承载多代理、多组数据;TensorDict

  • 我们如何将所有库组件(收集器、模块、重放缓冲区和损失)绑定到一个非策略的多代理 MADDPG/IDDPG 训练循环中。

如果您在 Google Colab 中运行此程序,请确保安装以下依赖项:

!pip3 install torchrl
!pip3 install vmas
!pip3 install pettingzoo[mpe]==1.24.3
!pip3 install tqdm

深度确定性策略梯度 (DDPG) 是一种非策略参与者-批评者算法 其中,使用 critic 网络的梯度优化确定性策略。 有关更多信息,请参阅深度确定性策略梯度论文。 这种算法是 typicall 训练的 off-policy 。有关非政策学习的更多信息,请参阅 Sutton、Richard S. 和 Andrew G. Barto。强化学习:简介。麻省理工学院出版社,2018 年。

非政策学习

非政策学习

这种方法已扩展到混合合作竞争环境的 Multi-Agent Actor-Critic 中的多代理学习, ,该算法引入了多代理 DDPG (MADDPG) 算法。 在多代理设置中,情况略有不同。我们现在有多个策略, 每个代理一个。政策通常是地方性的和分散的。这意味着 单个代理的策略将仅根据该代理的观察结果输出该代理的操作。 在 MARL 文献中,这被称为去中心化执行。 另一方面,批评者存在不同的表述,主要是:

  • MADDPG 中,批评者是集中的,并将全局状态和全局行动作为输入 的系统。全局状态可以是全局观察,也可以只是代理观察的串联。 全局操作是代理操作的串联。MADDPG 可用于执行集中训练的环境,因为它需要访问全球信息。

  • 在 IDDPG 中,批评者仅将一个代理的观察和行动作为输入。 这允许分散式培训,因为批评者和政策都只需要本地 信息来计算其输出。

集中式 Critic 有助于克服多个智能体同时学习的非平稳性,但是, 另一方面,它们可能会受到其较大的 input space 的影响。 在本教程中,我们将能够训练这两种公式,我们还将讨论如何 参数共享(在代理之间共享网络参数的做法)会影响每个 Sensor。

本教程的结构如下:

  1. 最初,我们将建立一组超参数以供使用。

  2. 随后,我们将利用 TorchRL 的 包装器。

  3. 之后,我们将制定政策和批评网络,讨论各种选择对 参数共享和 Critic 集中化。

  4. 之后,我们将创建 sampling collector 和 replay buffer。

  5. 最后,我们将执行我们的训练循环并检查结果。

如果您在 Colab 中或在带有 GUI 的机器上操作它,您还将有机会 在训练过程之前和之后呈现和可视化您自己的训练策略。

导入我们的依赖项:

import copy
import tempfile

import torch

from matplotlib import pyplot as plt
from tensordict import TensorDictBase

from tensordict.nn import TensorDictModule, TensorDictSequential
from torch import multiprocessing

from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, RandomSampler, ReplayBuffer

from torchrl.envs import (
    check_env_specs,
    ExplorationType,
    PettingZooEnv,
    RewardSum,
    set_exploration_type,
    TransformedEnv,
    VmasEnv,
)

from torchrl.modules import (
    AdditiveGaussianModule,
    MultiAgentMLP,
    ProbabilisticActor,
    TanhDelta,
)

from torchrl.objectives import DDPGLoss, SoftUpdate, ValueEstimators

from torchrl.record import CSVLogger, PixelRenderTransform, VideoRecorder

from tqdm import tqdm

# Check if we're building the doc, in which case disable video rendering
try:
    is_sphinx = __sphinx_build__
except NameError:
    is_sphinx = False

定义超参数

我们为教程设置超参数。 取决于资源 可用,则可以选择在 GPU 或其他 GPU 上执行策略和模拟器 装置。 您可以调整其中一些值来调整计算要求。

# Seed
seed = 0
torch.manual_seed(seed)

# Devices
is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

# Sampling
frames_per_batch = 1_000  # Number of team frames collected per sampling iteration
n_iters = 10  # Number of sampling and training iterations
total_frames = frames_per_batch * n_iters

# We will stop training the evaders after this many iterations,
# should be 0 <= iteration_when_stop_training_evaders <= n_iters
iteration_when_stop_training_evaders = n_iters // 2

# Replay buffer
memory_size = 1_000_000  # The replay buffer of each group can store this many frames

# Training
n_optimiser_steps = 100  # Number of optimisation steps per training iteration
train_batch_size = 128  # Number of frames trained in each optimiser step
lr = 3e-4  # Learning rate
max_grad_norm = 1.0  # Maximum norm for the gradients

# DDPG
gamma = 0.99  # Discount factor
polyak_tau = 0.005  # Tau for the soft-update of the target network

环境

多代理环境模拟多个代理与世界交互。 TorchRL API 允许集成各种类型的多代理环境风格。 在本教程中,我们将重点介绍多个代理组并行交互的环境。 也就是说:在每一步,所有代理都将获得观察并同步采取行动。

此外,TorchRL MARL API 允许将代理分成几组。每个组都将是 tensordict 的组内代理的数据堆叠在一起。因此,通过选择如何对代理进行分组, 您可以决定将哪些数据堆叠/保存为单独的条目。 可以在 VMAS 和 PettingZoo 等环境中构建时指定分组策略。 有关分组的更多信息,请参阅。MarlGroupMapType

simple_tag 环境中 有两组代理:追逐者(或“对手”)(红色圆圈)和逃避者(或“代理人”)(绿色圆圈)。 追逐者会因触碰逃避者 (+10) 而获得奖励。 接触后,追逐者团队将获得集体奖励,并且 触及的回避者受到相同的值 (-10) 的惩罚。 逃避者比追逐者具有更高的速度和加速度。 环境中也有障碍物(黑色圆圈)。 代理和障碍物根据均匀的随机分布生成。 代理在具有阻力和弹性碰撞的 2D 连续世界中起作用。 它们的作用是 2D 连续力,决定了它们的加速度。 每个代理都观察其位置, 速度、与所有其他代理和障碍物的相对位置以及逃避者的速度。

PettingZoo 和 VMAS 版本的奖励功能略有不同,因为 PettingZoo 会惩罚逃避者 出界,而 VMAS 则对它进行物理阻碍。这就是为什么您会观察到在 VMAS 中 两支队伍是相同的,只是标志相反,而在 PettingZoo 中,逃避者的奖励会更低。

现在,我们将实例化环境。 在本教程中,我们将剧集限制为 ,之后设置 terminated 标志。这是 PettingZoo 和 VMAS 模拟器中已经提供了功能,但也可以使用 TorchRL 转换。max_steps

max_steps = 100  # Environment steps before done

n_chasers = 2
n_evaders = 1
n_obstacles = 2

use_vmas = True  # Set this to True for a great performance speedup

if not use_vmas:
    base_env = PettingZooEnv(
        task="simple_tag_v3",
        parallel=True,  # Use the Parallel version
        seed=seed,
        # Scenario specific
        continuous_actions=True,
        num_good=n_evaders,
        num_adversaries=n_chasers,
        num_obstacles=n_obstacles,
        max_cycles=max_steps,
    )
else:
    num_vmas_envs = (
        frames_per_batch // max_steps
    )  # Number of vectorized environments. frames_per_batch collection will be divided among these environments
    base_env = VmasEnv(
        scenario="simple_tag",
        num_envs=num_vmas_envs,
        continuous_actions=True,
        max_steps=max_steps,
        device=device,
        seed=seed,
        # Scenario specific
        num_good_agents=n_evaders,
        num_adversaries=n_chasers,
        num_landmarks=n_obstacles,
    )

集团地图

PettingZoo 和 VMAS 环境使用 TorchRL MARL 分组 API。 我们可以访问组映射,将每个组映射到其中的代理,如下所示:

print(f"group_map: {base_env.group_map}")
group_map: {'adversary': ['adversary_0', 'adversary_1'], 'agent': ['agent_0']}

正如我们所看到的,它包含 2 组:“代理人”(逃避者)和“对手”(追逐者)。

环境不仅由其模拟器和转换定义,而且还 通过一系列元数据来描述在其 执行。 为了提高效率,TorchRL 在以下方面非常严格 环境规范,但您可以轻松检查您的环境规范是否 足够。 在我们的示例中,模拟器包装器负责为您的 base_env设置适当的规范,因此 你不应该关心这个。

有四个规格可供查看:

  • action_spec定义动作空间;

  • reward_spec定义奖励域;

  • done_spec定义 done 域;

  • observation_spec它定义了环境步骤的所有其他输出的域;

print("action_spec:", base_env.full_action_spec)
print("reward_spec:", base_env.full_reward_spec)
print("done_spec:", base_env.full_done_spec)
print("observation_spec:", base_env.observation_spec)
action_spec: CompositeSpec(
    adversary: CompositeSpec(
        action: BoundedTensorSpec(
            shape=torch.Size([10, 2, 2]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: CompositeSpec(
        action: BoundedTensorSpec(
            shape=torch.Size([10, 1, 2]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))
reward_spec: CompositeSpec(
    adversary: CompositeSpec(
        reward: UnboundedContinuousTensorSpec(
            shape=torch.Size([10, 2, 1]),
            space=None,
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: CompositeSpec(
        reward: UnboundedContinuousTensorSpec(
            shape=torch.Size([10, 1, 1]),
            space=None,
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))
done_spec: CompositeSpec(
    done: DiscreteTensorSpec(
        shape=torch.Size([10, 1]),
        space=DiscreteBox(n=2),
        device=cpu,
        dtype=torch.bool,
        domain=discrete),
    terminated: DiscreteTensorSpec(
        shape=torch.Size([10, 1]),
        space=DiscreteBox(n=2),
        device=cpu,
        dtype=torch.bool,
        domain=discrete),
    device=cpu,
    shape=torch.Size([10]))
observation_spec: CompositeSpec(
    adversary: CompositeSpec(
        observation: UnboundedContinuousTensorSpec(
            shape=torch.Size([10, 2, 14]),
            space=None,
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: CompositeSpec(
        observation: UnboundedContinuousTensorSpec(
            shape=torch.Size([10, 1, 12]),
            space=None,
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))

使用刚才显示的命令,我们可以访问每个值的域。

我们可以看到,所有 spec 都被结构化为一个字典,其中根始终包含组名称。 所有进出环境的 tensordict 数据都将遵循此结构。 此外,每组的规格都有前导形状(1 个用于代理,2 个用于对手), 这意味着该组的张量数据将始终具有前导形状(组内的代理将数据堆叠在一起)。(n_agents_in_that_group)

查看 ,我们可以看到有一些 key 位于 agent 组之外 (),它们没有前导多代理维度。 这些键由所有代理共享,并表示用于重置的环境全局完成状态。 默认情况下,就像在这种情况下一样,并行 PettingZoo 环境会在任何代理完成时完成,但这种行为 可以通过在 PettingZoo 环境构建中设置来覆盖。done_spec"done", "terminated", "truncated"done_on_any

要在 tensordict 中快速访问每个值的键,我们只需向环境请求 各自的键,以及 我们将立即了解哪些是按代理分配的,哪些是共享的。 此信息将有助于告诉所有其他 TorchRL 组件在何处找到每个值

print("action_keys:", base_env.action_keys)
print("reward_keys:", base_env.reward_keys)
print("done_keys:", base_env.done_keys)
action_keys: [('adversary', 'action'), ('agent', 'action')]
reward_keys: [('adversary', 'reward'), ('agent', 'reward')]
done_keys: ['done', 'terminated']

变换

我们可以将所需的任何 TorchRL 转换附加到我们的环境中。 这些将以某种所需的方式修改其 input/output。 我们强调,在多代理上下文中,明确提供要修改的键至关重要。

例如,在本例中,我们将实例化一个转换,该转换将对整个剧集的奖励求和。 我们将告诉这个 transform 在哪里可以找到每个奖励键的重置键。 本质上,我们只是说 设置 tensordict 键时,应重置每个组的 episode reward,即调用该 key。 转换后的环境将继承 包装环境的设备和元数据,并根据序列转换它们 of transforms。RewardSum"_reset"env.reset()

env = TransformedEnv(
    base_env,
    RewardSum(
        in_keys=base_env.reward_keys,
        reset_keys=["_reset"] * len(base_env.group_map.keys()),
    ),
)

该函数运行一个小的 rollout,并将其输出与环境进行比较 规格。如果没有引发错误,我们可以确信 spec 已正确定义:check_env_specs()

check_env_specs(env)

推出

为了好玩,让我们看看简单的随机推出是什么样的。您可以 调用 env.rollout(n_steps) 并获取环境输入内容的概览 和输出如下所示。操作将自动从操作规范中随机抽取 域。

n_rollout_steps = 5
rollout = env.rollout(n_rollout_steps)
print(f"rollout of {n_rollout_steps} steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)
rollout of 5 steps: TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 5, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 5, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 5, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 5, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                adversary: TensorDict(
                    fields={
                        episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                        observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                        reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([10, 5, 2]),
                    device=cpu,
                    is_shared=False),
                agent: TensorDict(
                    fields={
                        episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                        observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
                        reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([10, 5, 1]),
                    device=cpu,
                    is_shared=False),
                done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10, 5]),
            device=cpu,
            is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10, 5]),
    device=cpu,
    is_shared=False)
Shape of the rollout TensorDict: torch.Size([10, 5])

我们可以看到,我们的推出已经有 . 这意味着它中的所有张量都将具有此前导维度。batch_size(n_rollout_steps)

更深入地看,我们可以看到输出的 tensordict 可以按以下方式划分:

  • 在根(可通过运行 )中,我们将找到所有可用的键 after a reset 在第一个时间步调用。我们可以通过索引来了解它们在推出步骤中的演变 维度。在这些键中,我们会找到每个代理不同的键 在 Tensordict 中,它将具有 Batch Size 表示它正在存储额外的代理维度。组 tensordict 之外的那些 将是共享的。rollout.exclude("next")n_rollout_stepsrollout[group_name](n_rollout_steps, n_agents_in_group)

  • 在下一个(可通过运行 访问)。我们将找到与根相同的结构,但下面突出显示了一些细微的差异。rollout.get("next")

在 TorchRL 中,约定是 done 和 observations 将同时出现在 root 和 next 中(因为它们是 在 Reset 时和 step) 后可用。操作仅在 root 中可用(因为没有操作 由步骤产生)和奖励将仅在 Next 中可用(因为在重置时没有奖励)。 这种结构遵循 强化学习:简介(Sutton 和 Barto)中的结构,其中 root 表示时间的数据, next 表示世界步长时的数据。

渲染随机卷展栏

如果您在 Google Colab 上,或者在具有 OpenGL 和 GUI 的计算机上,您实际上可以渲染随机转出。 这将使您了解随机策略在此任务中将实现什么,以便进行比较 有了这个政策,您将训练自己!

要渲染卷展栏,请按照本教程末尾的“渲染”部分中的说明进行操作 ,然后删除 .policy=agents_exploration_policyenv.rollout()

政策

DDPG 使用确定性策略。这意味着我们的 neural network 将输出要执行的操作。 由于操作是连续的,因此我们使用 Tanh-Delta 分布来遵循 操作空间边界。这个类唯一做的是应用 Tanh 变换来确保操作 在域边界内。

我们需要做出的另一个重要决定是,我们是否希望团队中的代理共享策略参数。 一方面,共享参数意味着它们都将共享相同的策略,这将使它们能够从 彼此的经历。这也将导致更快的训练。 另一方面,这将使它们在行为上同质化,因为它们实际上将共享相同的模型。 在这个例子中,我们将启用共享,因为我们不介意同质性,并且可以从计算 速度,但重要的是要始终在您自己的问题中考虑这个决定!

我们分三个步骤设计策略。

第一:定义神经网络n_obs_per_agent -> n_actions_per_agents

为此,我们使用 ,一个专门用于 多个代理,提供大量自定义功能。MultiAgentMLP

我们将为每个组定义不同的策略,并将它们存储在字典中。

policy_modules = {}
for group, agents in env.group_map.items():
    share_parameters_policy = True  # Can change this based on the group

    policy_net = MultiAgentMLP(
        n_agent_inputs=env.observation_spec[group, "observation"].shape[
            -1
        ],  # n_obs_per_agent
        n_agent_outputs=env.full_action_spec[group, "action"].shape[
            -1
        ],  # n_actions_per_agents
        n_agents=len(agents),  # Number of agents in the group
        centralised=False,  # the policies are decentralised (i.e., each agent will act from its local observation)
        share_params=share_parameters_policy,
        device=device,
        depth=2,
        num_cells=256,
        activation_class=torch.nn.Tanh,
    )

    # Wrap the neural network in a :class:`~tensordict.nn.TensorDictModule`.
    # This is simply a module that will read the ``in_keys`` from a tensordict, feed them to the
    # neural networks, and write the
    # outputs in-place at the ``out_keys``.

    policy_module = TensorDictModule(
        policy_net,
        in_keys=[(group, "observation")],
        out_keys=[(group, "param")],
    )  # We just name the input and output that the network will read and write to the input tensordict
    policy_modules[group] = policy_module

第二:将 包装在TensorDictModuleProbabilisticActor

我们现在需要构建 TanhDelta 发行版。 我们指示类构建一个 out of the policy action 参数。我们还提供了此 发行版,我们从环境规范中收集。ProbabilisticActor

的名称(因此是 from 的名称 above)必须以 distribution constructor 关键字参数 (param) 结尾。in_keysout_keysTensorDictModule

policies = {}
for group, _agents in env.group_map.items():
    policy = ProbabilisticActor(
        module=policy_modules[group],
        spec=env.full_action_spec[group, "action"],
        in_keys=[(group, "param")],
        out_keys=[(group, "action")],
        distribution_class=TanhDelta,
        distribution_kwargs={
            "low": env.full_action_spec[group, "action"].space.low,
            "high": env.full_action_spec[group, "action"].space.high,
        },
        return_log_prob=False,
    )
    policies[group] = policy

第三:探索

由于 DDPG 策略是确定性的,因此我们需要一种方法在收集期间执行探索。

为此,我们需要在将策略传递给收集器之前将探索层附加到策略中。 在本例中,我们使用 ,它将高斯噪声添加到我们的动作中 (如果噪声使 action 超出范围,则将其固定)。AdditiveGaussianModule

此探索包装器使用一个参数,该参数乘以噪声来确定其大小。 Sigma 可以在整个训练过程中进行退火以减少探查。 Sigma 将从 转到 in 。sigmasigma_initsigma_endannealing_num_steps

exploration_policies = {}
for group, _agents in env.group_map.items():
    exploration_policy = TensorDictSequential(
        policies[group],
        AdditiveGaussianModule(
            spec=policies[group].spec,
            annealing_num_steps=total_frames
            // 2,  # Number of frames after which sigma is sigma_end
            action_key=(group, "action"),
            sigma_init=0.9,  # Initial value of the sigma
            sigma_end=0.1,  # Final value of the sigma
        ),
    )
    exploration_policies[group] = exploration_policy

评论家网络

批评者网络是 DDPG 算法的关键组成部分,尽管它 在采样时不使用。这个模块将阅读观察和采取的行动,以及 返回相应的值 estimates。

和以前一样,应该仔细考虑在 agent group 内共享 critic 参数的决定。 通常,参数共享将提供更快的训练收敛,但有一些重要的 需要考虑的事项:

  • 当代理具有不同的奖励功能时,不建议分享,因为批评者需要学习 为同一状态分配不同的值(例如,在混合合作社竞争环境中)。 在这种情况下,由于两个组已经在使用单独的网络,因此共享决定仅适用 对于组内的代理,我们已经知道它们具有相同的 reward 函数。

  • 在分散式训练环境中,如果没有额外的基础设施,就无法执行共享 同步参数。

在所有其他情况下,奖励函数(与奖励区分开来)对所有代理都相同 在一个组中(如当前方案中), 共享可以提高性能。这可能是以代理策略的同质性为代价的。 通常,了解哪个选项更可取的最佳方法是快速试验这两个选项。

这也是我们必须在 MADDPG 和 IDDPG 之间进行选择的地方:

  • 使用 MADDPG,我们将获得具有完全可观察性的 central critic (即,它将所有串联的全局代理观察和操作作为输入)。 我们可以这样做,因为我们在模拟器中 并且培训是集中的。

  • 有了 IDDPG,我们将有一个本地的去中心化批评者,就像政策一样。

无论如何,critic 输出将具有 shape 。 如果批评者是集中和共享的, 沿维度的所有值都将相同。(..., n_agents_in_group, 1)n_agents_in_group

与该策略一样,我们为每个组创建一个评论家网络,并将它们存储在字典中。

critics = {}
for group, agents in env.group_map.items():
    share_parameters_critic = True  # Can change for each group
    MADDPG = True  # IDDPG if False, can change for each group

    # This module applies the lambda function: reading the action and observation entries for the group
    # and concatenating them in a new ``(group, "obs_action")`` entry
    cat_module = TensorDictModule(
        lambda obs, action: torch.cat([obs, action], dim=-1),
        in_keys=[(group, "observation"), (group, "action")],
        out_keys=[(group, "obs_action")],
    )

    critic_module = TensorDictModule(
        module=MultiAgentMLP(
            n_agent_inputs=env.observation_spec[group, "observation"].shape[-1]
            + env.full_action_spec[group, "action"].shape[-1],
            n_agent_outputs=1,  # 1 value per agent
            n_agents=len(agents),
            centralised=MADDPG,
            share_params=share_parameters_critic,
            device=device,
            depth=2,
            num_cells=256,
            activation_class=torch.nn.Tanh,
        ),
        in_keys=[(group, "obs_action")],  # Read ``(group, "obs_action")``
        out_keys=[
            (group, "state_action_value")
        ],  # Write ``(group, "state_action_value")``
    )

    critics[group] = TensorDictSequential(
        cat_module, critic_module
    )  # Run them in sequence

让我们试试我们的 policy 和 critic 模块。如前所述,使用 of 可以直接读取输出 运行这些模块,因为它们知道要读取哪些信息 以及在哪里写它。TensorDictModule

我们可以看到,在每个组的网络运行后,它们的输出键被添加到 组条目。

从这时起,多代理特定的组件已经实例化了,我们将简单地使用相同的组件 组件,就像在单代理学习中一样。这不是很棒吗?

reset_td = env.reset()
for group, _agents in env.group_map.items():
    print(
        f"Running value and policy for group '{group}':",
        critics[group](policies[group](reset_td)),
    )
Running value and policy for group 'adversary': TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=cpu,
    is_shared=False)
Running value and policy for group 'agent': TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 1, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=cpu,
    is_shared=False)

数据收集器

TorchRL 提供了一组数据收集器类。简而言之,这些 类执行三个操作:重置环境、计算操作 使用策略和最新观察结果,在环境中执行一个步骤,然后重复 最后两个步骤,直到环境发出停止信号(或达到 Done) state) 的 S S

我们将使用最简单的数据收集器,其输出与环境卷展栏相同。 唯一的区别是它会自动重置 DONE 状态,直到收集到所需的帧。

我们需要为它提供我们的勘探政策。此外,要像运行一个组一样运行所有组中的策略, 我们把它们放在一个序列中。它们不会相互干扰,因为每个组都在不同的地方写入和读取键。

# Put exploration policies from each group in a sequence
agents_exploration_policy = TensorDictSequential(*exploration_policies.values())

collector = SyncDataCollector(
    env,
    agents_exploration_policy,
    device=device,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
)

重放缓冲区

重放缓冲区是非策略 RL 算法的常见构建部分。 缓冲区有很多种,在本教程中,我们使用一个基本的缓冲区来存储和采样 tensordict 数据。

replay_buffers = {}
for group, _agents in env.group_map.items():
    replay_buffer = ReplayBuffer(
        storage=LazyMemmapStorage(
            memory_size, device=device
        ),  # We will store up to memory_size multi-agent transitions
        sampler=RandomSampler(),
        batch_size=train_batch_size,  # We will sample batches of this size
    )
    replay_buffers[group] = replay_buffer

损失函数

为了方便使用类,可以直接从 TorchRL 导入 DDPG 损失。这是使用 DDPG 的最简单方法: 它隐藏了 DDPG 的数学运算和控制流 随它去。

还可以为每个组设置不同的策略。

losses = {}
for group, _agents in env.group_map.items():
    loss_module = DDPGLoss(
        actor_network=policies[group],  # Use the non-explorative policies
        value_network=critics[group],
        delay_value=True,  # Whether to use a target network for the value
        loss_function="l2",
    )
    loss_module.set_keys(
        state_action_value=(group, "state_action_value"),
        reward=(group, "reward"),
        done=(group, "done"),
        terminated=(group, "terminated"),
    )
    loss_module.make_value_estimator(ValueEstimators.TD0, gamma=gamma)

    losses[group] = loss_module

target_updaters = {
    group: SoftUpdate(loss, tau=polyak_tau) for group, loss in losses.items()
}

optimisers = {
    group: {
        "loss_actor": torch.optim.Adam(
            loss.actor_network_params.flatten_keys().values(), lr=lr
        ),
        "loss_value": torch.optim.Adam(
            loss.value_network_params.flatten_keys().values(), lr=lr
        ),
    }
    for group, loss in losses.items()
}

Training utils (培训实用程序)

我们确实必须定义两个 helper 函数,我们将在训练循环中使用它们。 它们非常简单,不包含任何重要的逻辑。

def process_batch(batch: TensorDictBase) -> TensorDictBase:
    """
    If the `(group, "terminated")` and `(group, "done")` keys are not present, create them by expanding
    `"terminated"` and `"done"`.
    This is needed to present them with the same shape as the reward to the loss.
    """
    for group in env.group_map.keys():
        keys = list(batch.keys(True, True))
        group_shape = batch.get_item_shape(group)
        nested_done_key = ("next", group, "done")
        nested_terminated_key = ("next", group, "terminated")
        if nested_done_key not in keys:
            batch.set(
                nested_done_key,
                batch.get(("next", "done")).unsqueeze(-1).expand((*group_shape, 1)),
            )
        if nested_terminated_key not in keys:
            batch.set(
                nested_terminated_key,
                batch.get(("next", "terminated"))
                .unsqueeze(-1)
                .expand((*group_shape, 1)),
            )
    return batch

训练循环

现在,我们已具备编写训练循环所需的所有部分。 这些步骤包括:

  • 收集所有组的数据
    • 循环组
      • 将组数据存储在组缓冲区中

      • 循环 epochs
        • 来自组缓冲液的样品

        • 计算采样数据的损失

        • 反向传播损失

        • 优化

      • 重复

    • 重复

  • 重复

pbar = tqdm(
    total=n_iters,
    desc=", ".join(
        [f"episode_reward_mean_{group} = 0" for group in env.group_map.keys()]
    ),
)
episode_reward_mean_map = {group: [] for group in env.group_map.keys()}
train_group_map = copy.deepcopy(env.group_map)

# Training/collection iterations
for iteration, batch in enumerate(collector):
    current_frames = batch.numel()
    batch = process_batch(batch)  # Util to expand done keys if needed
    # Loop over groups
    for group in train_group_map.keys():
        group_batch = batch.exclude(
            *[
                key
                for _group in env.group_map.keys()
                if _group != group
                for key in [_group, ("next", _group)]
            ]
        )  # Exclude data from other groups
        group_batch = group_batch.reshape(
            -1
        )  # This just affects the leading dimensions in batch_size of the tensordict
        replay_buffers[group].extend(group_batch)

        for _ in range(n_optimiser_steps):
            subdata = replay_buffers[group].sample()
            loss_vals = losses[group](subdata)

            for loss_name in ["loss_actor", "loss_value"]:
                loss = loss_vals[loss_name]
                optimiser = optimisers[group][loss_name]

                loss.backward()

                # Optional
                params = optimiser.param_groups[0]["params"]
                torch.nn.utils.clip_grad_norm_(params, max_grad_norm)

                optimiser.step()
                optimiser.zero_grad()

            # Soft-update the target network
            target_updaters[group].step()

        # Exploration sigma anneal update
        exploration_policies[group].step(current_frames)

    # Stop training a certain group when a condition is met (e.g., number of training iterations)
    if iteration == iteration_when_stop_training_evaders:
        del train_group_map["agent"]

    # Logging
    for group in env.group_map.keys():
        episode_reward_mean = (
            batch.get(("next", group, "episode_reward"))[
                batch.get(("next", group, "done"))
            ]
            .mean()
            .item()
        )
        episode_reward_mean_map[group].append(episode_reward_mean)

    pbar.set_description(
        ", ".join(
            [
                f"episode_reward_mean_{group} = {episode_reward_mean_map[group][-1]}"
                for group in env.group_map.keys()
            ]
        ),
        refresh=False,
    )
    pbar.update()
Traceback (most recent call last):
  File "/pytorch/rl/docs/source/reference/generated/tutorials/multiagent_competitive_ddpg.py", line 820, in <module>
    exploration_policies[group].step(current_frames)
  File "/pytorch/rl/env/lib/python3.8/site-packages/tensordict/nn/common.py", line 1302, in __getattr__
    return getattr(super().__getattr__("module"), name)
  File "/pytorch/rl/env/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1914, in __getattr__
    raise AttributeError(
AttributeError: 'ModuleList' object has no attribute 'step'

结果

我们可以绘制每集获得的平均奖励。

要使训练持续时间更长,请增加 hyperparameter 。n_iters

在本地运行此脚本时,您可能需要将打开的窗口关闭到 继续执行屏幕的其余部分。

fig, axs = plt.subplots(2, 1)
for i, group in enumerate(env.group_map.keys()):
    axs[i].plot(episode_reward_mean_map[group], label=f"Episode reward mean {group}")
    axs[i].set_ylabel("Reward")
    axs[i].axvline(
        x=iteration_when_stop_training_evaders,
        label="Agent (evader) stop training",
        color="orange",
    )
    axs[i].legend()
axs[-1].set_xlabel("Training iterations")
plt.show()

呈现

渲染指令适用于 VMAS,也就是使用 .use_vmas=True

TorchRL 提供了一些实用程序来录制和保存渲染的视频。您可以在此处了解有关这些工具的更多信息。

在下面的代码块中,我们附加了一个转换,它将从 VMAS 中调用该方法 打包的环境,并将帧堆栈保存到 mp4 文件中,该文件的位置由自定义 logger video_logger。请注意,此代码可能需要一些外部依赖项,例如 torchvision。render()

if use_vmas and not is_sphinx:
    # Replace tmpdir with any desired path where the video should be saved
    with tempfile.TemporaryDirectory() as tmpdir:
        video_logger = CSVLogger("vmas_logs", tmpdir, video_format="mp4")
        print("Creating rendering env")
        env_with_render = TransformedEnv(env.base_env, env.transform.clone())
        env_with_render = env_with_render.append_transform(
            PixelRenderTransform(
                out_keys=["pixels"],
                # the np.ndarray has a negative stride and needs to be copied before being cast to a tensor
                preproc=lambda x: x.copy(),
                as_non_tensor=True,
                # asking for array rather than on-screen rendering
                mode="rgb_array",
            )
        )
        env_with_render = env_with_render.append_transform(
            VideoRecorder(logger=video_logger, tag="vmas_rendered")
        )
        with set_exploration_type(ExplorationType.MODE):
            print("Rendering rollout...")
            env_with_render.rollout(100, policy=agents_exploration_policy)
        print("Saving the video...")
        env_with_render.transform.dump()
        print("Saved! Saved directory tree:")
        video_logger.print_log_dir()

结论和下一步

在本教程中,我们看到了:

  • 如何在 TorchRL 中创建有竞争力的多组多代理环境,其规范如何工作,以及它如何与库集成;

  • 如何在 TorchRL 中为多个组创建多智能体网络架构;

  • 我们如何用于承载多代理、多组数据;tensordict.TensorDict

  • 我们如何将所有库组件(收集器、模块、重放缓冲区和损失)绑定到一个多代理多组 MADDPG/IDDPG 训练循环中。

现在您已经精通了多智能体 DDPG,您可以在 GitHub 存储库。 这些是许多 MARL 算法的纯代码脚本,例如本教程中所示的 QMIX、MADDPG、IJL 等等!

另外,请记得查看我们的教程:使用 TorchRL 的多智能体强化学习 (PPO) 教程

最后,您可以修改本教程的参数以尝试许多其他配置和场景 成为 MARL 母版。

PettingZoo 和 VMAS 包含更多场景。 以下是一些视频,介绍了您可以在 VMAS 中尝试的一些可能场景。

VMAS scenarios

VMAS 中可用的方案

脚本总运行时间:(0 分 58.493 秒)

估计内存使用量:3114 MB

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源