目录

竞争性多智能体强化学习 (DDPG) 使用 TorchRL 教程

作者: Matteo Bettini

另请参见

The BenchMARL 库提供了使用 TorchRL 的最新 MARL 算法实现。

本教程演示了如何使用 PyTorch 和 TorchRL 解决竞争性多智能体强化学习(MARL)问题。

为便于使用,本教程将沿用已有的使用TorchRL进行多智能体强化学习(PPO)教程的一般结构。

在本教程中,我们将使用来自MADDPG 论文simple_tag 环境。该环境属于一组称为多智能体粒子环境(MultiAgentParticleEnvironments,MPE)的环境,这些环境随该论文一同提出。

目前有多个模拟器提供多智能体粒子环境(MPE)。 在本教程中,我们将展示如何使用 TorchRL 在该环境中进行训练,可选择以下任一方式:

  • PettingZoo,在环境的传统CPU版本中;

  • VMAS, 提供了在PyTorch中的向量化实现, 能够在GPU上模拟多个环境以加速计算。

Simple tag

多智能体simple_tag场景

核心收获:

  • 如何在 TorchRL 中使用竞争性多智能体环境,它们的规格如何工作,以及它们如何与库集成;

  • 如何在 TorchRL 中使用 Parallel PettingZoo 和 VMAS 环境以及多个智能体组

  • 如何在 TorchRL 中创建不同的多智能体网络架构(例如,使用参数共享、集中式评估器)

  • 我们如何使用 TensorDict 来传输多智能体多组数据;

  • 我们如何将所有库组件(收集器、模块、重放缓冲区和损失函数)整合到一个离策略多智能体MADDPG/IDDPG训练循环中。

如果您在 Google Colab 中运行此代码,请确保安装以下依赖项:

!pip3 install torchrl
!pip3 install vmas
!pip3 install pettingzoo[mpe]==1.24.3
!pip3 install tqdm

深度确定性策略梯度(DDPG)是一种无策略的Actor-Critic算法,其中使用Critic网络提供的梯度来优化确定性策略。 如需了解更多信息,请参阅《深度确定性策略梯度》论文。 此类算法通常采用无策略方式进行训练。有关无策略学习的更多内容,请参阅: Sutton, Richard S.,Barto, Andrew G. 《强化学习:导论》。麻省理工学院出版社,2018年。

Off-policy learning

离线学习

该方法已被拓展至多智能体学习领域,详见论文《混合协作-竞争环境下的多智能体Actor-Critic方法》, 该论文提出了多智能体深度确定性策略梯度(MADDPG)算法。 在多智能体场景中,情况略有不同。我们现在拥有多个策略 \(\mathbf{\pi}\), 每个智能体对应一个策略。这些策略通常是局部化且去中心化的。这意味着, 单个智能体的策略仅根据其自身观测结果来输出该智能体的动作。 在多智能体强化学习(MARL)文献中,这被称为去中心化执行。 另一方面,评论家(critic)则存在多种不同的设计形式,主要包括:

  • MADDPG 中,评估器是集中式的,并将系统的全局状态和全局动作作为输入。全局状态可以是一个全局观察值,也可以是代理观察值的拼接。全局动作是代理动作的拼接。MADDPG 可以在执行 集中式训练 的上下文中使用,因为它需要访问全局信息。

  • 在IDDPG中,评估器仅将一个代理的观察和动作作为输入。 这允许分散式训练,因为评估器和策略都只需要本地信息来计算它们的输出。

集中式评论器有助于克服多个智能体同时学习时带来的非平稳性问题,但另一方面,它们可能受其庞大输入空间的影响。 在本教程中,我们将能够训练这两种建模方式,并且还将讨论参数共享(即在各智能体之间共享网络参数的做法)对每种方式的影响。

本教程的结构如下:

  1. 首先,我们将为使用建立一组超参数。

  2. 随后,我们将构建一个多智能体环境,利用 TorchRL 提供的封装器来对接 PettingZoo 或 VMAS。

  3. 随后,我们将构建策略网络和评论家网络,并讨论各种设计选择对参数共享和评论家集中化的影响。

  4. 之后,我们将创建采样收集器和重放缓冲区。

  5. 最后,我们将执行训练循环并检查结果。

如果您在 Colab 或具有图形用户界面(GUI)的机器上运行此程序,您还将有机会在训练过程前后渲染并可视化您自己训练得到的策略。

导入我们的依赖项:

import copy
import tempfile

import torch

from matplotlib import pyplot as plt
from tensordict import TensorDictBase

from tensordict.nn import TensorDictModule, TensorDictSequential
from torch import multiprocessing

