AgileRL: Implementing DQN - Curriculum Learning and Self-play

../../../_images/connect_four_self_opp.gif

Figure 1: Agents playing Connect Four, trained using self-play

This tutorial shows how to train a DQN agent on the Connect Four classic environment.

This tutorial focuses on two techniques used in reinforcement learning - curriculum learning and self-play. Curriculum learning refers to training an agent on tasks of increasing difficulty in separate 'lessons'. Imagine you were trying to become a chess world champion. You would not decide to learn chess by immediately playing against a grandmaster - it would be far too difficult. Instead, you would practice against people of similar ability to you, improve slowly, and gradually face stronger and stronger opponents until you were ready to compete with the best. The same concept applies to reinforcement learning models. Sometimes a task is too difficult to learn all at once, so we must create a curriculum that guides an agent and teaches it to solve the ultimate, hard environment.

This tutorial also uses self-play. Self-play is a technique commonly used in competitive reinforcement learning environments. An agent trains by playing against a copy of itself - the opponent - and learns to beat this opponent. The opponent is then updated to a copy of this improved version of the agent, and the agent must learn to beat itself again. This process is repeated, and the agent iteratively improves by exploiting its own weaknesses and discovering new strategies.
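Schematically, self-play boils down to training against a frozen copy of the agent that is refreshed at regular intervals. The sketch below is purely conceptual (agent and train_one_episode are hypothetical stand-ins); the opponent-pool implementation actually used in this tutorial appears later:

import copy


def self_play_sketch(agent, train_one_episode, num_episodes, upgrade_every=100):
    """Conceptual sketch: train against a periodically refreshed copy of the agent."""
    opponent = copy.deepcopy(agent)  # frozen copy of the current agent
    for episode in range(num_episodes):
        train_one_episode(agent, opponent)  # agent learns by playing the frozen copy
        if (episode + 1) % upgrade_every == 0:
            # Replace the opponent with the latest, stronger version of the agent
            opponent = copy.deepcopy(agent)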

In this tutorial, self-play is treated as the final lesson in the curriculum. However, these two techniques can be used independently of each other, and with unlimited resources, self-play can beat agents trained with curriculum learning on human-designed lessons. The Bitter Lesson by Richard Sutton offers an interesting perspective on curriculum learning and is well worth considering for any engineer taking on such a task. Unlike Sutton, however, we do not all have the resources of Deepmind and other top institutions available to us, so we must be pragmatic when deciding how to solve our own reinforcement learning problems. If you would like to discuss this exciting area of research further, join the AgileRL Discord server and let us know what you think!

What is DQN?

DQN (Deep Q-Network) is an extension of Q-learning that makes use of a replay buffer and a target network to improve learning stability. For more information about DQN, check out the AgileRL documentation.
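As a minimal illustration of the idea (a simplified sketch with placeholder networks and tensors, not AgileRL's implementation), a DQN update samples a batch of transitions from the replay buffer and regresses the online network towards a bootstrapped target computed with the slowly-updated target network:

import torch
import torch.nn.functional as F


def dqn_loss(online_net, target_net, obs, action, reward, next_obs, done, gamma=0.99):
    """Standard DQN TD loss for a batch of transitions (illustrative only)."""
    # Q-values predicted for the actions that were actually taken
    q_pred = online_net(obs).gather(1, action.long().unsqueeze(1)).squeeze(1)

    # Bootstrapped target from the separate, slowly-updated target network
    with torch.no_grad():
        q_next = target_net(next_obs).max(dim=1).values
        q_target = reward + gamma * (1 - done.float()) * q_next

    return F.mse_loss(q_pred, q_target)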

Can I use it?

              Action Space    Observation Space
Discrete      ✔️              ✔️
Continuous    ❌              ✔️

Environment Setup

To follow this tutorial, you will need to install the dependencies shown below. It is recommended to use a newly-created virtual environment to avoid dependency conflicts.

agilerl==2.2.1; python_version >= '3.10' and python_version < '3.12'
pettingzoo[classic,atari,mpe]>=1.23.1
AutoROM>=0.6.1
SuperSuit>=3.9.0
torch>=2.0.1
numpy>=1.24.2
tqdm>=4.65.0
fastrand==1.3.0
gymnasium>=0.28.1
imageio>=2.31.1
Pillow>=9.5.0
PyYAML>=5.4.1

Code

Curriculum learning and self-play using DQN on Connect Four

The following code should run without any issues. The comments are designed to help you understand how to use AgileRL with PettingZoo. If you have any questions, please feel free to ask in the Discord server.

This is a complicated tutorial, so we will work through it in stages. The full code can be found at the end of this section. Although much of this tutorial is specific to the Connect Four environment, it demonstrates how these techniques can be applied more generally to other problems.

Imports

Importing the following packages, functions and classes will enable us to run the tutorial.

Imports
import copy
import os
import random
from collections import deque
from datetime import datetime

import numpy as np
import torch
import wandb
import yaml
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.utils import create_population, observation_space_channels_to_first
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.algorithms.core.wrappers import OptimizerWrapper
from tqdm import tqdm, trange

from pettingzoo.classic import connect_four_v3

Curriculum learning

First, we need to set up and modify the environment to enable curriculum learning. Curriculum learning works by changing the environment the agent trains in. This can be implemented by changing what happens when certain actions are taken - for example, changing the next observation returned by the environment, or, more simply, changing the reward. To begin with, we will change the reward. By default, Connect Four uses the following rewards:

  • Win = +1

  • Lose = -1

  • Play continues = 0

To help guide our agent, we can introduce rewards for other outcomes in the environment, such as a small reward for placing 3 pieces in a row, or a small negative reward when the opponent does the same. We can also use reward shaping to encourage the agent to explore more. In Connect Four, an easy way to win against a random opponent is to always play in the same column. An agent may find success doing this and therefore fail to learn other, more sophisticated strategies that would help it beat stronger opponents. We may therefore choose to reward vertical wins slightly less than horizontal or diagonal wins, to encourage the agent to try winning in different ways. An example reward system could be defined as follows (a small illustrative lookup is sketched after this list):

  • Win (horizontal or diagonal) = +1

  • Win (vertical) = +0.8

  • Three in a row = +0.05

  • Opponent three in a row = -0.05

  • Lose = -1

  • Play continues = 0
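For illustration only, such a scheme amounts to a simple lookup from game outcome to a scalar reward; in this tutorial the actual values live in the YAML lesson files below and are applied inside the CurriculumEnv wrapper's reward() method. The dictionary and helper here are hypothetical and simply mirror the list above:

EXAMPLE_REWARDS = {
    "win": 1.0,              # horizontal or diagonal win
    "vertical_win": 0.8,
    "three_in_row": 0.05,
    "opp_three_in_row": -0.05,
    "lose": -1.0,            # applied to the losing player's stored transitions
    "play_continues": 0.0,
}


def shaped_reward(done: bool, vertical_win: bool, agent_threes: int, opp_threes: int) -> float:
    """Hypothetical helper mapping the current game state to the shaped reward above."""
    if done:
        return EXAMPLE_REWARDS["vertical_win"] if vertical_win else EXAMPLE_REWARDS["win"]
    if agent_threes == 0 and opp_threes == 0:
        return EXAMPLE_REWARDS["play_continues"]
    return (
        EXAMPLE_REWARDS["three_in_row"] * agent_threes
        + EXAMPLE_REWARDS["opp_three_in_row"] * opp_threes
    )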

Config files

It is best to use YAML config files to define the lessons in our curriculum, so that settings can easily be changed and tracked. The first three lessons in our curriculum can be defined as follows:

Lesson 1
---
# Connect Four Lesson 1
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: random
opponent_pool_size:       # Size of opponent pool for self-play
opponent_upgrade:       # Epoch frequency to update opponent pool
eval_opponent:       # 'random', 'weak' or 'strong'
pretrained_path:       # Path to pretrained model weights
save_path: models/DQN/lesson1_trained_agent.pt  # Path to save trained model
max_train_episodes: 0  # Maximum number of training episodes in environment

## Game specific:
buffer_warm_up: true  # Fill replay buffer with random experiences
warm_up_opponent: random  # Difficulty level of warm up experiences
agent_warm_up: 3000  # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 4  # How many times more likely to block vertically
rewards:  # Rewards for different outcomes
    win: 1
    vertical_win: 0.7
    three_in_row: 0.05
    opp_three_in_row: -0.05
    lose: -1
    play_continues: 0
Lesson 2
---
# Connect Four Lesson 2
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: weak
opponent_pool_size:       # Size of opponent pool for self-play
opponent_upgrade:       # Epoch frequency to update opponent pool
eval_opponent: weak  # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson1_trained_agent.pt  # Path to pretrained model weights
save_path: models/DQN/lesson2_trained_agent.pt  # Path to save trained model
max_train_episodes: 100  # Maximum number of training episodes in environment

## Game specific:
buffer_warm_up: false  # Fill replay buffer with random experiences
warm_up_opponent:       # Difficulty level of warm up experiences
agent_warm_up: 0  # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1  # How many times more likely to block vertically
rewards:  # Rewards for different outcomes
    win: 1
    vertical_win: 1
    three_in_row: 0.02
    opp_three_in_row: -0.02
    lose: -1
    play_continues: 0
Lesson 3
---
# Connect Four Lesson 3
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: strong
opponent_pool_size:       # Size of opponent pool for self-play
opponent_upgrade:       # Epoch frequency to update opponent pool
eval_opponent: strong  # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson2_trained_agent.pt   # Path to pretrained model weights
save_path: models/DQN/lesson3_trained_agent.pt  # Path to save trained model
max_train_episodes: 200  # Maximum number of training episodes in environment

## Game specific:
buffer_warm_up: false  # Fill replay buffer with random experiences
warm_up_opponent:  # Difficulty level of warm up experiences
agent_warm_up: 0  # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1  # How many times more likely to block vertically
rewards:  # Rewards for different outcomes
    win: 1
    vertical_win: 1
    three_in_row: 0.02
    opp_three_in_row: -0.02
    lose: -1
    play_continues: 0

To implement our curriculum, we create a CurriculumEnv class that acts as a wrapper around the Connect Four environment and allows us to modify the reward to guide the agent's training. It uses the config files we set up to define the lesson.

Curriculum environment
class CurriculumEnv:
   """Wrapper around environment to modify reward for curriculum learning.

   :param env: Environment to learn in
   :type env: PettingZoo-style environment
   :param lesson: Lesson settings for curriculum learning
   :type lesson: dict
   """

   def __init__(self, env, lesson):
      self.env = env
      self.lesson = lesson

   def fill_replay_buffer(self, memory, opponent):
      """Fill the replay buffer with experiences collected by taking random actions in the environment.

      :param memory: Experience replay buffer
      :type memory: AgileRL experience replay buffer
      """
      print("Filling replay buffer ...")

      pbar = tqdm(total=memory.memory_size)
      while len(memory) < memory.memory_size:
         # Randomly decide whether random player will go first or second
         if random.random() > 0.5:
               opponent_first = False
         else:
               opponent_first = True

         mem_full = len(memory)
         self.reset()  # Reset environment at start of episode
         observation, reward, done, truncation, _ = self.last()

         (
               p1_state,
               p1_state_flipped,
               p1_action,
               p1_next_state,
               p1_next_state_flipped,
         ) = (None, None, None, None, None)
         done, truncation = False, False

         while not (done or truncation):
               # Player 0's turn
               p0_action_mask = observation["action_mask"]
               p0_state = np.moveaxis(observation["observation"], [-1], [-3])
               p0_state_flipped = np.expand_dims(np.flip(p0_state, 2), 0)
               p0_state = np.expand_dims(p0_state, 0)
               if opponent_first:
                  p0_action = self.env.action_space("player_0").sample(p0_action_mask)
               else:
                  if self.lesson["warm_up_opponent"] == "random":
                     p0_action = opponent.get_action(
                           p0_action_mask, p1_action, self.lesson["block_vert_coef"]
                     )
                  else:
                     p0_action = opponent.get_action(player=0)
               self.step(p0_action)  # Act in environment
               observation, env_reward, done, truncation, _ = self.last()
               p0_next_state = np.moveaxis(observation["observation"], [-1], [-3])
               p0_next_state_flipped = np.expand_dims(np.flip(p0_next_state, 2), 0)
               p0_next_state = np.expand_dims(p0_next_state, 0)

               if done or truncation:
                  reward = self.reward(done=True, player=0)
                  memory.save2memoryVectEnvs(
                     np.concatenate(
                           (p0_state, p1_state, p0_state_flipped, p1_state_flipped)
                     ),
                     [p0_action, p1_action, 6 - p0_action, 6 - p1_action],
                     [
                           reward,
                           LESSON["rewards"]["lose"],
                           reward,
                           LESSON["rewards"]["lose"],
                     ],
                     np.concatenate(
                           (
                              p0_next_state,
                              p1_next_state,
                              p0_next_state_flipped,
                              p1_next_state_flipped,
                           )
                     ),
                     [done, done, done, done],
                  )
               else:  # Play continues
                  if p1_state is not None:
                     reward = self.reward(done=False, player=1)
                     memory.save2memoryVectEnvs(
                           np.concatenate((p1_state, p1_state_flipped)),
                           [p1_action, 6 - p1_action],
                           [reward, reward],
                           np.concatenate((p1_next_state, p1_next_state_flipped)),
                           [done, done],
                     )

                  # Player 1's turn
                  p1_action_mask = observation["action_mask"]
                  p1_state = np.moveaxis(observation["observation"], [-1], [-3])
                  p1_state[[0, 1], :, :] = p1_state[[0, 1], :, :]
                  p1_state_flipped = np.expand_dims(np.flip(p1_state, 2), 0)
                  p1_state = np.expand_dims(p1_state, 0)
                  if not opponent_first:
                     p1_action = self.env.action_space("player_1").sample(
                           p1_action_mask
                     )
                  else:
                     if self.lesson["warm_up_opponent"] == "random":
                           p1_action = opponent.get_action(
                              p1_action_mask, p0_action, LESSON["block_vert_coef"]
                           )
                     else:
                           p1_action = opponent.get_action(player=1)
                  self.step(p1_action)  # Act in environment
                  observation, env_reward, done, truncation, _ = self.last()
                  p1_next_state = np.moveaxis(observation["observation"], [-1], [-3])
                  p1_next_state[[0, 1], :, :] = p1_next_state[[0, 1], :, :]
                  p1_next_state_flipped = np.expand_dims(np.flip(p1_next_state, 2), 0)
                  p1_next_state = np.expand_dims(p1_next_state, 0)

                  if done or truncation:
                     reward = self.reward(done=True, player=1)
                     memory.save2memoryVectEnvs(
                           np.concatenate(
                              (p0_state, p1_state, p0_state_flipped, p1_state_flipped)
                           ),
                           [p0_action, p1_action, 6 - p0_action, 6 - p1_action],
                           [
                              LESSON["rewards"]["lose"],
                              reward,
                              LESSON["rewards"]["lose"],
                              reward,
                           ],
                           np.concatenate(
                              (
                                 p0_next_state,
                                 p1_next_state,
                                 p0_next_state_flipped,
                                 p1_next_state_flipped,
                              )
                           ),
                           [done, done, done, done],
                     )

                  else:  # Play continues
                     reward = self.reward(done=False, player=0)
                     memory.save2memoryVectEnvs(
                           np.concatenate((p0_state, p0_state_flipped)),
                           [p0_action, 6 - p0_action],
                           [reward, reward],
                           np.concatenate((p0_next_state, p0_next_state_flipped)),
                           [done, done],
                     )

         pbar.update(len(memory) - mem_full)
      pbar.close()
      print("Replay buffer warmed up.")
      return memory

   def check_winnable(self, lst, piece):
      """Checks if four pieces in a row represent a winnable opportunity, e.g. [1, 1, 1, 0] or [2, 0, 2, 2].

      :param lst: List of pieces in row
      :type lst: List
      :param piece: Player piece we are checking (1 or 2)
      :type piece: int
      """
      return lst.count(piece) == 3 and lst.count(0) == 1

   def check_vertical_win(self, player):
      """Checks if a win is vertical.

      :param player: Player who we are checking, 0 or 1
      :type player: int
      """
      board = np.array(self.env.env.board).reshape(6, 7)
      piece = player + 1

      column_count = 7
      row_count = 6

      # Check vertical locations for win
      for c in range(column_count):
         for r in range(row_count - 3):
               if (
                  board[r][c] == piece
                  and board[r + 1][c] == piece
                  and board[r + 2][c] == piece
                  and board[r + 3][c] == piece
               ):
                  return True
      return False

   def check_three_in_row(self, player):
      """Checks if there are three pieces in a row and a blank space next, or two pieces - blank - piece.

      :param player: Player who we are checking, 0 or 1
      :type player: int
      """
      board = np.array(self.env.env.board).reshape(6, 7)
      piece = player + 1

      # Check horizontal locations
      column_count = 7
      row_count = 6
      three_in_row_count = 0

      # Check vertical locations
      for c in range(column_count):
         for r in range(row_count - 3):
               if self.check_winnable(board[r : r + 4, c].tolist(), piece):
                  three_in_row_count += 1

      # Check horizontal locations
      for r in range(row_count):
         for c in range(column_count - 3):
               if self.check_winnable(board[r, c : c + 4].tolist(), piece):
                  three_in_row_count += 1

      # Check positively sloped diagonals
      for c in range(column_count - 3):
         for r in range(row_count - 3):
               if self.check_winnable(
                  [
                     board[r, c],
                     board[r + 1, c + 1],
                     board[r + 2, c + 2],
                     board[r + 3, c + 3],
                  ],
                  piece,
               ):
                  three_in_row_count += 1

      # Check negatively sloped diagonals
      for c in range(column_count - 3):
         for r in range(3, row_count):
               if self.check_winnable(
                  [
                     board[r, c],
                     board[r - 1, c + 1],
                     board[r - 2, c + 2],
                     board[r - 3, c + 3],
                  ],
                  piece,
               ):
                  three_in_row_count += 1

      return three_in_row_count

   def reward(self, done, player):
      """Processes and returns reward from environment according to lesson criteria.

      :param done: Environment has terminated
      :type done: bool
      :param player: Player who we are checking, 0 or 1
      :type player: int
      """
      if done:
         reward = (
               self.lesson["rewards"]["vertical_win"]
               if self.check_vertical_win(player)
               else self.lesson["rewards"]["win"]
         )
      else:
         agent_three_count = self.check_three_in_row(1 - player)
         opp_three_count = self.check_three_in_row(player)
         if (agent_three_count + opp_three_count) == 0:
               reward = self.lesson["rewards"]["play_continues"]
         else:
               reward = (
                  self.lesson["rewards"]["three_in_row"] * agent_three_count
                  + self.lesson["rewards"]["opp_three_in_row"] * opp_three_count
               )
      return reward

   def last(self):
      """Wrapper around PettingZoo env last method."""
      return self.env.last()

   def step(self, action):
      """Wrapper around PettingZoo env step method."""
      self.env.step(action)

   def reset(self):
      """Wrapper around PettingZoo env reset method."""
      self.env.reset()

When defining the different lessons in our curriculum, we can increase the difficulty of the task by modifying what the agent observes in the environment - in Connect Four, we can increase the skill level of the opponent. By doing this progressively, we can help the agent improve. We can also change the rewards between lessons; for example, once we have learned to beat a random agent and now wish to train against a stronger opponent, we may want to reward wins in all directions equally. In this tutorial, an Opponent class is implemented to provide opponents of different difficulty levels to train our agent against.

Opponent
class Opponent:
   """Connect 4 opponent to train and/or evaluate against.

   :param env: Environment to learn in
   :type env: PettingZoo-style environment
   :param difficulty: Difficulty level of opponent, 'random', 'weak' or 'strong'
   :type difficulty: str
   """

   def __init__(self, env, difficulty):
      self.env = env.env
      self.difficulty = difficulty
      if self.difficulty == "random":
         self.get_action = self.random_opponent
      elif self.difficulty == "weak":
         self.get_action = self.weak_rule_based_opponent
      else:
         self.get_action = self.strong_rule_based_opponent
      self.num_cols = 7
      self.num_rows = 6
      self.length = 4
      self.top = [0] * self.num_cols

   def update_top(self):
      """Updates self.top, a list which tracks the row on top of the highest piece in each column."""
      board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
      non_zeros = np.where(board != 0)
      rows, cols = non_zeros
      top = np.zeros(board.shape[1], dtype=int)
      for col in range(board.shape[1]):
         column_pieces = rows[cols == col]
         if len(column_pieces) > 0:
               top[col] = np.min(column_pieces) - 1
         else:
               top[col] = 5
      full_columns = np.all(board != 0, axis=0)
      top[full_columns] = 6
      self.top = top

   def random_opponent(self, action_mask, last_opp_move=None, block_vert_coef=1):
      """Takes move for random opponent. If the lesson aims to randomly block vertical wins with a higher probability, this is done here too.

      :param action_mask: Mask of legal actions: 1=legal, 0=illegal
      :type action_mask: List
      :param last_opp_move: Most recent action taken by agent against this opponent
      :type last_opp_move: int
      :param block_vert_coef: How many times more likely to block vertically
      :type block_vert_coef: float
      """
      if last_opp_move is not None:
         action_mask[last_opp_move] *= block_vert_coef
      action = random.choices(list(range(self.num_cols)), action_mask)[0]
      return action

   def weak_rule_based_opponent(self, player):
      """Takes move for weak rule-based opponent.

      :param player: Player who we are checking, 0 or 1
      :type player: int
      """
      self.update_top()
      max_length = -1
      best_actions = []
      for action in range(self.num_cols):
         possible, reward, ended, lengths = self.outcome(
               action, player, return_length=True
         )
         if possible and lengths.sum() > max_length:
               best_actions = []
               max_length = lengths.sum()
         if possible and lengths.sum() == max_length:
               best_actions.append(action)
      best_action = random.choice(best_actions)
      return best_action

   def strong_rule_based_opponent(self, player):
      """Takes move for strong rule-based opponent.

      :param player: Player who we are checking, 0 or 1
      :type player: int
      """
      self.update_top()

      winning_actions = []
      for action in range(self.num_cols):
         possible, reward, ended = self.outcome(action, player)
         if possible and ended:
               winning_actions.append(action)
      if len(winning_actions) > 0:
         winning_action = random.choice(winning_actions)
         return winning_action

      opp = 1 if player == 0 else 0
      loss_avoiding_actions = []
      for action in range(self.num_cols):
         possible, reward, ended = self.outcome(action, opp)
         if possible and ended:
               loss_avoiding_actions.append(action)
      if len(loss_avoiding_actions) > 0:
         loss_avoiding_action = random.choice(loss_avoiding_actions)
         return loss_avoiding_action

      return self.weak_rule_based_opponent(player)  # take best possible move

   def outcome(self, action, player, return_length=False):
      """Takes move for weak rule-based opponent.

      :param action: Action to take in environment
      :type action: int
      :param player: Player who we are checking, 0 or 1
      :type player: int
      :param return_length: Return length of outcomes, defaults to False
      :type return_length: bool, optional
      """
      if not (self.top[action] < self.num_rows):  # action column is full
         return (False, None, None) + ((None,) if return_length else ())

      row, col = self.top[action], action
      piece = player + 1

      # down, up, left, right, down-left, up-right, down-right, up-left,
      directions = np.array(
         [
               [[-1, 0], [1, 0]],
               [[0, -1], [0, 1]],
               [[-1, -1], [1, 1]],
               [[-1, 1], [1, -1]],
         ]
      )  # |4x2x2|

      positions = np.array([row, col]).reshape(1, 1, 1, -1) + np.expand_dims(
         directions, -2
      ) * np.arange(1, self.length).reshape(
         1, 1, -1, 1
      )  # |4x2x3x2|
      valid_positions = np.logical_and(
         np.logical_and(
               positions[:, :, :, 0] >= 0, positions[:, :, :, 0] < self.num_rows
         ),
         np.logical_and(
               positions[:, :, :, 1] >= 0, positions[:, :, :, 1] < self.num_cols
         ),
      )  # |4x2x3|
      d0 = np.where(valid_positions, positions[:, :, :, 0], 0)
      d1 = np.where(valid_positions, positions[:, :, :, 1], 0)
      board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
      board_values = np.where(valid_positions, board[d0, d1], 0)
      a = (board_values == piece).astype(int)
      b = np.concatenate(
         (a, np.zeros_like(a[:, :, :1])), axis=-1
      )  # padding with zeros to compute length
      lengths = np.argmin(b, -1)

      ended = False
      # check if winnable in any direction
      for both_dir in board_values:
         # |2x3|
         line = np.concatenate((both_dir[0][::-1], [piece], both_dir[1]))
         if "".join(map(str, [piece] * self.length)) in "".join(map(str, line)):
               ended = True
               break

      # ended = np.any(np.greater_equal(np.sum(lengths, 1), self.length - 1))
      draw = True
      for c, v in enumerate(self.top):
         draw &= (v == self.num_rows) if c != col else (v == (self.num_rows - 1))
      ended |= draw
      reward = (-1) ** (player) if ended and not draw else 0

      return (True, reward, ended) + ((lengths,) if return_length else ())

General setup

Before we go any further in this tutorial, it will be helpful to define and set up everything else we need for training.

Setup code
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("===== AgileRL Curriculum Learning Demo =====")

lesson_number = 1

# Load lesson for curriculum
with open(f"./curriculums/connect_four/lesson{lesson_number}.yaml") as file:
   LESSON = yaml.safe_load(file)

# Define the network configuration
NET_CONFIG = {
   "encoder_config": {
   "channel_size": [128],  # CNN channel size
   "kernel_size": [4],  # CNN kernel size
   "stride_size": [1],  # CNN stride size
   }
}

# Define the initial hyperparameters
INIT_HP = {
   "POPULATION_SIZE": 6,
   # "ALGO": "Rainbow DQN",  # Algorithm
   "ALGO": "DQN",  # Algorithm
   "DOUBLE": True,
   # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
   "BATCH_SIZE": 256,  # Batch size
   "LR": 1e-4,  # Learning rate
   "GAMMA": 0.99,  # Discount factor
   "MEMORY_SIZE": 100000,  # Max memory buffer size
   "LEARN_STEP": 1,  # Learning frequency
   "N_STEP": 1,  # Step number to calculate td error
   "PER": False,  # Use prioritized experience replay buffer
   "ALPHA": 0.6,  # Prioritized replay buffer parameter
   "TAU": 0.01,  # For soft update of target parameters
   "BETA": 0.4,  # Importance sampling coefficient
   "PRIOR_EPS": 0.000001,  # Minimum priority for sampling
   "NUM_ATOMS": 51,  # Unit number of support
   "V_MIN": 0.0,  # Minimum value of support
   "V_MAX": 200.0,  # Maximum value of support
   "WANDB": False,  # Use Weights and Biases tracking
}

# Define the connect four environment
env = connect_four_v3.env()
env.reset()

# Configure the algo input arguments
observation_spaces = [env.observation_space(agent)["observation"] for agent in env.agents]
action_spaces = [env.action_space(agent) for agent in env.agents]

# Wrap the environment in the curriculum learning wrapper
env = CurriculumEnv(env, LESSON)

# RL hyperparameters configuration for mutation during training
hp_config = HyperparameterConfig(
   lr=RLParameter(min=1e-4, max=1e-2),
   learn_step=RLParameter(min=1, max=120, dtype=int),
   batch_size=RLParameter(min=8, max=64, dtype=int),
)

# Create a population ready for evolutionary hyper-parameter optimisation
observation_space = observation_space_channels_to_first(observation_spaces[0])
action_space = action_spaces[0]
pop = create_population(
   INIT_HP["ALGO"],
   observation_space,
   action_space,
   NET_CONFIG,
   INIT_HP,
   hp_config=hp_config,
   population_size=INIT_HP["POPULATION_SIZE"],
   device=device,
)

# Configure the replay buffer
action_dim = action_space.n  # Number of discrete actions (used by the buffer and action histograms)
field_names = ["state", "action", "reward", "next_state", "done"]
memory = ReplayBuffer(
   action_dim=action_dim,  # Number of agent actions
   memory_size=INIT_HP["MEMORY_SIZE"],  # Max replay buffer size
   field_names=field_names,  # Field names to store in memory
   device=device,
)

# Instantiate a tournament selection object (used for HPO)
tournament = TournamentSelection(
   tournament_size=2,  # Tournament selection size
   elitism=True,  # Elitism in tournament selection
   population_size=INIT_HP["POPULATION_SIZE"],  # Population size
   evo_step=1,
)  # Evaluate using last N fitness scores

# Instantiate a mutations object (used for HPO)
mutations = Mutations(
   no_mutation=0.2,  # Probability of no mutation
   architecture=0,  # Probability of architecture mutation
   new_layer_prob=0.2,  # Probability of new layer mutation
   parameters=0.2,  # Probability of parameter mutation
   activation=0,  # Probability of activation function mutation
   rl_hp=0.2,  # Probability of RL hyperparameter mutation
   mutation_sd=0.1,  # Mutation strength
   rand_seed=1,
   device=device,
)

# Define training loop parameters
episodes_per_epoch = 10
max_episodes = LESSON["max_train_episodes"]  # Total episodes
max_steps = 500  # Maximum steps to take in each episode
evo_epochs = 20  # Evolution frequency
evo_loop = 50  # Number of evaluation episodes
elite = pop[0]  # Assign a placeholder "elite" agent
epsilon = 1.0  # Starting epsilon value
eps_end = 0.1  # Final epsilon value
eps_decay = 0.9998  # Epsilon decays
opp_update_counter = 0
wb = INIT_HP["WANDB"]

As part of the curriculum, we can also choose to fill the replay buffer with random experiences and train the agent on these offline.

Fill replay buffer
# Perform buffer and agent warmups if desired
if LESSON["buffer_warm_up"]:
   warm_up_opponent = Opponent(env, difficulty=LESSON["warm_up_opponent"])
   memory = env.fill_replay_buffer(
         memory, warm_up_opponent
   )  # Fill replay buffer with transitions
   if LESSON["agent_warm_up"] > 0:
         print("Warming up agents ...")
         agent = pop[0]
         # Train on randomly collected samples
         for epoch in trange(LESSON["agent_warm_up"]):
            experiences = memory.sample(agent.batch_size)
            agent.learn(experiences)
         pop = [agent.clone() for _ in pop]
         elite = agent
         print("Agent population warmed up.")

Self-play

In this tutorial, we use self-play as the final lesson in our curriculum. By iteratively improving our agent and making it learn to beat itself, we can allow it to discover new strategies and achieve higher performance. The weights of a pretrained agent from a previous lesson can be loaded into the population as follows:

Load pretrained weights
if LESSON["pretrained_path"] is not None:
   for agent in pop:
         # Load pretrained checkpoint
         agent.load_checkpoint(LESSON["pretrained_path"])
         # Reinit optimizer for new task
         agent.lr = INIT_HP["LR"]
         agent.optimizer = OptimizerWrapper(
             torch.optim.Adam,
            networks=agent.actor,
            lr=agent.lr,
            network_names=agent.optimizer.network_names,
            lr_name=agent.optimizer.lr_name
         )

To train against old versions of our agent, we create a pool of opponents. At training time, we randomly select an opponent from this pool. At regular intervals, we update the opponent pool by removing the oldest opponent and adding a copy of the latest version of our agent. This strikes a balance between training against increasingly difficult opponents and providing variety in the moves an opponent might make.

Create opponent pool
if LESSON["opponent"] == "self":
   # Create initial pool of opponents
   opponent_pool = deque(maxlen=LESSON["opponent_pool_size"])
   for _ in range(LESSON["opponent_pool_size"]):
         opp = copy.deepcopy(pop[0])
         opp.actor.load_state_dict(pop[0].actor.state_dict())
         opp.actor.eval()
         opponent_pool.append(opp)

An example curriculum config for self-play training could be defined as follows:

Lesson 4
---
# Connect Four Lesson 4
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: self
opponent_pool_size: 6  # Size of opponent pool for self-play
opponent_upgrade: 6000  # Epoch frequency to update opponent pool
eval_opponent: strong  # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson3_trained_agent.pt  # Path to pretrained model weights
save_path: models/DQN/lesson4_trained_agent.pt  # Path to save trained model
max_train_episodes: 200  # Maximum number of training episodes in environment

## Game specific:
buffer_warm_up: false  # Fill replay buffer with random experiences
warm_up_opponent:       # Difficulty level of warm up experiences
agent_warm_up: 0  # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1  # How many times more likely to block vertically if playing random opponent
rewards:  # Rewards for different outcomes
    win: 1
    vertical_win: 1
    three_in_row: 0.01
    opp_three_in_row: -0.01
    lose: -1
    play_continues: 0

It would also be possible to train an agent using only self-play, without any earlier lessons in the curriculum. This would require significant training time, but could ultimately result in better performance than other techniques, and would avoid some of the mistakes discussed in The Bitter Lesson.

Training loop

The Connect Four training loop must take into account that the agent only acts on every other interaction with the environment (the opponent takes alternating turns). This must be considered when saving transitions to the replay buffer. Similarly, we must wait for the outcome of the next player's turn before we can determine the reward for a transition. For this reason it is not a true Markov decision process, but we can still train a reinforcement learning agent reasonably successfully under these non-stationary conditions.
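The toy sketch below shows only this bookkeeping pattern (it is not the tutorial's code, and the random moves and rewards are stand-ins): after the agent moves we cannot score that transition yet, because the opponent still gets to reply, so the transition is held as "pending" and only written to the buffer once the opponent's move, and therefore the true outcome, is known.

import random
from collections import deque

toy_buffer = deque(maxlen=1000)  # stand-in for the replay buffer


def play_toy_game(max_turns=10):
    state, pending = 0, None
    for _ in range(max_turns):
        action = random.randint(0, 6)  # stand-in for the agent choosing a column
        if pending is not None:
            prev_state, prev_action = pending
            # The opponent has replied since this move was made, so the previous
            # transition can now be completed: play continued, reward 0.
            toy_buffer.append((prev_state, prev_action, 0.0, state, False))
        pending = (state, action)
        state += 1
        if random.random() < 0.2:  # stand-in for the opponent's reply ending the game
            break
    # Terminal transition: only now do we know the final reward (win or loss).
    prev_state, prev_action = pending
    toy_buffer.append((prev_state, prev_action, random.choice([1.0, -1.0]), state, True))


play_toy_game()
print(f"Stored {len(toy_buffer)} transitions")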

At regular intervals, we evaluate the performance, or 'fitness', of the agents in the population and carry out an evolutionary step. The best-performing agents are more likely to become members of the next generation, and the hyperparameters and neural network architectures of agents in the population are mutated. This evolution allows us to optimize hyperparameters and maximize the performance of our agents in a single training run.

Training loop
if max_episodes > 0:
   if wb:
      wandb.init(
            # set the wandb project where this run will be logged
            project="AgileRL",
            name="{}-EvoHPO-{}-{}Opposition-CNN-{}".format(
               "connect_four_v3",
               INIT_HP["ALGO"],
               LESSON["opponent"],
               datetime.now().strftime("%m%d%Y%H%M%S"),
            ),
            # track hyperparameters and run metadata
            config={
               "algo": "Evo HPO Rainbow DQN",
               "env": "connect_four_v3",
               "INIT_HP": INIT_HP,
               "lesson": LESSON,
            },
      )

total_steps = 0
total_episodes = 0
pbar = trange(int(max_episodes / episodes_per_epoch))

# Training loop
for idx_epi in pbar:
   turns_per_episode = []
   train_actions_hist = [0] * action_dim
   for agent in pop:  # Loop through population
         for episode in range(episodes_per_epoch):
            env.reset()  # Reset environment at start of episode
            observation, env_reward, done, truncation, _ = env.last()

            (
               p1_state,
               p1_state_flipped,
               p1_action,
               p1_next_state,
               p1_next_state_flipped,
            ) = (None, None, None, None, None)

            if LESSON["opponent"] == "self":
               # Randomly choose opponent from opponent pool if using self-play
               opponent = random.choice(opponent_pool)
            else:
               # Create opponent of desired difficulty
               opponent = Opponent(env, difficulty=LESSON["opponent"])

            # Randomly decide whether agent will go first or second
            if random.random() > 0.5:
               opponent_first = False
            else:
               opponent_first = True

            score = 0
            turns = 0  # Number of turns counter

            for idx_step in range(max_steps):
               # Player 0"s turn
               p0_action_mask = observation["action_mask"]
               p0_state = np.moveaxis(observation["observation"], [-1], [-3])
               p0_state_flipped = np.expand_dims(np.flip(p0_state, 2), 0)
               p0_state = np.expand_dims(p0_state, 0)

               if opponent_first:
                     if LESSON["opponent"] == "self":
                        p0_action = opponent.get_action(
                           p0_state, 0, p0_action_mask
                        )[0]
                     elif LESSON["opponent"] == "random":
                        p0_action = opponent.get_action(
                           p0_action_mask, p1_action, LESSON["block_vert_coef"]
                        )
                     else:
                        p0_action = opponent.get_action(player=0)
               else:
                     p0_action = agent.get_action(
                        p0_state, epsilon, p0_action_mask
                     )[
                        0
                     ]  # Get next action from agent
                     train_actions_hist[p0_action] += 1

               env.step(p0_action)  # Act in environment
               observation, env_reward, done, truncation, _ = env.last()
               p0_next_state = np.moveaxis(
                     observation["observation"], [-1], [-3]
               )
               p0_next_state_flipped = np.expand_dims(
                     np.flip(p0_next_state, 2), 0
               )
               p0_next_state = np.expand_dims(p0_next_state, 0)

               if not opponent_first:
                     score += env_reward
               turns += 1

               # Check if game is over (Player 0 win)
               if done or truncation:
                     reward = env.reward(done=True, player=0)
                     memory.save2memoryVectEnvs(
                        np.concatenate(
                           (
                                 p0_state,
                                 p1_state,
                                 p0_state_flipped,
                                 p1_state_flipped,
                           )
                        ),
                        [p0_action, p1_action, 6 - p0_action, 6 - p1_action],
                        [
                           reward,
                           LESSON["rewards"]["lose"],
                           reward,
                           LESSON["rewards"]["lose"],
                        ],
                        np.concatenate(
                           (
                                 p0_next_state,
                                 p1_next_state,
                                 p0_next_state_flipped,
                                 p1_next_state_flipped,
                           )
                        ),
                        [done, done, done, done],
                     )
               else:  # Play continues
                     if p1_state is not None:
                        reward = env.reward(done=False, player=1)
                        memory.save2memoryVectEnvs(
                           np.concatenate((p1_state, p1_state_flipped)),
                           [p1_action, 6 - p1_action],
                           [reward, reward],
                           np.concatenate(
                                 (p1_next_state, p1_next_state_flipped)
                           ),
                           [done, done],
                        )

                     # Player 1"s turn
                     p1_action_mask = observation["action_mask"]
                     p1_state = np.moveaxis(
                        observation["observation"], [-1], [-3]
                     )
                     # Swap pieces so that the agent always sees the board from the same perspective
                     p1_state[[0, 1], :, :] = p1_state[[0, 1], :, :]
                     p1_state_flipped = np.expand_dims(np.flip(p1_state, 2), 0)
                     p1_state = np.expand_dims(p1_state, 0)

                     if not opponent_first:
                        if LESSON["opponent"] == "self":
                           p1_action = opponent.get_action(
                                 p1_state, 0, p1_action_mask
                           )[0]
                        elif LESSON["opponent"] == "random":
                           p1_action = opponent.get_action(
                                 p1_action_mask,
                                 p0_action,
                                 LESSON["block_vert_coef"],
                           )
                        else:
                           p1_action = opponent.get_action(player=1)
                     else:
                        p1_action = agent.get_action(
                           p1_state, epsilon, p1_action_mask
                        )[
                           0
                        ]  # Get next action from agent
                        train_actions_hist[p1_action] += 1

                     env.step(p1_action)  # Act in environment
                     observation, env_reward, done, truncation, _ = env.last()
                     p1_next_state = np.moveaxis(
                        observation["observation"], [-1], [-3]
                     )
                     p1_next_state[[0, 1], :, :] = p1_next_state[[0, 1], :, :]
                     p1_next_state_flipped = np.expand_dims(
                        np.flip(p1_next_state, 2), 0
                     )
                     p1_next_state = np.expand_dims(p1_next_state, 0)

                     if opponent_first:
                        score += env_reward
                     turns += 1

                     # Check if game is over (Player 1 win)
                     if done or truncation:
                        reward = env.reward(done=True, player=1)
                        memory.save2memoryVectEnvs(
                           np.concatenate(
                                 (
                                    p0_state,
                                    p1_state,
                                    p0_state_flipped,
                                    p1_state_flipped,
                                 )
                           ),
                           [
                                 p0_action,
                                 p1_action,
                                 6 - p0_action,
                                 6 - p1_action,
                           ],
                           [
                                 LESSON["rewards"]["lose"],
                                 reward,
                                 LESSON["rewards"]["lose"],
                                 reward,
                           ],
                           np.concatenate(
                                 (
                                    p0_next_state,
                                    p1_next_state,
                                    p0_next_state_flipped,
                                    p1_next_state_flipped,
                                 )
                           ),
                           [done, done, done, done],
                        )

                     else:  # Play continues
                        reward = env.reward(done=False, player=0)
                        memory.save2memoryVectEnvs(
                           np.concatenate((p0_state, p0_state_flipped)),
                           [p0_action, 6 - p0_action],
                           [reward, reward],
                           np.concatenate(
                                 (p0_next_state, p0_next_state_flipped)
                           ),
                           [done, done],
                        )

               # Learn according to learning frequency
               if (memory.counter % agent.learn_step == 0) and (
                     len(memory) >= agent.batch_size
               ):
                     # Sample replay buffer
                     # Learn according to agent"s RL algorithm
                     experiences = memory.sample(agent.batch_size)
                     agent.learn(experiences)

               # Stop episode if any agents have terminated
               if done or truncation:
                     break

            total_steps += idx_step + 1
            total_episodes += 1
            turns_per_episode.append(turns)
            # Save the total episode reward
            agent.scores.append(score)

            if LESSON["opponent"] == "self":
               if (total_episodes % LESSON["opponent_upgrade"] == 0) and (
                     (idx_epi + 1) > evo_epochs
               ):
                     elite_opp, _, _ = tournament._elitism(pop)
                     elite_opp.actor.eval()
                     opponent_pool.append(elite_opp)
                     opp_update_counter += 1

         # Update epsilon for exploration
         epsilon = max(eps_end, epsilon * eps_decay)

   mean_turns = np.mean(turns_per_episode)

   # Now evolve population if necessary
   if (idx_epi + 1) % evo_epochs == 0:
         # Evaluate population vs random actions
         fitnesses = []
         win_rates = []
         eval_actions_hist = [0] * action_dim  # Eval actions histogram
         eval_turns = 0  # Eval turns counter
         for agent in pop:
            with torch.no_grad():
               rewards = []
               for i in range(evo_loop):
                     env.reset()  # Reset environment at start of episode
                     observation, reward, done, truncation, _ = env.last()

                     player = -1  # Tracker for which player"s turn it is

                     # Create opponent of desired difficulty
                     opponent = Opponent(env, difficulty=LESSON["eval_opponent"])

                     # Randomly decide whether agent will go first or second
                     if random.random() > 0.5:
                        opponent_first = False
                     else:
                        opponent_first = True

                     score = 0

                     for idx_step in range(max_steps):
                        action_mask = observation["action_mask"]
                        if player < 0:
                           if opponent_first:
                                 if LESSON["eval_opponent"] == "random":
                                    action = opponent.get_action(action_mask)
                                 else:
                                    action = opponent.get_action(player=0)
                           else:
                                 state = np.moveaxis(
                                    observation["observation"], [-1], [-3]
                                 )
                                 state = np.expand_dims(state, 0)
                                 action = agent.get_action(state, 0, action_mask)[
                                    0
                                 ]  # Get next action from agent
                                 eval_actions_hist[action] += 1
                        if player > 0:
                           if not opponent_first:
                                 if LESSON["eval_opponent"] == "random":
                                    action = opponent.get_action(action_mask)
                                 else:
                                    action = opponent.get_action(player=1)
                           else:
                                 state = np.moveaxis(
                                    observation["observation"], [-1], [-3]
                                 )
                                 state[[0, 1], :, :] = state[[0, 1], :, :]
                                 state = np.expand_dims(state, 0)
                                 action = agent.get_action(state, 0, action_mask)[
                                    0
                                 ]  # Get next action from agent
                                 eval_actions_hist[action] += 1

                        env.step(action)  # Act in environment
                        observation, reward, done, truncation, _ = env.last()

                        if (player > 0 and opponent_first) or (
                           player < 0 and not opponent_first
                        ):
                           score += reward

                        eval_turns += 1

                        if done or truncation:
                           break

                        player *= -1

                     rewards.append(score)
            mean_fit = np.mean(rewards)
            agent.fitness.append(mean_fit)
            fitnesses.append(mean_fit)

         eval_turns = eval_turns / len(pop) / evo_loop

         pbar.set_postfix_str(
            f"    Train Mean Score: {np.mean(agent.scores[-episodes_per_epoch:])}   Train Mean Turns: {mean_turns}   Eval Mean Fitness: {np.mean(fitnesses)}   Eval Best Fitness: {np.max(fitnesses)}   Eval Mean Turns: {eval_turns}   Total Steps: {total_steps}"
         )
         pbar.update(0)

         # Format action histograms for visualisation
         train_actions_hist = [
            freq / sum(train_actions_hist) for freq in train_actions_hist
         ]
         eval_actions_hist = [
            freq / sum(eval_actions_hist) for freq in eval_actions_hist
         ]
         train_actions_dict = {
            f"train/action_{index}": action
            for index, action in enumerate(train_actions_hist)
         }
         eval_actions_dict = {
            f"eval/action_{index}": action
            for index, action in enumerate(eval_actions_hist)
         }

         if wb:
            wandb_dict = {
               "global_step": total_steps,
               "train/mean_score": np.mean(agent.scores[-episodes_per_epoch:]),
               "train/mean_turns_per_game": mean_turns,
               "train/epsilon": epsilon,
               "train/opponent_updates": opp_update_counter,
               "eval/mean_fitness": np.mean(fitnesses),
               "eval/best_fitness": np.max(fitnesses),
               "eval/mean_turns_per_game": eval_turns,
            }
            wandb_dict.update(train_actions_dict)
            wandb_dict.update(eval_actions_dict)
            wandb.log(wandb_dict)

         # Tournament selection and population mutation
         elite, pop = tournament.select(pop)
         pop = mutations.mutation(pop)

if max_episodes > 0:
   if wb:
      wandb.finish()

# Save the trained agent
save_path = LESSON["save_path"]
os.makedirs(os.path.dirname(save_path), exist_ok=True)
elite.save_checkpoint(save_path)
print(f"Elite agent saved to '{save_path}'.")

Trained model weights

Trained model weights are provided at PettingZoo/tutorials/AgileRL/models. Take a look, play against the models yourself, and see if you can beat them!
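If you would like to try your own hand against a checkpoint, the sketch below is one way to do it, reusing the same DQN loading and get_action calls as the rendering script in the next section. The interactive input handling and the render_mode="human" window are illustrative additions with no input validation:

import numpy as np
import torch
from agilerl.algorithms.dqn import DQN
from agilerl.utils.utils import observation_space_channels_to_first

from pettingzoo.classic import connect_four_v3

env = connect_four_v3.env(render_mode="human")
env.reset()

# Load a trained checkpoint into a DQN object
dqn = DQN(
    observation_space=observation_space_channels_to_first(
        env.observation_space("player_0")["observation"]
    ),
    action_space=env.action_space("player_0"),
    device=torch.device("cpu"),
)
dqn.load_checkpoint("./models/DQN/lesson3_trained_agent.pt")

observation, reward, done, truncation, _ = env.last()
human_turn = True  # human plays first
while not (done or truncation):
    action_mask = observation["action_mask"]
    if human_turn:
        action = int(input(f"Choose a column {np.flatnonzero(action_mask).tolist()}: "))
    else:
        state = np.expand_dims(np.moveaxis(observation["observation"], [-1], [-3]), 0)
        action = dqn.get_action(state, epsilon=0, action_mask=action_mask)[0]
    env.step(action)
    observation, reward, done, truncation, _ = env.last()
    human_turn = not human_turn
env.close()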

Watch the trained agents play

The following code allows you to load the saved DQN agent from the previous training block, test the agent's performance, and then visualise a number of episodes as a gif.

Render trained agents
import os

import imageio
import numpy as np
import torch
from agilerl.algorithms.dqn import DQN
from agilerl.utils.utils import observation_space_channels_to_first
from agilerl_dqn_curriculum import Opponent
from PIL import Image, ImageDraw, ImageFont

from pettingzoo.classic import connect_four_v3


# Define function to return image
def _label_with_episode_number(frame, episode_num, frame_no, p):
    im = Image.fromarray(frame)
    drawer = ImageDraw.Draw(im)
    text_color = (255, 255, 255)
    font = ImageFont.truetype("arial.ttf", size=45)
    drawer.text(
        (100, 5),
        f"Episode: {episode_num+1}     Frame: {frame_no}",
        fill=text_color,
        font=font,
    )
    if p == 1:
        player = "Player 1"
        color = (255, 0, 0)
    if p == 2:
        player = "Player 2"
        color = (100, 255, 150)
    if p is None:
        player = "Self-play"
        color = (255, 255, 255)
    drawer.text((700, 5), f"Agent: {player}", fill=color, font=font)
    return im


# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
    resized_frames = []
    for img in frames:
        new_width = int(img.width * fraction)
        new_height = int(img.height * fraction)
        img_resized = img.resize((new_width, new_height))
        resized_frames.append(np.array(img_resized))

    return resized_frames


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    path = "./models/DQN/lesson3_trained_agent.pt"  # Path to saved agent checkpoint

    env = connect_four_v3.env(render_mode="rgb_array")
    env.reset()

    # Configure the algo input arguments
    observation_spaces = [
        env.observation_space(agent)["observation"] for agent in env.agents
    ]
    action_spaces = [env.action_space(agent) for agent in env.agents]

    # Instantiate an DQN object
    dqn = DQN(
        observation_space=observation_space_channels_to_first(observation_spaces[0]),
        action_space=action_spaces[0],
        device=device,
    )

    # Load the saved algorithm into the DQN object
    dqn.load_checkpoint(path)

    for opponent_difficulty in ["random", "weak", "strong", "self"]:
        # Create opponent
        if opponent_difficulty == "self":
            opponent = dqn
        else:
            opponent = Opponent(env, opponent_difficulty)

        # Define test loop parameters
        episodes = 2  # Number of episodes to test agent on
        max_steps = (
            500  # Max number of steps to take in the environment in each episode
        )

        rewards = []  # List to collect total episodic reward
        frames = []  # List to collect frames

        print("============================================")
        print(f"Agent: {path}")
        print(f"Opponent: {opponent_difficulty}")

        # Test loop for inference
        for ep in range(episodes):
            if ep / episodes < 0.5:
                opponent_first = False
                p = 1
            else:
                opponent_first = True
                p = 2
            if opponent_difficulty == "self":
                p = None
            env.reset()  # Reset environment at start of episode
            frame = env.render()
            frames.append(
                _label_with_episode_number(frame, episode_num=ep, frame_no=0, p=p)
            )
            observation, reward, done, truncation, _ = env.last()
            player = -1  # Tracker for which player's turn it is
            score = 0
            for idx_step in range(max_steps):
                action_mask = observation["action_mask"]
                if player < 0:
                    state = np.moveaxis(observation["observation"], [-1], [-3])
                    state = np.expand_dims(state, 0)
                    if opponent_first:
                        if opponent_difficulty == "self":
                            action = opponent.get_action(
                                state, epsilon=0, action_mask=action_mask
                            )[0]
                        elif opponent_difficulty == "random":
                            action = opponent.get_action(action_mask)
                        else:
                            action = opponent.get_action(player=0)
                    else:
                        action = dqn.get_action(
                            state, epsilon=0, action_mask=action_mask
                        )[
                            0
                        ]  # Get next action from agent
                if player > 0:
                    state = np.moveaxis(observation["observation"], [-1], [-3])
                    state[[0, 1], :, :] = state[[0, 1], :, :]
                    state = np.expand_dims(state, 0)
                    if not opponent_first:
                        if opponent_difficulty == "self":
                            action = opponent.get_action(
                                state, epsilon=0, action_mask=action_mask
                            )[0]
                        elif opponent_difficulty == "random":
                            action = opponent.get_action(action_mask)
                        else:
                            action = opponent.get_action(player=1)
                    else:
                        action = dqn.get_action(
                            state, epsilon=0, action_mask=action_mask
                        )[
                            0
                        ]  # Get next action from agent
                env.step(action)  # Act in environment
                observation, reward, termination, truncation, _ = env.last()
                # Save the frame for this step and append to frames list
                frame = env.render()
                frames.append(
                    _label_with_episode_number(
                        frame, episode_num=ep, frame_no=idx_step, p=p
                    )
                )

                if (player > 0 and opponent_first) or (
                    player < 0 and not opponent_first
                ):
                    score += reward
                else:
                    score -= reward

                # Stop episode if any agents have terminated
                if truncation or termination:
                    break

                player *= -1

            print("-" * 15, f"Episode: {ep+1}", "-" * 15)
            print(f"Episode length: {idx_step}")
            print(f"Score: {score}")

        print("============================================")

        frames = resize_frames(frames, 0.5)

        # Save the gif to specified path
        gif_path = "./videos/"
        os.makedirs(gif_path, exist_ok=True)
        imageio.mimwrite(
            os.path.join("./videos/", f"connect_four_{opponent_difficulty}_opp.gif"),
            frames,
            duration=400,
            loop=True,
        )

    env.close()

Full Training Code

Full training code

Please note that on line 612, max_episodes is set to 10 so that this tutorial code can be tested quickly. That line can be removed, and the line below it uncommented, to use the number of episodes set in the config files.

"""This tutorial shows how to train a DQN agent on the connect four environment, using curriculum learning and self play.

Author: Nick (https://github.com/nicku-a), Jaime (https://github.com/jaimesabalbermudez)
"""

import copy
import os
import random
from collections import deque
from typing import List, Optional, Tuple

import numpy as np
import torch
import yaml
from agilerl.algorithms import DQN
from agilerl.algorithms.core import OptimizerWrapper
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.components.data import Transition
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.algo_utils import obs_channels_to_first
from agilerl.utils.utils import create_population, observation_space_channels_to_first
from tqdm import tqdm, trange

from pettingzoo import ParallelEnv
from pettingzoo.classic import connect_four_v3


class CurriculumEnv:
    """Wrapper around environment to modify reward for curriculum learning.

    :param env: Environment to learn in
    :type env: PettingZoo-style environment
    :param lesson: Lesson settings for curriculum learning
    :type lesson: dict
    """

    def __init__(self, env: ParallelEnv, lesson: dict):
        self.env = env
        self.lesson = lesson

    def fill_replay_buffer(
        self, memory: ReplayBuffer, opponent: "Opponent"
    ) -> ReplayBuffer:
        """Fill the replay buffer with experiences collected by taking random actions in the environment.

        :param memory: Experience replay buffer
        :type memory: AgileRL experience replay buffer
        :param opponent: Opponent to train against
        :type opponent: Opponent
        :return: Filled replay buffer
        :rtype: ReplayBuffer
        """
        print("Filling replay buffer ...")

        pbar = tqdm(total=memory.max_size)
        while len(memory) < memory.max_size:
            # Randomly decide whether random player will go first or second
            opponent_first = random.random() > 0.5

            mem_full = len(memory)
            self.reset()  # Reset environment at start of episode
            observation, reward, done, truncation, _ = self.last()

            (
                p1_state,
                p1_state_flipped,
                p1_action,
                p1_next_state,
                p1_next_state_flipped,
            ) = (None, None, None, None, None)
            done, truncation = False, False

            while not (done or truncation):
                # Player 0's turn
                p0_action_mask = observation["action_mask"]
                p0_state, p0_state_flipped = transform_and_flip(observation, player=0)
                if opponent_first:
                    p0_action = self.env.action_space("player_0").sample(p0_action_mask)
                else:
                    if self.lesson["warm_up_opponent"] == "random":
                        p0_action = opponent.get_action(
                            p0_action_mask, p1_action, self.lesson["block_vert_coef"]
                        )
                    else:
                        p0_action = opponent.get_action(player=0)
                self.step(p0_action)  # Act in environment
                observation, env_reward, done, truncation, _ = self.last()
                p0_next_state, p0_next_state_flipped = transform_and_flip(
                    observation, player=0
                )

                if done or truncation:
                    reward = self.reward(done=True, player=0)
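                    # Store the terminal transition from both players' perspectives, plus
                    # horizontally flipped copies of each (the mirrored column index is 6 - action)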
                    transition = Transition(
                        obs=np.concatenate(
                            (p0_state, p1_state, p0_state_flipped, p1_state_flipped)
                        ),
                        action=np.array(
                            [p0_action, p1_action, 6 - p0_action, 6 - p1_action]
                        ),
                        reward=np.array(
                            [
                                reward,
                                LESSON["rewards"]["lose"],
                                reward,
                                LESSON["rewards"]["lose"],
                            ]
                        ),
                        next_obs=np.concatenate(
                            (
                                p0_next_state,
                                p1_next_state,
                                p0_next_state_flipped,
                                p1_next_state_flipped,
                            )
                        ),
                        done=np.array([done, done, done, done]),
                        batch_size=[4],
                    )
                    memory.add(transition.to_tensordict())
                else:  # Play continues
                    if p1_state is not None:
                        reward = self.reward(done=False, player=1)
                        transition = Transition(
                            obs=np.concatenate((p1_state, p1_state_flipped)),
                            action=np.array([p1_action, 6 - p1_action]),
                            reward=np.array([reward, reward]),
                            next_obs=np.concatenate(
                                (p1_next_state, p1_next_state_flipped)
                            ),
                            done=np.array([done, done]),
                            batch_size=[2],
                        )
                        memory.add(transition.to_tensordict())

                    # Player 1's turn
                    p1_action_mask = observation["action_mask"]
                    p1_state, p1_state_flipped = transform_and_flip(
                        observation, player=1
                    )
                    if not opponent_first:
                        p1_action = self.env.action_space("player_1").sample(
                            p1_action_mask
                        )
                    else:
                        if self.lesson["warm_up_opponent"] == "random":
                            p1_action = opponent.get_action(
                                p1_action_mask, p0_action, self.lesson["block_vert_coef"]
                            )
                        else:
                            p1_action = opponent.get_action(player=1)
                    self.step(p1_action)  # Act in environment
                    observation, env_reward, done, truncation, _ = self.last()
                    p1_next_state, p1_next_state_flipped = transform_and_flip(
                        observation, player=1
                    )

                    if done or truncation:
                        reward = self.reward(done=True, player=1)
                        transition = Transition(
                            obs=np.concatenate(
                                (p0_state, p1_state, p0_state_flipped, p1_state_flipped)
                            ),
                            action=np.array(
                                [p0_action, p1_action, 6 - p0_action, 6 - p1_action]
                            ),
                            reward=np.array(
                                [
                                    LESSON["rewards"]["lose"],
                                    reward,
                                    LESSON["rewards"]["lose"],
                                    reward,
                                ]
                            ),
                            next_obs=np.concatenate(
                                (
                                    p0_next_state,
                                    p1_next_state,
                                    p0_next_state_flipped,
                                    p1_next_state_flipped,
                                )
                            ),
                            done=np.array([done, done, done, done]),
                            batch_size=[4],
                        )
                        memory.add(transition.to_tensordict())
                    else:  # Play continues
                        reward = self.reward(done=False, player=0)
                        transition = Transition(
                            obs=np.concatenate((p0_state, p0_state_flipped)),
                            action=np.array([p0_action, 6 - p0_action]),
                            reward=np.array([reward, reward]),
                            next_obs=np.concatenate(
                                (p0_next_state, p0_next_state_flipped)
                            ),
                            done=np.array([done, done]),
                            batch_size=[2],
                        )
                        memory.add(transition.to_tensordict())

            pbar.update(len(memory) - mem_full)
        pbar.close()
        print("Replay buffer warmed up.")
        return memory

    def check_winnable(self, lst: List[int], piece: int) -> bool:
        """Checks if four pieces in a row represent a winnable opportunity, e.g. [1, 1, 1, 0] or [2, 0, 2, 2].

        :param lst: List of pieces in row
        :type lst: List
        :param piece: Player piece we are checking (1 or 2)
        :type piece: int
        """
        return lst.count(piece) == 3 and lst.count(0) == 1

    def check_vertical_win(self, player: int) -> bool:
        """Checks if a win is vertical.

        :param player: Player who we are checking, 0 or 1
        :type player: int
        """
        board = np.array(self.env.env.board).reshape(6, 7)
        piece = player + 1

        column_count = 7
        row_count = 6

        # Check vertical locations for win
        for c in range(column_count):
            for r in range(row_count - 3):
                if (
                    board[r][c] == piece
                    and board[r + 1][c] == piece
                    and board[r + 2][c] == piece
                    and board[r + 3][c] == piece
                ):
                    return True
        return False

    def check_three_in_row(self, player: int) -> int:
        """Checks if there are three pieces in a row and a blank space next, or two pieces - blank - piece.

        :param player: Player who we are checking, 0 or 1
        :type player: int
        """
        board = np.array(self.env.env.board).reshape(6, 7)
        piece = player + 1

        # Board dimensions and counter for three-in-a-row occurrences
        column_count = 7
        row_count = 6
        three_in_row_count = 0

        # Check vertical locations
        for c in range(column_count):
            for r in range(row_count - 3):
                if self.check_winnable(board[r : r + 4, c].tolist(), piece):
                    three_in_row_count += 1

        # Check horizontal locations
        for r in range(row_count):
            for c in range(column_count - 3):
                if self.check_winnable(board[r, c : c + 4].tolist(), piece):
                    three_in_row_count += 1

        # Check positively sloped diagonals
        for c in range(column_count - 3):
            for r in range(row_count - 3):
                if self.check_winnable(
                    [
                        board[r, c],
                        board[r + 1, c + 1],
                        board[r + 2, c + 2],
                        board[r + 3, c + 3],
                    ],
                    piece,
                ):
                    three_in_row_count += 1

        # Check negatively sloped diagonals
        for c in range(column_count - 3):
            for r in range(3, row_count):
                if self.check_winnable(
                    [
                        board[r, c],
                        board[r - 1, c + 1],
                        board[r - 2, c + 2],
                        board[r - 3, c + 3],
                    ],
                    piece,
                ):
                    three_in_row_count += 1

        return three_in_row_count

    def reward(self, done: bool, player: int) -> float:
        """Processes and returns reward from environment according to lesson criteria.

        :param done: Environment has terminated
        :type done: bool
        :param player: Player who we are checking, 0 or 1
        :type player: int
        """
        if done:
            reward = (
                self.lesson["rewards"]["vertical_win"]
                if self.check_vertical_win(player)
                else self.lesson["rewards"]["win"]
            )
        else:
            agent_three_count = self.check_three_in_row(1 - player)
            opp_three_count = self.check_three_in_row(player)
            if (agent_three_count + opp_three_count) == 0:
                reward = self.lesson["rewards"]["play_continues"]
            else:
                reward = (
                    self.lesson["rewards"]["three_in_row"] * agent_three_count
                    + self.lesson["rewards"]["opp_three_in_row"] * opp_three_count
                )
        return reward

    def last(self) -> Tuple[dict, float, bool, bool, dict]:
        """Wrapper around PettingZoo env last method."""
        return self.env.last()

    def step(self, action: int) -> None:
        """Wrapper around PettingZoo env step method."""
        self.env.step(action)

    def reset(self) -> None:
        """Wrapper around PettingZoo env reset method."""
        self.env.reset()


class Opponent:
    """Connect 4 opponent to train and/or evaluate against.

    :param env: Environment to learn in
    :type env: PettingZoo-style environment
    :param difficulty: Difficulty level of opponent, 'random', 'weak' or 'strong'
    :type difficulty: str
    """

    def __init__(self, env: ParallelEnv, difficulty: str):
        self.env = env.env
        self.difficulty = difficulty
        if self.difficulty == "random":
            self.get_action = self.random_opponent
        elif self.difficulty == "weak":
            self.get_action = self.weak_rule_based_opponent
        else:
            self.get_action = self.strong_rule_based_opponent
        self.num_cols = 7
        self.num_rows = 6
        self.length = 4
        self.top = [0] * self.num_cols

    def update_top(self) -> None:
        """Updates self.top, a list which tracks the row on top of the highest piece in each column."""
        board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
        non_zeros = np.where(board != 0)
        rows, cols = non_zeros
        top = np.zeros(board.shape[1], dtype=int)
        for col in range(board.shape[1]):
            column_pieces = rows[cols == col]
            if len(column_pieces) > 0:
                top[col] = np.min(column_pieces) - 1
            else:
                top[col] = 5
        full_columns = np.all(board != 0, axis=0)
        top[full_columns] = 6
        self.top = top

    def random_opponent(
        self,
        action_mask: List[int],
        last_opp_move: Optional[int] = None,
        block_vert_coef: float = 1,
    ) -> int:
        """Takes move for random opponent.

        If the lesson aims to randomly block vertical wins with a higher probability, this is done here too.

        :param action_mask: Mask of legal actions: 1=legal, 0=illegal
        :type action_mask: List
        :param last_opp_move: Most recent action taken by agent against this opponent
        :type last_opp_move: int
        :param block_vert_coef: How many times more likely to block vertically
        :type block_vert_coef: float
        """
        if last_opp_move is not None:
            action_mask[last_opp_move] *= block_vert_coef
        action = random.choices(list(range(self.num_cols)), action_mask)[0]
        return action

    def weak_rule_based_opponent(self, player: int) -> int:
        """Takes move for weak rule-based opponent.

        :param player: Player who we are checking, 0 or 1
        :type player: int
        """
        self.update_top()
        max_length = -1
        best_actions = []
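        # Try every column and keep the ones that maximise the total length of the
        # player's own lines running through the newly placed piece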
        for action in range(self.num_cols):
            possible, reward, ended, lengths = self.outcome(
                action, player, return_length=True
            )
            if possible and lengths.sum() > max_length:
                best_actions = []
                max_length = lengths.sum()
            if possible and lengths.sum() == max_length:
                best_actions.append(action)
        best_action = random.choice(best_actions)
        return best_action

    def strong_rule_based_opponent(self, player: int) -> int:
        """Takes move for strong rule-based opponent.

        :param player: Player who we are checking, 0 or 1
        :type player: int
        """
        self.update_top()

        winning_actions = []
        for action in range(self.num_cols):
            possible, reward, ended = self.outcome(action, player)
            if possible and ended:
                winning_actions.append(action)
        if len(winning_actions) > 0:
            winning_action = random.choice(winning_actions)
            return winning_action

        opp = 1 if player == 0 else 0
        loss_avoiding_actions = []
        for action in range(self.num_cols):
            possible, reward, ended = self.outcome(action, opp)
            if possible and ended:
                loss_avoiding_actions.append(action)
        if len(loss_avoiding_actions) > 0:
            loss_avoiding_action = random.choice(loss_avoiding_actions)
            return loss_avoiding_action

        return self.weak_rule_based_opponent(player)  # take best possible move

    def outcome(
        self, action: int, player: int, return_length: bool = False
    ) -> Tuple[bool, Optional[float], bool, Optional[np.ndarray]]:
        """Takes move for weak rule-based opponent.

        :param action: Action to take in environment
        :type action: int
        :param player: Player who we are checking, 0 or 1
        :type player: int
        :param return_length: Return length of outcomes, defaults to False
        :type player: bool, optional
        """
        if not (self.top[action] < self.num_rows):  # action column is full
            return (False, None, None) + ((None,) if return_length else ())

        row, col = self.top[action], action
        piece = player + 1

        # down, up, left, right, down-left, up-right, down-right, up-left,
        directions = np.array(
            [
                [[-1, 0], [1, 0]],
                [[0, -1], [0, 1]],
                [[-1, -1], [1, 1]],
                [[-1, 1], [1, -1]],
            ]
        )  # |4x2x2|

        positions = np.array([row, col]).reshape(1, 1, 1, -1) + np.expand_dims(
            directions, -2
        ) * np.arange(1, self.length).reshape(
            1, 1, -1, 1
        )  # |4x2x3x2|
        valid_positions = np.logical_and(
            np.logical_and(
                positions[:, :, :, 0] >= 0, positions[:, :, :, 0] < self.num_rows
            ),
            np.logical_and(
                positions[:, :, :, 1] >= 0, positions[:, :, :, 1] < self.num_cols
            ),
        )  # |4x2x3|
        d0 = np.where(valid_positions, positions[:, :, :, 0], 0)
        d1 = np.where(valid_positions, positions[:, :, :, 1], 0)
        board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
        board_values = np.where(valid_positions, board[d0, d1], 0)
        a = (board_values == piece).astype(int)
        b = np.concatenate(
            (a, np.zeros_like(a[:, :, :1])), axis=-1
        )  # padding with zeros to compute length
        lengths = np.argmin(b, -1)
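        # lengths[d, s] = number of consecutive friendly pieces extending from the new
        # piece in direction pair d, side s (argmin finds the first non-friendly cell)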

        ended = False
        # check if winnable in any direction
        for both_dir in board_values:
            # |2x3|
            line = np.concatenate((both_dir[0][::-1], [piece], both_dir[1]))
            if "".join(map(str, [piece] * self.length)) in "".join(map(str, line)):
                ended = True
                break

        # ended = np.any(np.greater_equal(np.sum(lengths, 1), self.length - 1))
        draw = True
        for c, v in enumerate(self.top):
            draw &= (v == self.num_rows) if c != col else (v == (self.num_rows - 1))
        ended |= draw
        reward = (-1) ** (player) if ended and not draw else 0

        return (True, reward, ended) + ((lengths,) if return_length else ())


def transform_and_flip(observation, player):
    """Transforms and flips observation for input to agent's neural network.

    :param observation: Observation to preprocess
    :type observation: dict[str, np.ndarray]
    :param player: Player, 0 or 1
    :type player: int
    """
    state = observation["observation"]
    # Pre-process dimensions for PyTorch (N, C, H, W)
    state = obs_channels_to_first(state)
    if player == 1:
        # Swap pieces so that the agent always sees the board from the same perspective
        state[[0, 1], :, :] = state[[1, 0], :, :]
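    # Keep a horizontally mirrored copy of the board as a simple data augmentation;
    # the corresponding mirrored action is 6 - action (the board has 7 columns)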
    state_flipped = np.expand_dims(np.flip(state, 2), 0)
    state = np.expand_dims(state, 0)
    return state, state_flipped


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("===== AgileRL Curriculum Learning Demo =====")

    for lesson_number in range(1, 5):
        # Load lesson for curriculum
        with open(f"./curriculums/connect_four/lesson{lesson_number}.yaml") as file:
            LESSON = yaml.safe_load(file)

        # Define the network configuration
        NET_CONFIG = {
            "encoder_config": {
                "channel_size": [128],  # CNN channel size
                "kernel_size": [4],  # CNN kernel size
                "stride_size": [1],  # CNN stride size
            },
            "head_config": {
                "hidden_size": [64, 64],  # Actor head hidden size
            },
        }

        # Define the initial hyperparameters
        INIT_HP = {
            "POPULATION_SIZE": 6,
            # "ALGO": "Rainbow DQN",  # Algorithm
            "ALGO": "DQN",  # Algorithm
            "DOUBLE": True,
            # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
            "BATCH_SIZE": 256,  # Batch size
            "LR": 1e-4,  # Learning rate
            "GAMMA": 0.99,  # Discount factor
            "MEMORY_SIZE": 10000,  # Max memory buffer size
            "LEARN_STEP": 1,  # Learning frequency
            "CUDAGRAPHS": False,  # Use CUDA graphs
            "N_STEP": 1,  # Step number to calculate td error
            "PER": False,  # Use prioritized experience replay buffer
            "ALPHA": 0.6,  # Prioritized replay buffer parameter
            "TAU": 0.01,  # For soft update of target parameters
            "BETA": 0.4,  # Importance sampling coefficient
            "PRIOR_EPS": 0.000001,  # Minimum priority for sampling
            "NUM_ATOMS": 51,  # Unit number of support
            "V_MIN": 0.0,  # Minimum value of support
            "V_MAX": 200.0,  # Maximum value of support
        }

        # Define the connect four environment
        env = connect_four_v3.env()
        env.reset()

        # Configure the algo input arguments
        observation_spaces = [env.observation_space(agent) for agent in env.agents]
        action_spaces = [env.action_space(agent) for agent in env.agents]

        # Wrap the environment in the curriculum learning wrapper
        env = CurriculumEnv(env, LESSON)

        # Pre-process dimensions for PyTorch layers
        # We only need to worry about the state dim of a single agent
        # The 6x7x2 observation is transposed to channels-first (2x6x7) for the agent's CNN encoder
        observation_space = observation_space_channels_to_first(
            observation_spaces[0]["observation"]
        )
        action_space = action_spaces[0]

        # Mutation config for RL hyperparameters
        hp_config = HyperparameterConfig(
            lr=RLParameter(min=1e-4, max=1e-2),
            batch_size=RLParameter(min=8, max=64, dtype=int),
            learn_step=RLParameter(
                min=1, max=120, dtype=int, grow_factor=1.5, shrink_factor=0.75
            ),
        )

        # Create a population ready for evolutionary hyper-parameter optimisation
        pop: List[DQN] = create_population(
            INIT_HP["ALGO"],
            observation_space,
            action_spaces[0],
            NET_CONFIG,
            INIT_HP,
            hp_config,
            population_size=INIT_HP["POPULATION_SIZE"],
            device=device,
        )

        # Configure the replay buffer
        memory = ReplayBuffer(
            max_size=INIT_HP["MEMORY_SIZE"],  # Max replay buffer size
            device=device,
        )

        # Instantiate a tournament selection object (used for HPO)
        tournament = TournamentSelection(
            tournament_size=2,  # Tournament selection size
            elitism=True,  # Elitism in tournament selection
            population_size=INIT_HP["POPULATION_SIZE"],  # Population size
            eval_loop=1,  # Evaluate using last N fitness scores
        )

        # Instantiate a mutations object (used for HPO)
        mutations = Mutations(
            no_mutation=0.2,  # Probability of no mutation
            architecture=0,  # Probability of architecture mutation
            new_layer_prob=0.2,  # Probability of new layer mutation
            parameters=0.2,  # Probability of parameter mutation
            activation=0,  # Probability of activation function mutation
            rl_hp=0.2,  # Probability of RL hyperparameter mutation
            mutation_sd=0.1,  # Mutation strength
            rand_seed=1,
            device=device,
        )

        # Define training loop parameters
        episodes_per_epoch = 10
        max_episodes = LESSON["max_train_episodes"]  # Total episodes
        max_steps = 500  # Maximum steps to take in each episode
        evo_epochs = 20  # Evolution frequency
        evo_loop = 50  # Number of evaluation episodes
        elite = pop[0]  # Assign a placeholder "elite" agent
        epsilon = 1.0  # Starting epsilon value
        eps_end = 0.1  # Final epsilon value
        eps_decay = 0.9998  # Epsilon decay rate
        opp_update_counter = 0

        if LESSON["pretrained_path"] is not None:
            for agent in pop:
                # Load pretrained checkpoint
                agent.load_checkpoint(LESSON["pretrained_path"])
                # Reinit optimizer for new task
                agent.lr = INIT_HP["LR"]
                agent.optimizer = OptimizerWrapper(
                    torch.optim.Adam,
                    networks=agent.actor,
                    lr=agent.lr,
                    network_names=agent.optimizer.network_names,
                    lr_name=agent.optimizer.lr_name,
                    optimizer_kwargs={"capturable": agent.capturable},
                )

        if LESSON["opponent"] == "self":
            # Create initial pool of opponents
            opponent_pool = deque(maxlen=LESSON["opponent_pool_size"])
            for _ in range(LESSON["opponent_pool_size"]):
                opp = copy.deepcopy(pop[0])
                opp.actor.load_state_dict(pop[0].actor.state_dict())
                opp.actor.eval()
                opponent_pool.append(opp)

        # Perform buffer and agent warmups if desired
        if LESSON["buffer_warm_up"]:
            warm_up_opponent = Opponent(env, difficulty=LESSON["warm_up_opponent"])
            memory = env.fill_replay_buffer(
                memory, warm_up_opponent
            )  # Fill replay buffer with transitions
            if LESSON["agent_warm_up"] > 0:
                print("Warming up agents ...")
                agent = pop[0]
                # Train on randomly collected samples
                for epoch in trange(LESSON["agent_warm_up"]):
                    experiences = memory.sample(agent.batch_size)
                    agent.learn(experiences)

                pop = [agent.clone() for _ in pop]
                elite = agent
                print("Agent population warmed up.")

        total_steps = 0
        total_episodes = 0
        pbar = trange(int(max_episodes / episodes_per_epoch))

        # Training loop
        for idx_epi in pbar:
            turns_per_episode = []
            train_actions_hist = [0] * action_spaces[0].n
            for agent in pop:  # Loop through population
                for episode in range(episodes_per_epoch):
                    env.reset()  # Reset environment at start of episode
                    observation, cumulative_reward, done, truncation, _ = env.last()

                    (
                        p1_state,
                        p1_state_flipped,
                        p1_action,
                        p1_next_state,
                        p1_next_state_flipped,
                    ) = (None, None, None, None, None)

                    if LESSON["opponent"] == "self":
                        # Randomly choose opponent from opponent pool if using self-play
                        opponent = random.choice(opponent_pool)
                    else:
                        # Create opponent of desired difficulty
                        opponent = Opponent(env, difficulty=LESSON["opponent"])

                    # Randomly decide whether agent will go first or second
                    opponent_first = random.random() > 0.5

                    score = 0
                    turns = 0  # Number of turns counter
                    for idx_step in range(max_steps):
                        # Player 0"s turn
                        p0_action_mask = observation["action_mask"]
                        p0_state, p0_state_flipped = transform_and_flip(
                            observation, player=0
                        )

                        if opponent_first:
                            if LESSON["opponent"] == "self":
                                p0_action = opponent.get_action(
                                    p0_state, 0, p0_action_mask
                                )[0]
                            elif LESSON["opponent"] == "random":
                                p0_action = opponent.get_action(
                                    p0_action_mask, p1_action, LESSON["block_vert_coef"]
                                )
                            else:
                                p0_action = opponent.get_action(player=0)
                        else:
                            p0_action = agent.get_action(
                                p0_state, epsilon, p0_action_mask
                            )[
                                0
                            ]  # Get next action from agent
                            train_actions_hist[p0_action] += 1

                        env.step(p0_action)  # Act in environment
                        observation, cumulative_reward, done, truncation, _ = env.last()
                        p0_next_state, p0_next_state_flipped = transform_and_flip(
                            observation, player=0
                        )
                        if not opponent_first:
                            score = cumulative_reward
                        turns += 1

                        # Check if game is over (Player 0 win)
                        if done or truncation:
                            reward = env.reward(done=True, player=0)
                            transition = Transition(
                                obs=np.concatenate(
                                    (
                                        p0_state,
                                        p1_state,
                                        p0_state_flipped,
                                        p1_state_flipped,
                                    )
                                ),
                                action=np.array(
                                    [p0_action, p1_action, 6 - p0_action, 6 - p1_action]
                                ),
                                reward=np.array(
                                    [
                                        reward,
                                        LESSON["rewards"]["lose"],
                                        reward,
                                        LESSON["rewards"]["lose"],
                                    ]
                                ),
                                next_obs=np.concatenate(
                                    (
                                        p0_next_state,
                                        p1_next_state,
                                        p0_next_state_flipped,
                                        p1_next_state_flipped,
                                    )
                                ),
                                done=np.array([done, done, done, done]),
                                batch_size=[4],
                            )
                            memory.add(transition.to_tensordict())
                        else:  # Play continues
                            if p1_state is not None:
                                reward = env.reward(done=False, player=1)
                                transition = Transition(
                                    obs=np.concatenate((p1_state, p1_state_flipped)),
                                    action=np.array([p1_action, 6 - p1_action]),
                                    reward=np.array([reward, reward]),
                                    next_obs=np.concatenate(
                                        (p1_next_state, p1_next_state_flipped)
                                    ),
                                    done=np.array([done, done]),
                                    batch_size=[2],
                                )
                                memory.add(transition.to_tensordict())

                            # Player 1"s turn
                            p1_action_mask = observation["action_mask"]
                            p1_state, p1_state_flipped = transform_and_flip(
                                observation, player=1
                            )

                            if not opponent_first:
                                if LESSON["opponent"] == "self":
                                    p1_action = opponent.get_action(
                                        p1_state, 0, p1_action_mask
                                    )[0]
                                elif LESSON["opponent"] == "random":
                                    p1_action = opponent.get_action(
                                        p1_action_mask,
                                        p0_action,
                                        LESSON["block_vert_coef"],
                                    )
                                else:
                                    p1_action = opponent.get_action(player=1)
                            else:
                                p1_action = agent.get_action(
                                    p1_state, epsilon, p1_action_mask
                                )[
                                    0
                                ]  # Get next action from agent
                                train_actions_hist[p1_action] += 1

                            env.step(p1_action)  # Act in environment
                            (
                                observation,
                                cumulative_reward,
                                done,
                                truncation,
                                _,
                            ) = env.last()
                            p1_next_state, p1_next_state_flipped = transform_and_flip(
                                observation, player=1
                            )

                            if opponent_first:
                                score = cumulative_reward
                            turns += 1

                            # Check if game is over (Player 1 win)
                            if done or truncation:
                                reward = env.reward(done=True, player=1)
                                transition = Transition(
                                    obs=np.concatenate(
                                        (
                                            p0_state,
                                            p1_state,
                                            p0_state_flipped,
                                            p1_state_flipped,
                                        )
                                    ),
                                    action=np.array(
                                        [
                                            p0_action,
                                            p1_action,
                                            6 - p0_action,
                                            6 - p1_action,
                                        ]
                                    ),
                                    reward=np.array(
                                        [
                                            reward,
                                            LESSON["rewards"]["lose"],
                                            reward,
                                            LESSON["rewards"]["lose"],
                                        ]
                                    ),
                                    next_obs=np.concatenate(
                                        (
                                            p0_next_state,
                                            p1_next_state,
                                            p0_next_state_flipped,
                                            p1_next_state_flipped,
                                        )
                                    ),
                                    done=np.array([done, done, done, done]),
                                    batch_size=[4],
                                )
                                memory.add(transition.to_tensordict())
                            else:  # Play continues
                                reward = env.reward(done=False, player=0)
                                transition = Transition(
                                    obs=np.concatenate((p0_state, p0_state_flipped)),
                                    action=np.array([p0_action, 6 - p0_action]),
                                    reward=np.array([reward, reward]),
                                    next_obs=np.concatenate(
                                        (p0_next_state, p0_next_state_flipped)
                                    ),
                                    done=np.array([done, done]),
                                    batch_size=[2],
                                )
                                memory.add(transition.to_tensordict())

                        # Learn according to learning frequency
                        if (memory.counter % agent.learn_step == 0) and (
                            len(memory) >= agent.batch_size
                        ):
                            # Sample replay buffer
                            # Learn according to agent"s RL algorithm
                            experiences = memory.sample(agent.batch_size)
                            agent.learn(experiences)

                        # Stop episode if any agents have terminated
                        if done or truncation:
                            break

                    total_steps += idx_step + 1
                    total_episodes += 1
                    turns_per_episode.append(turns)
                    # Save the total episode reward
                    agent.scores.append(score)

                    if LESSON["opponent"] == "self":
                        if (total_episodes % LESSON["opponent_upgrade"] == 0) and (
                            (idx_epi + 1) > evo_epochs
                        ):
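                            # Promote the current elite agent into the self-play opponent pool
                            # (the deque discards the oldest opponent once it is full)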
                            elite_opp, _, _ = tournament._elitism(pop)
                            elite_opp.actor.eval()
                            opponent_pool.append(elite_opp)
                            opp_update_counter += 1

                # Update epsilon for exploration
                epsilon = max(eps_end, epsilon * eps_decay)

            mean_turns = np.mean(turns_per_episode)

            # Now evolve population if necessary
            if (idx_epi + 1) % evo_epochs == 0:
                # Evaluate population vs random actions
                fitnesses = []
                win_rates = []
                eval_actions_hist = [0] * action_spaces[0].n  # Eval actions histogram
                eval_turns = 0  # Eval turns counter
                for agent in pop:
                    with torch.no_grad():
                        rewards = []
                        for i in range(evo_loop):
                            env.reset()  # Reset environment at start of episode
                            (
                                observation,
                                cumulative_reward,
                                done,
                                truncation,
                                _,
                            ) = env.last()

                            player = -1  # Tracker for which player's turn it is

                            # Create opponent of desired difficulty
                            opponent = Opponent(env, difficulty=LESSON["eval_opponent"])

                            # Randomly decide whether agent will go first or second
                            opponent_first = random.random() > 0.5

                            score = 0

                            for idx_step in range(max_steps):
                                action_mask = observation["action_mask"]
                                if player < 0:
                                    if opponent_first:
                                        if LESSON["eval_opponent"] == "random":
                                            action = opponent.get_action(action_mask)
                                        else:
                                            action = opponent.get_action(player=0)
                                    else:
                                        state = np.moveaxis(
                                            observation["observation"], [-1], [-3]
                                        )
                                        state = np.expand_dims(state, 0)
                                        action = agent.get_action(
                                            state, 0, action_mask
                                        )[
                                            0
                                        ]  # Get next action from agent
                                        eval_actions_hist[action] += 1
                                if player > 0:
                                    if not opponent_first:
                                        if LESSON["eval_opponent"] == "random":
                                            action = opponent.get_action(action_mask)
                                        else:
                                            action = opponent.get_action(player=1)
                                    else:
                                        state = np.moveaxis(
                                            observation["observation"], [-1], [-3]
                                        )
                                        state[[0, 1], :, :] = state[[1, 0], :, :]
                                        state = np.expand_dims(state, 0)
                                        action = agent.get_action(
                                            state, 0, action_mask
                                        )[
                                            0
                                        ]  # Get next action from agent
                                        eval_actions_hist[action] += 1

                                env.step(action)  # Act in environment
                                (
                                    observation,
                                    cumulative_reward,
                                    done,
                                    truncation,
                                    _,
                                ) = env.last()

                                if (player > 0 and opponent_first) or (
                                    player < 0 and not opponent_first
                                ):
                                    score = cumulative_reward

                                eval_turns += 1

                                if done or truncation:
                                    break

                                player *= -1

                            rewards.append(score)
                    mean_fit = np.mean(rewards)
                    agent.fitness.append(mean_fit)
                    fitnesses.append(mean_fit)

                eval_turns = eval_turns / len(pop) / evo_loop

                pbar.set_postfix_str(
                    f"Train Mean Score: {np.mean(agent.scores[-episodes_per_epoch:])} "
                    f"Train Mean Turns: {mean_turns} "
                    f"Eval Mean Fitness: {np.mean(fitnesses)} "
                    f"Eval Best Fitness: {np.max(fitnesses)} "
                    f"Eval Mean Turns: {eval_turns} "
                    f"Total Steps: {total_steps}"
                )
                pbar.update(0)

                # Tournament selection and population mutation
                elite, pop = tournament.select(pop)
                pop = mutations.mutation(pop)

        # Save the trained agent
        save_path = LESSON["save_path"]
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        elite.save_checkpoint(save_path)
        print(f"Elite agent saved to '{save_path}'.")