目录

强化学习 (DQN) 教程

创建时间: 2017 年 3 月 24 日 |上次更新时间: 2024-6-18 |上次验证: Nov 05, 2024

作者Adam Paszke

马克塔斯

本教程介绍如何使用 PyTorch 训练深度 Q 学习 (DQN) 代理 在 Gymnasium 的 CartPole-v1 任务上。

您可能会发现阅读原始深度 Q 学习 (DQN) 论文会有所帮助

任务

代理必须在两个操作之间做出决定 - 向左移动购物车或 右 - 以便连接到它的杆保持直立。您可以找到更多 有关 Gymnasium 网站上的环境和其他更具挑战性的环境的信息。

购物车杆

购物车杆

当代理观察环境的当前状态并选择 一个 action 时,环境会转换为新状态,并且 返回指示操作后果的 REWARD 。在这个 task,则每个增量时间步和环境的奖励为 +1 如果杆子掉落得太远或小车移动超过 2.4 次,则终止 单位远离中心。这意味着将运行性能更好的场景 持续时间更长,累积的回报更大。

CartPole 任务设计为代理的 4 实数 表示环境状态 (位置、速度等) 的值。 我们不带任何缩放地获取这 4 个输入,并将它们传递给 具有 2 个输出的小型全连接网络,每个操作 1 个。 该网络经过训练,可以预测每个操作的预期值。 给定 input 状态。具有最高预期值的操作是 然后选择。

首先,让我们导入所需的包。首先,我们需要环境健身房, 使用 pip 安装。这是原始 OpenAI 的一个分支 Gym 项目,自 Gym v0.19 起由同一团队维护。 如果您在 Google Colab 中运行此命令,请运行:

%%bash
pip3 install gymnasium[classic_control]

我们还将使用 PyTorch 中的以下内容:

  • 神经网络 (torch.nn)

  • 优化 (torch.optim)

  • 自动区分 (torch.autograd)

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

重放内存

我们将使用经验重放内存来训练我们的 DQN。它存储 代理观察到的转换,允许我们重用这些数据 后。通过从中随机采样,构建 batch 是 derelated 的。事实证明,这极大地稳定了 并改进 DQN 训练程序。

为此,我们需要两个类:

  • Transition- 一个命名元组,表示 我们的环境。它本质上映射 (state, action) 对 添加到他们的 (next_state, reward) 结果中,状态是 屏幕差异图像,如下所述。

  • ReplayMemory- 一个有界大小的循环缓冲区,其中包含 最近观察到的转变。它还实现了一种选择随机批次转换进行训练的方法。.sample()

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。但首先,让我们快速回顾一下什么是 DQN。

DQN 算法

我们的环境是确定性的,所以这里介绍的所有方程都是 也为了简单起见,确定性地表述。在 强化学习文献,它们也会包含期望 在环境中的随机转换上。

我们的目标是制定一项政策,试图最大限度地利用折扣、 累积奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也称为回报。折扣 \(\gamma\) 应该是介于 \(0\)\(1\) 之间的常数,以确保和收敛。较低的 \(\gamma\) 使 来自不确定的遥远未来的奖励对我们的经纪人来说不那么重要 而不是在不久的将来可以相当自信的那些 大约。它还鼓励代理更及时地收集奖励 而不是暂时遥远的未来同等奖励。

Q-learning 背后的主要思想是,如果我们有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们 如果我们在给定的 state 中,那么我们可以轻松地构建一个策略,使 奖励:

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a) \]

然而,我们并不了解世界的一切,所以我们没有 访问 \(Q^*\)。但是,由于神经网络是通用函数 近似器,我们可以简单地创建一个并训练它类似于 \(Q^*\)。

对于我们的训练更新规则,我们将使用一个事实,即某些策略的每个 \(Q\) 函数都遵循贝尔曼方程:

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s')) \]

相等的两边之间的差称为 时间差误差 \(\delta\)

\[\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a)) \]

为了最大限度地减少此错误,我们将使用 Huber 损失。Huber 损失行为 当误差较小时,与均方误差类似,但与均值类似 绝对误差 (当误差较大时) - 这使得它更健壮 当 \(Q\) 的估计值非常嘈杂时,异常值。我们计算 this 在一批转换 \(B\) 上,从回放中采样 记忆:

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]
\[\text{where} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{for } |\delta|\le 1, \\ |\delta|- \frac{1}{2} & \text{否则。 \end{cases}\]

Q 网络

我们的模型将是一个前馈神经网络,它接收 当前和以前的屏幕补丁之间的差异。它有两个 输出,代表 \(Q(s, \mathrm{left})\)\(Q(s, \mathrm{right})\) (其中 \(s\) 是 网络)。实际上,该网络正在尝试预测 根据当前输入执行每个操作。

class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

训练

超参数和实用程序

这个 cell 实例化了我们的模型和它的优化器,并定义了一些 公用事业:

  • select_action- 将根据 Epsilon 选择一个操作 贪婪策略。简单地说,我们有时会使用我们的模型来选择 动作,有时我们只统一采样一个。这 选择随机操作的概率将从 开始,并呈指数衰减方向。 控制衰减的速率。EPS_STARTEPS_ENDEPS_DECAY

  • plot_durations- 用于绘制剧集持续时间的助手, 以及过去 100 集的平均值(用于 官方评估)。绘图将位于单元格下方 包含主训练循环,并将在每个 插曲。

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

训练循环

最后,用于训练模型的代码。

在这里,您可以找到一个执行 优化的单个步骤。它首先对一个批次进行采样,然后连接 将所有张量合为一个张量,计算 \(Q(s_t, a_t)\)\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合成我们的 损失。根据定义,如果 \(s\) 是一个终端,我们就设置 \(V(s) = 0\) 州。我们还使用目标网络来计算 \(V(s_{t+1})\) 增加稳定性。目标网络在每一步都通过由 之前定义的 hyperparameter 。optimize_modelTAU

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

在下面,您可以找到主要的训练循环。一开始,我们重置 环境并获取初始 Tensor。然后,我们取样 一个 action,执行它,观察下一个 state 和奖励(总是 1) 并优化我们的模型一次。当剧集结束时(我们的模型 失败),我们会重启循环。state

在下面,如果 GPU 可用,则 num_episodes 设置为 600,否则设置为 50 安排了剧集,因此训练不会花费太长时间。然而,50 剧集不足以在 CartPole 上观察到良好的性能。 您应该会看到模型在 600 次训练中不断实现 500 步 情节。训练 RL 代理可能是一个嘈杂的过程,因此请重新开始训练 如果未观察到收敛,则可以产生更好的结果。

if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()
结果
/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:

`np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)

Complete

下图说明了整个结果数据流。

../_images/reinforcement_learning_diagram.jpg

操作是随机选择的,也可以根据策略选择,从而获得下一个 健身环境中的步骤示例。我们将结果记录在 重放内存,并在每次迭代时运行 Optimization Step。 优化从重放内存中随机选择一个批次来训练 新策略。“较旧”target_net也用于优化中,以计算 预期的 Q 值。在每个步骤中都会对其权重执行软更新。

脚本总运行时间:(3 分 57.238 秒)

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源