from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, RandomSampler, ReplayBuffer

from torchrl.envs import (
    check_env_specs,
    ExplorationType,
    PettingZooEnv,
    RewardSum,
    set_exploration_type,
    TransformedEnv,
    VmasEnv,
)

from torchrl.modules import (
    AdditiveGaussianModule,
    MultiAgentMLP,
    ProbabilisticActor,
    TanhDelta,
)

from torchrl.objectives import DDPGLoss, SoftUpdate, ValueEstimators

from torchrl.record import CSVLogger, PixelRenderTransform, VideoRecorder

from tqdm import tqdm

# Check if we're building the doc, in which case disable video rendering
try:
    is_sphinx = __sphinx_build__
except NameError:
    is_sphinx = False

定义超参数

我们为本教程设置了超参数。 根据可用的计算资源, 您可以选择在 GPU 或其他设备上运行策略和模拟器。 您可以调整其中部分值,以适应不同的计算需求。

# Seed
seed = 0
torch.manual_seed(seed)

# Devices
is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

# Sampling
frames_per_batch = 1_000  # Number of team frames collected per sampling iteration
n_iters = 10  # Number of sampling and training iterations
total_frames = frames_per_batch * n_iters

# We will stop training the evaders after this many iterations,
# should be 0 <= iteration_when_stop_training_evaders <= n_iters
iteration_when_stop_training_evaders = n_iters // 2

# Replay buffer
memory_size = 1_000_000  # The replay buffer of each group can store this many frames

# Training
n_optimiser_steps = 100  # Number of optimization steps per training iteration
train_batch_size = 128  # Number of frames trained in each optimiser step
lr = 3e-4  # Learning rate
max_grad_norm = 1.0  # Maximum norm for the gradients

# DDPG
gamma = 0.99  # Discount factor
polyak_tau = 0.005  # Tau for the soft-update of the target network

环境

多智能体环境用于模拟多个智能体与环境的交互。 TorchRL API 支持集成多种类型的多智能体环境。 本教程将重点介绍多个智能体组并行交互的环境。 即:在每一步中,所有智能体将同步获得观测值并采取动作。

此外,TorchRL MARL API 允许将代理分成组。每组将在 tensordict 中作为一个单独的条目。组内代理的数据会被堆叠在一起。因此,通过选择如何分组您的代理,您可以决定哪些数据被堆叠/作为单独的条目保留。 分组策略可以在 VMAS 和 PettingZoo 等环境的构建时指定。 有关分组的更多信息,请参见 MarlGroupMapType

simple_tag环境中 有两个团队的代理:追逐者(或“对手”)(红色圆圈)和逃避者(或“代理”)(绿色圆圈)。 追逐者因接触逃避者而获得奖励(+10)。 一旦接触,追逐者团队将集体获得奖励,被接触的逃避者将受到相同的惩罚(-10)。 逃避者的速度和加速度高于追逐者。 环境中还有障碍物(黑色圆圈)。 代理和障碍物根据均匀随机分布生成。 代理在一个具有阻力和弹性碰撞的二维连续世界中行动。 它们的动作是二维连续力,决定了它们的加速度。 每个代理观察其位置、 速度、与其他所有代理和障碍物的相对位置以及逃避者的速度。

PettingZoo 与 VMAS 的版本在奖励函数上略有差异:PettingZoo 会对逃逸者越界行为施加惩罚,而 VMAS 则通过物理方式阻止其越界。正因如此,您会观察到:在 VMAS 中,两支队伍的奖励数值完全相同,仅符号相反;而在 PettingZoo 中,逃逸者的奖励则较低。

我们将现在实例化环境。 对于本教程,我们将限制集数为 max_steps,之后设置终止标志。这种功能已经在PettingZoo和VMAS模拟器中提供,但也可以使用TorchRL StepCounter 转换。

max_steps = 100  # Environment steps before done

n_chasers = 2
n_evaders = 1
n_obstacles = 2

use_vmas = True  # Set this to True for a great performance speedup

if not use_vmas:
    base_env = PettingZooEnv(
        task="simple_tag_v3",
        parallel=True,  # Use the Parallel version
        seed=seed,
        # Scenario specific
        continuous_actions=True,
        num_good=n_evaders,
        num_adversaries=n_chasers,
        num_obstacles=n_obstacles,
        max_cycles=max_steps,
    )
else:
    num_vmas_envs = (
        frames_per_batch // max_steps
    )  # Number of vectorized environments. frames_per_batch collection will be divided among these environments
    base_env = VmasEnv(
        scenario="simple_tag",
        num_envs=num_vmas_envs,
        continuous_actions=True,
        max_steps=max_steps,
        device=device,
        seed=seed,
        # Scenario specific
        num_good_agents=n_evaders,
        num_adversaries=n_chasers,
        num_landmarks=n_obstacles,
    )

