注意
点击 这里 下载完整示例代码
强化学习(DQN)教程¶
创建日期: 2017年3月24日 | 最后更新日期: 2024年6月18日 | 最后验证日期: 2024年11月5日
- Author: Adam Paszke
本教程展示了如何使用PyTorch在Gymnasium提供的CartPole-v1任务上训练一个深度Q学习(DQN)代理。
你可能会发现阅读原始的 深度Q学习(DQN) 论文会有所帮助
任务
代理必须在两个动作之间进行选择——将小车向左移动或向右移动——从而使附着在其上的杆保持直立。您可以在 Gymnasium 的网站 上找到更多关于该环境以及其他更具挑战性的环境的信息。
CartPole¶
当代理观察到环境当前的状态并选择一个动作时,环境会 过渡 到一个新的状态,并且回传一个奖励,该奖励表示该动作的后果。在这个任务中,每次增量时间步都会获得+1奖励,如果杆子倾斜得太厉害或者小车偏离中心超过2.4个单位,环境就会终止。这意味着表现更好的场景将会运行更长时间,累积更大的回报。
CartPole任务设计使得代理的输入是4个实数值,代表环境状态(位置、速度等)。我们直接使用这4个输入,不进行任何缩放,并将它们通过一个小型全连接网络,该网络有两个输出,每个动作对应一个。网络被训练以根据输入状态预测每个动作的预期价值。然后选择预期价值最高的动作。
包
首先,让我们导入所需的包。首先,我们需要 gymnasium 作为环境, 通过使用 pip 安装。这是一项来自原始 OpenAI Gym 项目的分支,并且自 Gym v0.19 起由同一团队维护。 如果您在 Google Colab 中运行此代码,请运行:
%%bash
pip3 install gymnasium[classic_control]
我们将从PyTorch中使用以下内容:
神经网络 (
torch.nn)优化 (
torch.optim)自动微分 (
torch.autograd)
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
env = gym.make("CartPole-v1")
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# if GPU is to be used
device = torch.device(
"cuda" if torch.cuda.is_available() else
"mps" if torch.backends.mps.is_available() else
"cpu"
)
回放内存¶
我们将使用经验回放内存来训练我们的DQN。它存储代理观察到的转换,允许我们稍后重用这些数据。通过随机从中采样,构建一个批次的转换是去相关的。研究表明,这极大地稳定并改善了DQN的训练过程。
对于这个任务,我们需要两个类:
Transition- 一个表示环境单次过渡的命名元组。它基本上将(状态,动作)对映射到其(下一个状态,奖励)的结果,其中状态是稍后描述的屏幕差异图像。ReplayMemory- 一个循环缓冲区,具有固定大小,用于存储最近观察到的过渡。它还实现了用于选择训练批次的随机过渡的.sample()方法。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
"""Save a transition"""
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
现在,我们来定义我们的模型。但在开始之前,让我们快速回顾一下DQN是什么。
DQN算法¶
我们的环境是确定性的,因此这里呈现的所有公式也为了简化而以确定性的方式表述。在强化学习文献中,它们也会包含对环境中随机转换的期望值。
我们的目标将是训练一个策略,试图最大化折现累计奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也被称为 回报。折现因子 \(\gamma\) 应该是一个介于 \(0\) 和 \(1\) 之间的常数,以确保总和收敛。较低的 \(\gamma\) 会使我们的代理更重视近未来的奖励,而不是远未来的奖励,它对这些远未来的奖励只能相当确定。这也鼓励代理在时间上更接近地收集奖励,而不是在远未来的等效奖励。
The main idea behind Q-learning is that if we had a function \(Q^*: State \times Action \rightarrow \mathbb{R}\), that could tell 我们如果在给定状态下采取某个行动,我们的回报会是多少,那么我们可以很容易地构造一个最大化奖励的策略:
然而,我们并不了解这个世界的一切,所以无法访问\(Q^*\)。但是,由于神经网络是通用函数逼近器,我们可以简单地创建一个并训练它来类似于\(Q^*\)。
对于我们的训练更新规则,我们将使用一个事实,即每个\(Q\) 函数对于某些策略都遵守贝尔曼方程:
两者的差值被称为时差误差,\(\delta\):
为了最小化这种误差,我们将使用 Huber 损失。Huber 损失在误差较小时表现得像均方误差,在误差较大时表现得像平均绝对误差——这使得它对噪声较大的 \(Q\) 估计值更加鲁棒。我们从经验回放缓冲区中采样一批转换 \(B\) 来计算这一损失:
Q-network¶
我们的模型将是一个前向神经网络,它接受当前和上一屏幕片段之间的差异作为输入。它有两个输出,分别代表\(Q(s, \mathrm{left})\)和\(Q(s, \mathrm{right})\)(其中\(s\)是网络的输入)。实际上,网络试图预测给定当前输入采取每个动作的预期回报。
class DQN(nn.Module):
def __init__(self, n_observations, n_actions):
super(DQN, self).__init__()
self.layer1 = nn.Linear(n_observations, 128)
self.layer2 = nn.Linear(128, 128)
self.layer3 = nn.Linear(128, n_actions)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
return self.layer3(x)
训练¶
超参数和工具¶
此单元格实例化了我们的模型和优化器,并定义了一些实用工具:
select_action- 将根据ε贪婪策略选择一个动作。简单来说,我们有时会使用模型来选择动作,有时则会均匀随机选取一个动作。选择随机动作的概率将从EPS_START开始,并以指数方式衰减至EPS_END。EPS_DECAY控制衰减的速度。plot_durations- 一个辅助工具,用于绘制episode的持续时间,以及过去100个episode的平均值(官方评估中使用的衡量标准)。该图表将在包含主要训练循环的单元格下方显示,并在每个episode后更新。
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return the largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1).indices.view(1, 1)
else:
return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations(show_result=False):
plt.figure(1)
durations_t = torch.tensor(episode_durations, dtype=torch.float)
if show_result:
plt.title('Result')
else:
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
if not show_result:
display.display(plt.gcf())
display.clear_output(wait=True)
else:
display.display(plt.gcf())
训练循环¶
最后,我们的模型训练代码。
在这里,您可以找到一个执行优化单一步骤的optimize_model函数。它首先采样一个批次,将所有张量连接成一个单一的张量,计算\(Q(s_t, a_t)\)和\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合成我们的损失。根据定义,如果\(s\)是终止状态,则我们设置\(V(s) = 0\)。我们还使用目标网络来计算\(V(s_{t+1})\)以增加稳定性。目标网络在每一步都通过软更新进行更新,该更新由先前定义的超参数TAU控制。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
# detailed explanation). This converts batch-array of Transitions
# to Transition of batch-arrays.
batch = Transition(*zip(*transitions))
# Compute a mask of non-final states and concatenate the batch elements
# (a final state would've been the one after which simulation ended)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# Compute Q(s_t, a) - the model computes Q(s_t), then we select the
# columns of actions taken. These are the actions which would've been taken
# for each batch state according to policy_net
state_action_values = policy_net(state_batch).gather(1, action_batch)
# Compute V(s_{t+1}) for all next states.
# Expected values of actions for non_final_next_states are computed based
# on the "older" target_net; selecting their best reward with max(1).values
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
with torch.no_grad():
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
# Compute the expected Q values
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Compute Huber loss
criterion = nn.SmoothL1Loss()
loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
# Optimize the model
optimizer.zero_grad()
loss.backward()
# In-place gradient clipping
torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
optimizer.step()
以下,您可以找到主要的训练循环。在开始时,我们重置环境并获得初始 state 张量。然后,我们采样一个动作,执行它,观察下一个状态和奖励(始终为1),并优化我们的模型一次。当episode结束(我们的模型失败)时,我们将重新启动此循环。
Below, num_episodes 是设置为 600,如果可用 GPU,则否则计划进行 50 次episode,以防止训练时间过长。然而,50 次episode 对于观察 CartPole 的良好性能是不够的。你应该看到模型在 600 次训练episode 中不断达到 500 步。训练 RL 剂量过程可能会有噪音,因此如果没有观察到收敛,重新开始训练可以产生更好的结果。
if torch.cuda.is_available() or torch.backends.mps.is_available():
num_episodes = 600
else:
num_episodes = 50
for i_episode in range(num_episodes):
# Initialize the environment and get its state
state, info = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
for t in count():
action = select_action(state)
observation, reward, terminated, truncated, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
done = terminated or truncated
if terminated:
next_state = None
else:
next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
# Store the transition in memory
memory.push(state, action, next_state, reward)
# Move to the next state
state = next_state
# Perform one step of the optimization (on the policy network)
optimize_model()
# Soft update of the target network's weights
# θ′ ← τ θ + (1 −τ )θ′
target_net_state_dict = target_net.state_dict()
policy_net_state_dict = policy_net.state_dict()
for key in policy_net_state_dict:
target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
target_net.load_state_dict(target_net_state_dict)
if done:
episode_durations.append(t + 1)
plot_durations()
break
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
Complete
这里是说明整体数据流的图表。

动作要么随机选择,要么根据策略从gym环境中获取下一个步骤的样本。我们将结果记录在回放缓冲区中,并且在每次迭代时也执行优化步骤。优化过程会从回放缓冲区中随机选取一批数据来训练新的策略。在计算预期Q值时,还会使用“较旧”的target_net。每一步都会对它的权重进行软更新。
脚本总运行时间: (3分钟57.238秒)