组映射

PettingZoo 和 VMAS 环境使用 TorchRL 的多智能体强化学习(MARL)分组 API。 我们可以按如下方式访问分组映射(group map),该映射将每个分组映射到其中的智能体:

print(f"group_map: {base_env.group_map}")
group_map: {'adversary': ['adversary_0', 'adversary_1'], 'agent': ['agent_0']}

正如我们所见,它包含两组:“agents”(逃避者)和“adversaries”(追逐者)。

环境不仅由其模拟器和变换(transforms)定义,还由一系列描述其执行期间可预期行为的元数据定义。 出于效率考虑,TorchRL 对环境规范(specs)的要求非常严格,但你可以轻松检查自己环境的规范是否恰当。 在我们的示例中,模拟器包装器(simulator wrapper)会负责为你的 base_env 设置正确的规范,因此你通常无需关心此问题。

有四个规格需要查看:

  • action_spec 定义了动作空间;

  • reward_spec 定义奖励域;

  • done_spec 定义完成域;

  • observation_spec 定义了环境步骤中所有其他输出的域;

print("action_spec:", base_env.full_action_spec)
print("reward_spec:", base_env.full_reward_spec)
print("done_spec:", base_env.full_done_spec)
print("observation_spec:", base_env.observation_spec)
action_spec: Composite(
    adversary: Composite(
        action: BoundedContinuous(
            shape=torch.Size([10, 2, 2]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: Composite(
        action: BoundedContinuous(
            shape=torch.Size([10, 1, 2]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))
reward_spec: Composite(
    adversary: Composite(
        reward: UnboundedContinuous(
            shape=torch.Size([10, 2, 1]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: Composite(
        reward: UnboundedContinuous(
            shape=torch.Size([10, 1, 1]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))
done_spec: Composite(
    done: Categorical(
        shape=torch.Size([10, 1]),
        space=CategoricalBox(n=2),
        device=cpu,
        dtype=torch.bool,
        domain=discrete),
    terminated: Categorical(
        shape=torch.Size([10, 1]),
        space=CategoricalBox(n=2),
        device=cpu,
        dtype=torch.bool,
        domain=discrete),
    device=cpu,
    shape=torch.Size([10]))
observation_spec: Composite(
    adversary: Composite(
        observation: UnboundedContinuous(
            shape=torch.Size([10, 2, 14]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 2])),
    agent: Composite(
        observation: UnboundedContinuous(
            shape=torch.Size([10, 1, 12]),
            space=ContinuousBox(
                low=Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, contiguous=True),
                high=Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, contiguous=True)),
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        device=cpu,
        shape=torch.Size([10, 1])),
    device=cpu,
    shape=torch.Size([10]))

使用刚刚展示的命令,我们可以访问每个值的域。

我们可以看到所有规格都以字典的形式结构化,根节点始终包含组名。 这种结构将在所有进出环境的tensordict数据中遵循。 此外,每个组的规格具有前导形状(n_agents_in_that_group)(代理为1,对手为2), 这意味着该组的张量数据将始终具有该前导形状(组内的代理的数据是堆叠的)。

查看done_spec,我们可以看到有一些键不在代理组之外 ("done", "terminated", "truncated"),这些键没有前导多代理维度。 这些键由所有代理共享,并表示用于重置的环境全局完成状态。 默认情况下,如本例所示,当任何代理完成时,平行PettingZoo环境将完成,但此行为 可以通过在PettingZoo环境构造时设置done_on_any来覆盖。

为快速获取张量字典(tensordict)中每个值对应的键,我们只需向环境查询相应的键, 即可立即明确哪些键是按智能体(per-agent)划分的,哪些是共享的(shared)。 该信息将有助于告知 TorchRL 的其他所有组件:每个值应从何处获取。

print("action_keys:", base_env.action_keys)
print("reward_keys:", base_env.reward_keys)
print("done_keys:", base_env.done_keys)
action_keys: [('adversary', 'action'), ('agent', 'action')]
reward_keys: [('adversary', 'reward'), ('agent', 'reward')]
done_keys: ['done', 'terminated']

变换

我们可以向环境追加所需的任意 TorchRL 变换。 这些变换将以某种期望的方式修改环境的输入/输出。 需要强调的是,在多智能体场景中,必须显式地指定待修改的键(keys)。

例如,在这种情况下,我们将实例化一个RewardSum变换,它将在整个episode中累加奖励。 我们将告诉这个变换在哪里可以找到每个奖励键的重置键。 基本上我们只是说 当"_reset" tensordict键被设置时,每个组的episode奖励应该重置,这意味着env.reset() 被调用。 转换后的环境将继承 包装环境的设备和元数据,并根据其包含的变换序列进行转换。

env = TransformedEnv(
    base_env,
    RewardSum(
        in_keys=base_env.reward_keys,
        reset_keys=["_reset"] * len(base_env.group_map.keys()),
    ),
)

the check_env_specs() 函数运行一个小规模滚动并将其输出与环境规范进行比较。如果没有错误抛出,我们可以确信规范已经正确定义:

check_env_specs(env)

展开

为了好玩,让我们看看一个简单的随机展开是什么样子。你可以调用 env.rollout(n_steps) 并获得环境输入和输出的概览。动作将自动从动作规范域中随机抽取。

n_rollout_steps = 5
rollout = env.rollout(n_rollout_steps)
print(f"rollout of {n_rollout_steps} steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)
rollout of 5 steps: TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 5, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 5, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 5, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 5, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                adversary: TensorDict(
                    fields={
                        episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                        observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                        reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([10, 5, 2]),
                    device=cpu,
                    is_shared=False),
                agent: TensorDict(
                    fields={
                        episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                        observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
                        reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([10, 5, 1]),
                    device=cpu,
                    is_shared=False),
                done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10, 5]),
            device=cpu,
            is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10, 5]),
    device=cpu,
    is_shared=False)
Shape of the rollout TensorDict: torch.Size([10, 5])

我们可以看到我们的展开有 batch_size(n_rollout_steps)。 这意味着其中的所有张量都将具有这一前导维度。

深入观察,我们可以看到输出的张量字典可以按以下方式划分:

  • 在根目录 (通过运行 rollout.exclude("next") 可访问) 我们将找到所有在第一次调用重置后可用的键。我们可以通过索引 n_rollout_steps 维度来查看它们在展开步骤中的演变。在这些键中,我们将找到每个代理在 rollout[group_name] tensordicts 中不同的键,这将具有批量大小 (n_rollout_steps, n_agents_in_group) 表示它存储了额外的代理维度。组外的将是共享的。

  • 在下一个 (通过运行 rollout.get("next") 可访问)。我们将发现与根相同的结构,但有一些细微的差异如下所示。

在 TorchRL 中,约定是:`done`(完成标志)和观测值(observations)同时存在于 `root` 和 `next` 中(因为它们在环境重置时和执行一步操作后均可用)。而动作(action)仅存在于 `root` 中(因为执行一步操作本身不会产生新的动作),奖励(reward)仅存在于 `next` 中(因为在环境重置时没有奖励)。 该结构遵循《强化学习导论》(Sutton 与 Barto 著)中的表示方式,其中 `root` 表示世界步长中时间点 \(t\) 的数据,而 `next` 表示时间点 \(t+1\) 的数据。

渲染随机回放

如果你使用的是 Google Colab,或在一台装有 OpenGL 和图形用户界面(GUI)的机器上运行,你实际上可以渲染一次随机策略的轨迹(rollout)。 这将帮助你了解随机策略在此任务中所能达到的效果,以便与你自己训练的策略进行对比!

要渲染一个回放,请遵循本教程末尾渲染部分中的说明 并只需删除 policy=agents_exploration_policyenv.rollout()

政策

DDPG 使用确定性策略。这意味着我们的神经网络将直接输出要执行的动作。 由于动作是连续的,我们采用 Tanh-Delta 分布以满足动作空间的边界约束。该类唯一的作用是对动作应用 Tanh 变换,以确保动作处于定义域的边界范围内。

另一个重要的决定是我们是否希望团队内的代理共享策略参数。 一方面,共享参数意味着它们将共享相同的策略,这将使它们能够从彼此的经验中受益。这也将导致更快的训练。 另一方面,这将使它们的行为同质化,因为它们实际上共享相同的模型。 在这个例子中,我们将启用共享,因为我们不介意同质性,并且可以从计算速度中受益,但在自己的问题中始终要考虑这个决定!

我们分三个步骤设计策略。

首先: 定义一个神经网络 n_obs_per_agent -> n_actions_per_agents

为此我们使用MultiAgentMLP,一个专门为多个代理设计的TorchRL模块,提供了许多自定义选项。

我们将为每个组定义不同的策略,并将它们存储在一个字典中。

policy_modules = {}
for group, agents in env.group_map.items():
    share_parameters_policy = True  # Can change this based on the group

    policy_net = MultiAgentMLP(
        n_agent_inputs=env.observation_spec[group, "observation"].shape[
            -1
        ],  # n_obs_per_agent
        n_agent_outputs=env.full_action_spec[group, "action"].shape[
            -1
        ],  # n_actions_per_agents
        n_agents=len(agents),  # Number of agents in the group
        centralised=False,  # the policies are decentralised (i.e., each agent will act from its local observation)
        share_params=share_parameters_policy,
        device=device,
        depth=2,
        num_cells=256,
        activation_class=torch.nn.Tanh,
    )

    # Wrap the neural network in a :class:`~tensordict.nn.TensorDictModule`.
    # This is simply a module that will read the ``in_keys`` from a tensordict, feed them to the
    # neural networks, and write the
    # outputs in-place at the ``out_keys``.

    policy_module = TensorDictModule(
        policy_net,
        in_keys=[(group, "observation")],
        out_keys=[(group, "param")],
    )  # We just name the input and output that the network will read and write to the input tensordict
    policy_modules[group] = policy_module

第二: 将 TensorDictModule 包裹在一个 ProbabilisticActor

我们现在需要构建TanhDelta分布。 我们指示ProbabilisticActor类 根据策略动作参数构建一个TanhDelta。我们还提供了这个分布的最小值和最大值,这些值我们从环境规范中收集。

in_keys 的名称(以及上述 out_keys 的名称,源自 TensorDictModule)必须以 TanhDelta 分布构造函数的关键字参数(param)结尾。

policies = {}
for group, _agents in env.group_map.items():
    policy = ProbabilisticActor(
        module=policy_modules[group],
        spec=env.full_action_spec[group, "action"],
        in_keys=[(group, "param")],
        out_keys=[(group, "action")],
        distribution_class=TanhDelta,
        distribution_kwargs={
            "low": env.full_action_spec[group, "action"].space.low,
            "high": env.full_action_spec[group, "action"].space.high,
        },
        return_log_prob=False,
    )
    policies[group] = policy

第三: 探索

由于DDPG策略是确定性的,我们需要一种方法在收集过程中进行探索。

为此目的,我们需要在将策略传递给收集器之前添加一个探索层。 在这种情况下,我们使用一个 AdditiveGaussianModule,它为我们的动作添加高斯噪声 (如果噪声使动作超出范围,则对其进行限制)。

这个探索包装器使用一个sigma参数,该参数与噪声相乘以确定其幅度。 Sigma 可以在整个训练过程中进行退火以减少探索。 Sigma 将从sigma_init变为sigma_end,在annealing_num_steps内完成。

exploration_policies = {}
for group, _agents in env.group_map.items():
    exploration_policy = TensorDictSequential(
        policies[group],
        AdditiveGaussianModule(
            spec=policies[group].spec,
            annealing_num_steps=total_frames
            // 2,  # Number of frames after which sigma is sigma_end
            action_key=(group, "action"),
            sigma_init=0.9,  # Initial value of the sigma
            sigma_end=0.1,  # Final value of the sigma
        ),
    )
    exploration_policies[group] = exploration_policy

评估网络

评论家网络(Critic Network)是DDPG算法的关键组成部分,尽管它在采样阶段并不被使用。该模块将读取观测值与所采取的动作,并返回相应的价值估计值。

如前所述,应仔细考虑在代理组内共享评估器参数的决定。 一般来说,参数共享将加快训练收敛速度,但有几个重要的注意事项:

  • 当智能体具有不同的奖励函数时,不建议共享网络,因为评论家(critic)需要学习为同一状态分配不同的价值(例如,在混合协作-竞争环境中)。 在这种情况下,由于两组智能体已分别使用独立的网络,因此共享决策仅适用于组内智能体——而我们已知组内智能体具有相同的奖励函数。

  • 在去中心化训练环境中,若无额外的基础设施来同步参数,则无法执行共享操作。

在所有其他情况下,当一组智能体(如当前场景中)的奖励函数(需区别于奖励本身)相同时, 共享参数可带来性能提升。但这可能以智能体策略同质化为代价。 通常,判断哪种选择更优的最佳方式是快速对两种方案进行实验。

这里也是我们选择 MADDPG 和 IDDPG 的地方:

  • 借助 MADDPG,我们将获得一个具备全局观察能力的中心化评论家(即:它将以所有智能体的全局观测值与动作的拼接结果作为输入)。 我们之所以能够实现这一点,是因为我们处于仿真环境中,且训练过程是集中式的。

  • 使用IDDPG,我们将拥有一个本地分散的评估器,就像策略一样。

在任何情况下,评论者的输出将具有形状 (..., n_agents_in_group, 1)。 如果评论者是集中且共享的, 则沿 n_agents_in_group 维的所有值都将相同。

与策略类似,我们为每个组创建一个评估网络,并将它们存储在一个字典中。

critics = {}
for group, agents in env.group_map.items():
    share_parameters_critic = True  # Can change for each group
    MADDPG = True  # IDDPG if False, can change for each group

    # This module applies the lambda function: reading the action and observation entries for the group
    # and concatenating them in a new ``(group, "obs_action")`` entry
    cat_module = TensorDictModule(
        lambda obs, action: torch.cat([obs, action], dim=-1),
        in_keys=[(group, "observation"), (group, "action")],
        out_keys=[(group, "obs_action")],
    )

    critic_module = TensorDictModule(
        module=MultiAgentMLP(
            n_agent_inputs=env.observation_spec[group, "observation"].shape[-1]
            + env.full_action_spec[group, "action"].shape[-1],
            n_agent_outputs=1,  # 1 value per agent
            n_agents=len(agents),
            centralised=MADDPG,
            share_params=share_parameters_critic,
            device=device,
            depth=2,
            num_cells=256,
            activation_class=torch.nn.Tanh,
        ),
        in_keys=[(group, "obs_action")],  # Read ``(group, "obs_action")``
        out_keys=[
            (group, "state_action_value")
        ],  # Write ``(group, "state_action_value")``
    )

    critics[group] = TensorDictSequential(
        cat_module, critic_module
    )  # Run them in sequence

让我们尝试我们的策略和评估模块。正如前面提到的,使用 TensorDictModule 可以直接读取环境的输出来运行这些模块,因为它们知道要读取哪些信息以及在哪里写入。

我们可以看到,每个组的网络运行后,其输出键会以该组条目为名添加到数据中。

从这一点开始,多智能体专用组件已经实例化,我们将直接沿用单智能体学习中使用的相同组件。这难道不令人兴奋吗?

reset_td = env.reset()
for group, _agents in env.group_map.items():
    print(
        f"Running value and policy for group '{group}':",
        critics[group](policies[group](reset_td)),
    )
Running value and policy for group 'adversary': TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=cpu,
    is_shared=False)
Running value and policy for group 'agent': TensorDict(
    fields={
        adversary: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 2]),
            device=cpu,
            is_shared=False),
        agent: TensorDict(
            fields={
                action: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                obs_action: Tensor(shape=torch.Size([10, 1, 14]), device=cpu, dtype=torch.float32, is_shared=False),
                observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
                param: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                state_action_value: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([10, 1]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=cpu,
    is_shared=False)

数据收集器

TorchRL 提供了一组数据收集器类。简而言之,这些类执行三项操作:重置环境、利用策略和最新观测值计算动作、在环境中执行一步操作,并重复后两个步骤,直至环境发出停止信号(或达到“完成”状态)。

我们将使用最简单的数据收集器,其输出与环境 rollout 相同, 唯一的区别在于:该收集器会在达到所需帧数之前自动重置已完成(done)的状态。

我们需要向其提供探索策略。此外,为了将所有组的策略当作一个整体来运行,我们将它们按顺序排列。由于每个组都在不同的位置读写键值,因此它们彼此之间不会相互干扰。

# Put exploration policies from each group in a sequence
agents_exploration_policy = TensorDictSequential(*exploration_policies.values())

collector = SyncDataCollector(
    env,
    agents_exploration_policy,
    device=device,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
)

回放缓冲区

经验回放缓冲区(Replay Buffer)是各类离策略强化学习(RL)算法中常见的核心组件。 缓冲区有多种类型;在本教程中,我们使用一种基础缓冲区,以随机方式存储和采样 tensordict 数据。

replay_buffers = {}
for group, _agents in env.group_map.items():
    replay_buffer = ReplayBuffer(
        storage=LazyMemmapStorage(
            memory_size, device=device
        ),  # We will store up to memory_size multi-agent transitions
        sampler=RandomSampler(),
        batch_size=train_batch_size,  # We will sample batches of this size
    )
    replay_buffers[group] = replay_buffer

损失函数

为方便起见,DDPG 损失函数可直接从 TorchRL 导入,使用的是 DDPGLoss 类。这是使用 DDPG 最简单的方式: 它隐藏了 DDPG 的数学运算以及与之相关的控制流程。

也可以为每个组设置不同的策略。

losses = {}
for group, _agents in env.group_map.items():
    loss_module = DDPGLoss(
        actor_network=policies[group],  # Use the non-explorative policies
        value_network=critics[group],
        delay_value=True,  # Whether to use a target network for the value
        loss_function="l2",
    )
    loss_module.set_keys(
        state_action_value=(group, "state_action_value"),
        reward=(group, "reward"),
        done=(group, "done"),
        terminated=(group, "terminated"),
    )
    loss_module.make_value_estimator(ValueEstimators.TD0, gamma=gamma)

    losses[group] = loss_module

target_updaters = {
    group: SoftUpdate(loss, tau=polyak_tau) for group, loss in losses.items()
}

optimisers = {
    group: {
        "loss_actor": torch.optim.Adam(
            loss.actor_network_params.flatten_keys().values(), lr=lr
        ),
        "loss_value": torch.optim.Adam(
            loss.value_network_params.flatten_keys().values(), lr=lr
        ),
    }
    for group, loss in losses.items()
}

训练工具

我们确实需要定义两个辅助函数,将在训练循环中使用。 它们非常简单,不包含任何重要逻辑。

def process_batch(batch: TensorDictBase) -> TensorDictBase:
    """
    If the `(group, "terminated")` and `(group, "done")` keys are not present, create them by expanding
    `"terminated"` and `"done"`.
    This is needed to present them with the same shape as the reward to the loss.
    """
    for group in env.group_map.keys():
        keys = list(batch.keys(True, True))
        group_shape = batch.get_item_shape(group)
        nested_done_key = ("next", group, "done")
        nested_terminated_key = ("next", group, "terminated")
        if nested_done_key not in keys:
            batch.set(
                nested_done_key,
                batch.get(("next", "done")).unsqueeze(-1).expand((*group_shape, 1)),
            )
        if nested_terminated_key not in keys:
            batch.set(
                nested_terminated_key,
                batch.get(("next", "terminated"))
                .unsqueeze(-1)
                .expand((*group_shape, 1)),
            )
    return batch

训练循环

现在,我们已具备编写训练循环所需的全部要素。 步骤包括:

  • Collect data for all groups
    • Loop over groups
      • 将组数据存储在组缓冲区中

      • Loop over epochs
        • 从组缓冲区采样

        • 在采样数据上计算损失

        • 反向传播损失

        • 优化

      • 重复

    • 重复

  • 重复

pbar = tqdm(
    total=n_iters,
    desc=", ".join(
        [f"episode_reward_mean_{group} = 0" for group in env.group_map.keys()]
    ),
)
episode_reward_mean_map = {group: [] for group in env.group_map.keys()}
train_group_map = copy.deepcopy(env.group_map)

# Training/collection iterations
for iteration, batch in enumerate(collector):
    current_frames = batch.numel()
    batch = process_batch(batch)  # Util to expand done keys if needed
    # Loop over groups
    for group in train_group_map.keys():
        group_batch = batch.exclude(
            *[
                key
                for _group in env.group_map.keys()
                if _group != group
                for key in [_group, ("next", _group)]
            ]
        )  # Exclude data from other groups
        group_batch = group_batch.reshape(
            -1
        )  # This just affects the leading dimensions in batch_size of the tensordict
        replay_buffers[group].extend(group_batch)

        for _ in range(n_optimiser_steps):
            subdata = replay_buffers[group].sample()
            loss_vals = losses[group](subdata)

            for loss_name in ["loss_actor", "loss_value"]:
                loss = loss_vals[loss_name]
                optimiser = optimisers[group][loss_name]

                loss.backward()

                # Optional
                params = optimiser.param_groups[0]["params"]
                torch.nn.utils.clip_grad_norm_(params, max_grad_norm)

                optimiser.step()
                optimiser.zero_grad()

            # Soft-update the target network
            target_updaters[group].step()

        # Exploration sigma anneal update
        exploration_policies[group][-1].step(current_frames)

    # Stop training a certain group when a condition is met (e.g., number of training iterations)
    if iteration == iteration_when_stop_training_evaders:
        del train_group_map["agent"]

    # Logging
    for group in env.group_map.keys():
        episode_reward_mean = (
            batch.get(("next", group, "episode_reward"))[
                batch.get(("next", group, "done"))
            ]
            .mean()
            .item()
        )
        episode_reward_mean_map[group].append(episode_reward_mean)

    pbar.set_description(
        ", ".join(
            [
                f"episode_reward_mean_{group} = {episode_reward_mean_map[group][-1]}"
                for group in env.group_map.keys()
            ]
        ),
        refresh=False,
    )
    pbar.update()
episode_reward_mean_adversary = 0, episode_reward_mean_agent = 0:   0%|          | 0/10 [00:00<?, ?it/s]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0:  10%|█         | 1/10 [00:02<00:26,  2.96s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0:  20%|██        | 2/10 [00:06<00:24,  3.07s/it]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0:  30%|███       | 3/10 [00:09<00:21,  3.14s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0:  40%|████      | 4/10 [00:12<00:18,  3.15s/it]
episode_reward_mean_adversary = 2.0, episode_reward_mean_agent = -2.0:  50%|█████     | 5/10 [00:15<00:15,  3.15s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0:  60%|██████    | 6/10 [00:18<00:12,  3.13s/it]
episode_reward_mean_adversary = 2.0, episode_reward_mean_agent = -2.0:  70%|███████   | 7/10 [00:21<00:08,  2.86s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0:  80%|████████  | 8/10 [00:23<00:05,  2.69s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0:  90%|█████████ | 9/10 [00:25<00:02,  2.58s/it]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0: 100%|██████████| 10/10 [00:27<00:00,  2.47s/it]

结果

我们可以绘制每集获得的平均奖励。

为了使训练持续更长时间,请增加n_iters超参数。

在本地运行此脚本时,您可能需要关闭已打开的窗口,才能继续执行屏幕上的其余操作。

fig, axs = plt.subplots(2, 1)
for i, group in enumerate(env.group_map.keys()):
    axs[i].plot(episode_reward_mean_map[group], label=f"Episode reward mean {group}")
    axs[i].set_ylabel("Reward")
    axs[i].axvline(
        x=iteration_when_stop_training_evaders,
        label="Agent (evader) stop training",
        color="orange",
    )
    axs[i].legend()
axs[-1].set_xlabel("Training iterations")
plt.show()
multiagent competitive ddpg

渲染

渲染指令适用于VMAS, 即在使用 use_vmas=True 运行时。

TorchRL 提供了一些用于录制和保存渲染视频的工具。您可以点击此处了解有关这些工具的更多信息。

在以下代码块中,我们附加了一个转换,该转换将调用VMAS包装环境中的render()方法,并将帧堆栈保存到mp4文件中,该文件的位置由自定义日志记录器video_logger确定。请注意,此代码可能需要一些外部依赖项,例如torchvision。

if use_vmas and not is_sphinx:
    # Replace tmpdir with any desired path where the video should be saved
    with tempfile.TemporaryDirectory() as tmpdir:
        video_logger = CSVLogger("vmas_logs", tmpdir, video_format="mp4")
        print("Creating rendering env")
        env_with_render = TransformedEnv(env.base_env, env.transform.clone())
        env_with_render = env_with_render.append_transform(
            PixelRenderTransform(
                out_keys=["pixels"],
                # the np.ndarray has a negative stride and needs to be copied before being cast to a tensor
                preproc=lambda x: x.copy(),
                as_non_tensor=True,
                # asking for array rather than on-screen rendering
                mode="rgb_array",
            )
        )
        env_with_render = env_with_render.append_transform(
            VideoRecorder(logger=video_logger, tag="vmas_rendered")
        )
        with set_exploration_type(ExplorationType.DETERMINISTIC):
            print("Rendering rollout...")
            env_with_render.rollout(100, policy=agents_exploration_policy)
        print("Saving the video...")
        env_with_render.transform.dump()
        print("Saved! Saved directory tree:")
        video_logger.print_log_dir()

结论和下一步

在这个教程中,我们已经看到了:

  • 如何在 TorchRL 中创建具有竞争力的多组多智能体环境,其规格如何工作,以及它如何与库集成;

  • 如何在 TorchRL 中为多个组创建多智能体网络架构;

  • 我们如何使用 tensordict.TensorDict 来传输多智能体多组数据;

  • 我们如何将所有库组件(收集器、模块、重放缓冲区和损失函数)整合到多智能体多组MADDPG/IDDPG训练循环中。

现在,您已经熟练掌握了多智能体 DDPG,可以前往 GitHub 仓库查看 TorchRL 提供的所有多智能体实现。 这些是纯代码脚本,涵盖了众多 MARL 算法,例如本教程中介绍的算法、QMIX、MADDPG、IQL,以及更多!

同时请务必查看我们的教程:使用 TorchRL 实现多智能体强化学习(PPO)教程

最后,您可以修改本教程的参数,尝试许多其他配置和场景,从而成为多智能体强化学习(MARL)专家。

PettingZoo 和 VMAS 包含许多更多的情景。 这里有一些你可以在 VMAS 中尝试的可能情景的视频。

VMAS scenarios

Scenarios available in VMAS

脚本总运行时间: (1 分钟 31.974 秒)

估计内存使用量: 323 MB

通过 Sphinx-Gallery 生成的画廊

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源