AgileRL: Implementing DQN - Curriculum Learning and Self-play

Fig. 1: Agents trained to play Connect Four through self-play
This tutorial focuses on two techniques used in reinforcement learning: curriculum learning and self-play. Curriculum learning refers to training an agent on tasks of increasing difficulty in separate 'lessons'. Imagine you wanted to become the world chess champion. You would not decide to learn chess by immediately playing against grandmasters; that would be far too difficult. Instead, you would practise against people of a similar ability to yourself, improve slowly, and gradually play against stronger opponents until you were ready to compete with the best. The same concept applies to reinforcement learning models. Sometimes a task is too difficult to learn all at once, so we must create a curriculum to guide an agent and teach it to solve the final, difficult environment.
This tutorial also uses self-play. Self-play is a technique commonly used in competitive reinforcement learning environments. An agent trains by playing against a copy of itself, the opponent, and learns to beat this opponent. The opponent is then updated to a copy of this better version of the agent, and the agent must learn to beat itself again. This process is repeated, and the agent iteratively improves by exploiting its own weaknesses and discovering new strategies. A minimal sketch of this loop is shown below.
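As a minimal sketch of that loop (independent of AgileRL's API; `train_one_episode` and the upgrade interval are placeholder assumptions, not names from this tutorial):

```python
import copy


def self_play(agent, train_one_episode, num_episodes, upgrade_every):
    """Minimal self-play outline: train against a frozen copy of the agent,
    and periodically replace that copy with the current, stronger agent."""
    opponent = copy.deepcopy(agent)  # the first opponent is simply a clone of the agent
    for episode in range(num_episodes):
        train_one_episode(agent, opponent)  # the agent learns to beat the frozen opponent
        if (episode + 1) % upgrade_every == 0:
            opponent = copy.deepcopy(agent)  # upgrade the opponent to the improved agent
    return agent
```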
In this tutorial, self-play is treated as the final lesson in the curriculum. However, these two techniques can be used independently of each other, and with unlimited resources, self-play can beat agents trained through curriculum learning with human-designed lessons. Richard Sutton's 'The Bitter Lesson' provides an interesting take on curriculum learning and is well worth considering for any engineer undertaking such a task. However, unlike Sutton, we do not all have the resources of Deepmind and top institutions available to us, and so we must be pragmatic when deciding how to approach our own reinforcement learning problems. If you would like to discuss this exciting area of research further, please join the AgileRL Discord server and let us know what you think!
What is DQN?
DQN (Deep Q-Network) is an extension of Q-learning that uses a replay buffer and a target network to improve learning stability. For further information on DQN, check out the AgileRL documentation. A rough sketch of a single DQN update step is shown below.
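As a rough illustration of the mechanics, the sketch below shows one DQN update in plain PyTorch: sample a batch of transitions, compute TD targets with a frozen target network, take a gradient step, then softly update the target network. The network sizes and hyperparameters are illustrative assumptions, not AgileRL's internals.

```python
import torch
import torch.nn as nn

# Illustrative Q-network for a flattened 6x7x2 Connect Four observation and 7 actions
q_net = nn.Sequential(nn.Linear(84, 64), nn.ReLU(), nn.Linear(64, 7))
target_net = nn.Sequential(nn.Linear(84, 64), nn.ReLU(), nn.Linear(64, 7))
target_net.load_state_dict(q_net.state_dict())
optimizer = torch.optim.Adam(q_net.parameters(), lr=1e-4)
gamma, tau = 0.99, 0.01  # discount factor and soft-update rate


def dqn_update(states, actions, rewards, next_states, dones):
    """One gradient step on a batch sampled from a replay buffer.
    `actions` is a LongTensor of shape [batch]; the rest are float tensors."""
    q_values = q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        next_q = target_net(next_states).max(dim=1).values
        targets = rewards + gamma * next_q * (1 - dones)
    loss = nn.functional.mse_loss(q_values, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # Soft update of the target network keeps learning stable
    for p, tp in zip(q_net.parameters(), target_net.parameters()):
        tp.data.copy_(tau * p.data + (1 - tau) * tp.data)
    return loss.item()
```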
Can I use it?

|            | Action Space | Observation Space |
|------------|--------------|-------------------|
| Discrete   | ✔️           | ✔️                |
| Continuous | ❌           | ✔️                |
Environment Setup
To follow this tutorial, you will need to install the dependencies shown below. It is recommended to use a newly-created virtual environment to avoid dependency conflicts.
agilerl==2.2.1; python_version >= '3.10' and python_version < '3.12'
pettingzoo[classic,atari,mpe]>=1.23.1
AutoROM>=0.6.1
SuperSuit>=3.9.0
torch>=2.0.1
numpy>=1.24.2
tqdm>=4.65.0
fastrand==1.3.0
gymnasium>=0.28.1
imageio>=2.31.1
Pillow>=9.5.0
PyYAML>=5.4.1
Code
Curriculum learning and self-play using DQN on Connect Four
The following code should run without any issues. The comments are designed to help you understand how to use PettingZoo with AgileRL. If you have any questions, please feel free to ask in the Discord server.
This is a complicated tutorial, so we will go through it in stages. The full code can be found at the end of this section. Although much of this tutorial is specific to the Connect Four environment, it demonstrates how these techniques can be applied more generally to other problems.
Imports
Importing the following packages, functions and classes will enable us to run the tutorial.
Imports
import copy
import os
import random
from collections import deque
from datetime import datetime
import numpy as np
import torch
import wandb
import yaml
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.utils import create_population, observation_space_channels_to_first
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.algorithms.core.wrappers import OptimizerWrapper
from tqdm import tqdm, trange
from pettingzoo.classic import connect_four_v3
Curriculum Learning
First, we need to set up and modify our environment to enable curriculum learning. Curriculum learning is achieved by changing the environment an agent trains in. This can be implemented by changing what happens when certain actions are taken, for example by changing the next observation returned by the environment, or more simply by changing the reward. To start, we will change the reward. By default, Connect Four uses the following rewards:
Win = +1
Lose = -1
Play continues = 0
To help guide our agent, we can introduce rewards for other outcomes in the environment, such as a small reward for placing three pieces in a row, or a small negative reward when the opponent does the same. We can also use reward shaping to encourage our agent to explore more. In Connect Four, when playing against a random opponent, an easy way to win is to always play in the same column. An agent may find success by doing this, and therefore fail to learn other, more sophisticated strategies that would help it beat better opponents. We can therefore choose to reward vertical wins slightly less than horizontal or diagonal wins, to encourage the agent to try winning in different ways. An example reward system could be defined as follows (a minimal sketch of a reward function built on this scheme is given after the list):
Win (horizontal or diagonal) = +1
Win (vertical) = +0.8
Three in a row = +0.05
Opponent three in a row = -0.05
Lose = -1
Play continues = 0
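As a minimal sketch of how such a scheme can be applied (the helper and argument names below are illustrative assumptions; the CurriculumEnv.reward method defined later implements the same idea, reading its values from the lesson config):

```python
# Example shaped-reward table matching the scheme above (values are illustrative)
SHAPED_REWARDS = {
    "win": 1.0,
    "vertical_win": 0.8,
    "three_in_row": 0.05,
    "opp_three_in_row": -0.05,
    "lose": -1.0,
    "play_continues": 0.0,
}


def shaped_reward(won, win_was_vertical, agent_threes, opp_threes):
    """Map a summary of the current board to a shaped reward (hypothetical helper)."""
    if won:
        return SHAPED_REWARDS["vertical_win"] if win_was_vertical else SHAPED_REWARDS["win"]
    if agent_threes == 0 and opp_threes == 0:
        return SHAPED_REWARDS["play_continues"]
    return (
        SHAPED_REWARDS["three_in_row"] * agent_threes
        + SHAPED_REWARDS["opp_three_in_row"] * opp_threes
    )
```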
Config Files
It is best to use YAML config files to define the lessons in our curriculum, so they can easily be changed and tracked. The first three lessons in our curriculum can be defined as follows:
Lesson 1
---
# Connect Four Lesson 1
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: random
opponent_pool_size: # Size of opponent pool for self-play
opponent_upgrade: # Epoch frequency to update opponent pool
eval_opponent: # 'random', 'weak' or 'strong'
pretrained_path: # Path to pretrained model weights
save_path: models/DQN/lesson1_trained_agent.pt # Path to save trained model
max_train_episodes: 0 # Maximum number of training episodes in environment
## Game specific:
buffer_warm_up: true # Fill replay buffer with random experiences
warm_up_opponent: random # Difficulty level of warm up experiences
agent_warm_up: 3000 # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 4 # How many times more likely to block vertically
rewards: # Rewards for different outcomes
win: 1
vertical_win: 0.7
three_in_row: 0.05
opp_three_in_row: -0.05
lose: -1
play_continues: 0
Lesson 2
---
# Connect Four Lesson 2
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: weak
opponent_pool_size: # Size of opponent pool for self-play
opponent_upgrade: # Epoch frequency to update opponent pool
eval_opponent: weak # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson1_trained_agent.pt # Path to pretrained model weights
save_path: models/DQN/lesson2_trained_agent.pt # Path to save trained model
max_train_episodes: 100 # Maximum number of training episodes in environment
## Game specific:
buffer_warm_up: false # Fill replay buffer with random experiences
warm_up_opponent: # Difficulty level of warm up experiences
agent_warm_up: 0 # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1 # How many times more likely to block vertically
rewards: # Rewards for different outcomes
win: 1
vertical_win: 1
three_in_row: 0.02
opp_three_in_row: -0.02
lose: -1
play_continues: 0
Lesson 3
---
# Connect Four Lesson 3
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: strong
opponent_pool_size: # Size of opponent pool for self-play
opponent_upgrade: # Epoch frequency to update opponent pool
eval_opponent: strong # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson2_trained_agent.pt # Path to pretrained model weights
save_path: models/DQN/lesson3_trained_agent.pt # Path to save trained model
max_train_episodes: 200 # Maximum number of training episodes in environment
## Game specific:
buffer_warm_up: false # Fill replay buffer with random experiences
warm_up_opponent: # Difficulty level of warm up experiences
agent_warm_up: 0 # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1 # How many times more likely to block vertically
rewards: # Rewards for different outcomes
win: 1
vertical_win: 1
three_in_row: 0.02
opp_three_in_row: -0.02
lose: -1
play_continues: 0
To implement our curriculum, we create a CurriculumEnv class that acts as a wrapper on top of the Connect Four environment and enables us to alter the reward to guide the training of our agent. This uses the config files we set up to define the lessons in our curriculum.
Curriculum environment
class CurriculumEnv:
"""Wrapper around environment to modify reward for curriculum learning.
:param env: Environment to learn in
:type env: PettingZoo-style environment
:param lesson: Lesson settings for curriculum learning
:type lesson: dict
"""
def __init__(self, env, lesson):
self.env = env
self.lesson = lesson
def fill_replay_buffer(self, memory, opponent):
"""Fill the replay buffer with experiences collected by taking random actions in the environment.
:param memory: Experience replay buffer
:type memory: AgileRL experience replay buffer
"""
print("Filling replay buffer ...")
pbar = tqdm(total=memory.memory_size)
while len(memory) < memory.memory_size:
# Randomly decide whether random player will go first or second
if random.random() > 0.5:
opponent_first = False
else:
opponent_first = True
mem_full = len(memory)
self.reset() # Reset environment at start of episode
observation, reward, done, truncation, _ = self.last()
(
p1_state,
p1_state_flipped,
p1_action,
p1_next_state,
p1_next_state_flipped,
) = (None, None, None, None, None)
done, truncation = False, False
while not (done or truncation):
# Player 0's turn
p0_action_mask = observation["action_mask"]
p0_state = np.moveaxis(observation["observation"], [-1], [-3])
p0_state_flipped = np.expand_dims(np.flip(p0_state, 2), 0)
p0_state = np.expand_dims(p0_state, 0)
if opponent_first:
p0_action = self.env.action_space("player_0").sample(p0_action_mask)
else:
if self.lesson["warm_up_opponent"] == "random":
p0_action = opponent.get_action(
p0_action_mask, p1_action, self.lesson["block_vert_coef"]
)
else:
p0_action = opponent.get_action(player=0)
self.step(p0_action) # Act in environment
observation, env_reward, done, truncation, _ = self.last()
p0_next_state = np.moveaxis(observation["observation"], [-1], [-3])
p0_next_state_flipped = np.expand_dims(np.flip(p0_next_state, 2), 0)
p0_next_state = np.expand_dims(p0_next_state, 0)
if done or truncation:
reward = self.reward(done=True, player=0)
memory.save2memoryVectEnvs(
np.concatenate(
(p0_state, p1_state, p0_state_flipped, p1_state_flipped)
),
[p0_action, p1_action, 6 - p0_action, 6 - p1_action],
[
reward,
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
],
np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
[done, done, done, done],
)
else: # Play continues
if p1_state is not None:
reward = self.reward(done=False, player=1)
memory.save2memoryVectEnvs(
np.concatenate((p1_state, p1_state_flipped)),
[p1_action, 6 - p1_action],
[reward, reward],
np.concatenate((p1_next_state, p1_next_state_flipped)),
[done, done],
)
# Player 1's turn
p1_action_mask = observation["action_mask"]
p1_state = np.moveaxis(observation["observation"], [-1], [-3])
p1_state[[0, 1], :, :] = p1_state[[1, 0], :, :]  # swap piece channels to the agent's perspective
p1_state_flipped = np.expand_dims(np.flip(p1_state, 2), 0)
p1_state = np.expand_dims(p1_state, 0)
if not opponent_first:
p1_action = self.env.action_space("player_1").sample(
p1_action_mask
)
else:
if self.lesson["warm_up_opponent"] == "random":
p1_action = opponent.get_action(
p1_action_mask, p0_action, LESSON["block_vert_coef"]
)
else:
p1_action = opponent.get_action(player=1)
self.step(p1_action) # Act in environment
observation, env_reward, done, truncation, _ = self.last()
p1_next_state = np.moveaxis(observation["observation"], [-1], [-3])
p1_next_state[[0, 1], :, :] = p1_next_state[[1, 0], :, :]
p1_next_state_flipped = np.expand_dims(np.flip(p1_next_state, 2), 0)
p1_next_state = np.expand_dims(p1_next_state, 0)
if done or truncation:
reward = self.reward(done=True, player=1)
memory.save2memoryVectEnvs(
np.concatenate(
(p0_state, p1_state, p0_state_flipped, p1_state_flipped)
),
[p0_action, p1_action, 6 - p0_action, 6 - p1_action],
[
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
reward,
],
np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
[done, done, done, done],
)
else: # Play continues
reward = self.reward(done=False, player=0)
memory.save2memoryVectEnvs(
np.concatenate((p0_state, p0_state_flipped)),
[p0_action, 6 - p0_action],
[reward, reward],
np.concatenate((p0_next_state, p0_next_state_flipped)),
[done, done],
)
pbar.update(len(memory) - mem_full)
pbar.close()
print("Replay buffer warmed up.")
return memory
def check_winnable(self, lst, piece):
"""Checks if four pieces in a row represent a winnable opportunity, e.g. [1, 1, 1, 0] or [2, 0, 2, 2].
:param lst: List of pieces in row
:type lst: List
:param piece: Player piece we are checking (1 or 2)
:type piece: int
"""
return lst.count(piece) == 3 and lst.count(0) == 1
def check_vertical_win(self, player):
"""Checks if a win is vertical.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
board = np.array(self.env.env.board).reshape(6, 7)
piece = player + 1
column_count = 7
row_count = 6
# Check vertical locations for win
for c in range(column_count):
for r in range(row_count - 3):
if (
board[r][c] == piece
and board[r + 1][c] == piece
and board[r + 2][c] == piece
and board[r + 3][c] == piece
):
return True
return False
def check_three_in_row(self, player):
"""Checks if there are three pieces in a row and a blank space next, or two pieces - blank - piece.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
board = np.array(self.env.env.board).reshape(6, 7)
piece = player + 1
# Check horizontal locations
column_count = 7
row_count = 6
three_in_row_count = 0
# Check vertical locations
for c in range(column_count):
for r in range(row_count - 3):
if self.check_winnable(board[r : r + 4, c].tolist(), piece):
three_in_row_count += 1
# Check horizontal locations
for r in range(row_count):
for c in range(column_count - 3):
if self.check_winnable(board[r, c : c + 4].tolist(), piece):
three_in_row_count += 1
# Check positively sloped diagonals
for c in range(column_count - 3):
for r in range(row_count - 3):
if self.check_winnable(
[
board[r, c],
board[r + 1, c + 1],
board[r + 2, c + 2],
board[r + 3, c + 3],
],
piece,
):
three_in_row_count += 1
# Check negatively sloped diagonals
for c in range(column_count - 3):
for r in range(3, row_count):
if self.check_winnable(
[
board[r, c],
board[r - 1, c + 1],
board[r - 2, c + 2],
board[r - 3, c + 3],
],
piece,
):
three_in_row_count += 1
return three_in_row_count
def reward(self, done, player):
"""Processes and returns reward from environment according to lesson criteria.
:param done: Environment has terminated
:type done: bool
:param player: Player who we are checking, 0 or 1
:type player: int
"""
if done:
reward = (
self.lesson["rewards"]["vertical_win"]
if self.check_vertical_win(player)
else self.lesson["rewards"]["win"]
)
else:
agent_three_count = self.check_three_in_row(1 - player)
opp_three_count = self.check_three_in_row(player)
if (agent_three_count + opp_three_count) == 0:
reward = self.lesson["rewards"]["play_continues"]
else:
reward = (
self.lesson["rewards"]["three_in_row"] * agent_three_count
+ self.lesson["rewards"]["opp_three_in_row"] * opp_three_count
)
return reward
def last(self):
"""Wrapper around PettingZoo env last method."""
return self.env.last()
def step(self, action):
"""Wrapper around PettingZoo env step method."""
self.env.step(action)
def reset(self):
"""Wrapper around PettingZoo env reset method."""
self.env.reset()
When defining the different lessons in our curriculum, we can increase the difficulty of the task by modifying what the agent observes in the environment; in Connect Four, we can increase the skill level of the opponent. Doing this progressively helps our agent improve. We can also change the rewards between lessons; for example, once we have learned to beat a random agent and now want to train against a harder opponent, we may wish to reward wins in all directions equally. In this tutorial, an Opponent class is implemented to provide opponents of different difficulties for training our agent.
Opponent
class Opponent:
"""Connect 4 opponent to train and/or evaluate against.
:param env: Environment to learn in
:type env: PettingZoo-style environment
:param difficulty: Difficulty level of opponent, 'random', 'weak' or 'strong'
:type difficulty: str
"""
def __init__(self, env, difficulty):
self.env = env.env
self.difficulty = difficulty
if self.difficulty == "random":
self.get_action = self.random_opponent
elif self.difficulty == "weak":
self.get_action = self.weak_rule_based_opponent
else:
self.get_action = self.strong_rule_based_opponent
self.num_cols = 7
self.num_rows = 6
self.length = 4
self.top = [0] * self.num_cols
def update_top(self):
"""Updates self.top, a list which tracks the row on top of the highest piece in each column."""
board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
non_zeros = np.where(board != 0)
rows, cols = non_zeros
top = np.zeros(board.shape[1], dtype=int)
for col in range(board.shape[1]):
column_pieces = rows[cols == col]
if len(column_pieces) > 0:
top[col] = np.min(column_pieces) - 1
else:
top[col] = 5
full_columns = np.all(board != 0, axis=0)
top[full_columns] = 6
self.top = top
def random_opponent(self, action_mask, last_opp_move=None, block_vert_coef=1):
"""Takes move for random opponent. If the lesson aims to randomly block vertical wins with a higher probability, this is done here too.
:param action_mask: Mask of legal actions: 1=legal, 0=illegal
:type action_mask: List
:param last_opp_move: Most recent action taken by agent against this opponent
:type last_opp_move: int
:param block_vert_coef: How many times more likely to block vertically
:type block_vert_coef: float
"""
if last_opp_move is not None:
action_mask[last_opp_move] *= block_vert_coef
action = random.choices(list(range(self.num_cols)), action_mask)[0]
return action
def weak_rule_based_opponent(self, player):
"""Takes move for weak rule-based opponent.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
self.update_top()
max_length = -1
best_actions = []
for action in range(self.num_cols):
possible, reward, ended, lengths = self.outcome(
action, player, return_length=True
)
if possible and lengths.sum() > max_length:
best_actions = []
max_length = lengths.sum()
if possible and lengths.sum() == max_length:
best_actions.append(action)
best_action = random.choice(best_actions)
return best_action
def strong_rule_based_opponent(self, player):
"""Takes move for strong rule-based opponent.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
self.update_top()
winning_actions = []
for action in range(self.num_cols):
possible, reward, ended = self.outcome(action, player)
if possible and ended:
winning_actions.append(action)
if len(winning_actions) > 0:
winning_action = random.choice(winning_actions)
return winning_action
opp = 1 if player == 0 else 0
loss_avoiding_actions = []
for action in range(self.num_cols):
possible, reward, ended = self.outcome(action, opp)
if possible and ended:
loss_avoiding_actions.append(action)
if len(loss_avoiding_actions) > 0:
loss_avoiding_action = random.choice(loss_avoiding_actions)
return loss_avoiding_action
return self.weak_rule_based_opponent(player) # take best possible move
def outcome(self, action, player, return_length=False):
"""Takes move for weak rule-based opponent.
:param action: Action to take in environment
:type action: int
:param player: Player who we are checking, 0 or 1
:type player: int
:param return_length: Return length of outcomes, defaults to False
:type return_length: bool, optional
"""
if not (self.top[action] < self.num_rows): # action column is full
return (False, None, None) + ((None,) if return_length else ())
row, col = self.top[action], action
piece = player + 1
# down, up, left, right, down-left, up-right, down-right, up-left,
directions = np.array(
[
[[-1, 0], [1, 0]],
[[0, -1], [0, 1]],
[[-1, -1], [1, 1]],
[[-1, 1], [1, -1]],
]
) # |4x2x2|
positions = np.array([row, col]).reshape(1, 1, 1, -1) + np.expand_dims(
directions, -2
) * np.arange(1, self.length).reshape(
1, 1, -1, 1
) # |4x2x3x2|
valid_positions = np.logical_and(
np.logical_and(
positions[:, :, :, 0] >= 0, positions[:, :, :, 0] < self.num_rows
),
np.logical_and(
positions[:, :, :, 1] >= 0, positions[:, :, :, 1] < self.num_cols
),
) # |4x2x3|
d0 = np.where(valid_positions, positions[:, :, :, 0], 0)
d1 = np.where(valid_positions, positions[:, :, :, 1], 0)
board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
board_values = np.where(valid_positions, board[d0, d1], 0)
a = (board_values == piece).astype(int)
b = np.concatenate(
(a, np.zeros_like(a[:, :, :1])), axis=-1
) # padding with zeros to compute length
lengths = np.argmin(b, -1)
ended = False
# check if winnable in any direction
for both_dir in board_values:
# |2x3|
line = np.concatenate((both_dir[0][::-1], [piece], both_dir[1]))
if "".join(map(str, [piece] * self.length)) in "".join(map(str, line)):
ended = True
break
# ended = np.any(np.greater_equal(np.sum(lengths, 1), self.length - 1))
draw = True
for c, v in enumerate(self.top):
draw &= (v == self.num_rows) if c != col else (v == (self.num_rows - 1))
ended |= draw
reward = (-1) ** (player) if ended and not draw else 0
return (True, reward, ended) + ((lengths,) if return_length else ())
General Setup
Before we go any further in this tutorial, it is helpful to define and set up everything else we need for training.
Setup code
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("===== AgileRL Curriculum Learning Demo =====")
lesson_number = 1
# Load lesson for curriculum
with open(f"./curriculums/connect_four/lesson{lesson_number}.yaml") as file:
LESSON = yaml.safe_load(file)
# Define the network configuration
NET_CONFIG = {
"encoder_config": {
"channel_size": [128], # CNN channel size
"kernel_size": [4], # CNN kernel size
"stride_size": [1], # CNN stride size
}
}
# Define the initial hyperparameters
INIT_HP = {
"POPULATION_SIZE": 6,
# "ALGO": "Rainbow DQN", # Algorithm
"ALGO": "DQN", # Algorithm
"DOUBLE": True,
# Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
"BATCH_SIZE": 256, # Batch size
"LR": 1e-4, # Learning rate
"GAMMA": 0.99, # Discount factor
"MEMORY_SIZE": 100000, # Max memory buffer size
"LEARN_STEP": 1, # Learning frequency
"N_STEP": 1, # Step number to calculate td error
"PER": False, # Use prioritized experience replay buffer
"ALPHA": 0.6, # Prioritized replay buffer parameter
"TAU": 0.01, # For soft update of target parameters
"BETA": 0.4, # Importance sampling coefficient
"PRIOR_EPS": 0.000001, # Minimum priority for sampling
"NUM_ATOMS": 51, # Unit number of support
"V_MIN": 0.0, # Minimum value of support
"V_MAX": 200.0, # Maximum value of support
"WANDB": False, # Use Weights and Biases tracking
}
# Define the connect four environment
env = connect_four_v3.env()
env.reset()
# Configure the algo input arguments
observation_spaces = [env.observation_space(agent)["observation"] for agent in env.agents]
action_spaces = [env.action_space(agent) for agent in env.agents]
# Wrap the environment in the curriculum learning wrapper
env = CurriculumEnv(env, LESSON)
# RL hyperparameters configuration for mutation during training
hp_config = HyperparameterConfig(
lr=RLParameter(min=1e-4, max=1e-2),
learn_step=RLParameter(min=1, max=120, dtype=int),
batch_size=RLParameter(min=8, max=64, dtype=int),
)
# Create a population ready for evolutionary hyper-parameter optimisation
observation_space = observation_space_channels_to_first(observation_spaces[0])
action_space = action_spaces[0]
pop = create_population(
INIT_HP["ALGO"],
observation_space,
action_space,
NET_CONFIG,
INIT_HP,
hp_config=hp_config,
population_size=INIT_HP["POPULATION_SIZE"],
device=device,
)
# Configure the replay buffer
field_names = ["state", "action", "reward", "next_state", "done"]
memory = ReplayBuffer(
action_dim=action_spaces[0].n, # Number of agent actions
memory_size=INIT_HP["MEMORY_SIZE"], # Max replay buffer size
field_names=field_names, # Field names to store in memory
device=device,
)
# Instantiate a tournament selection object (used for HPO)
tournament = TournamentSelection(
tournament_size=2, # Tournament selection size
elitism=True, # Elitism in tournament selection
population_size=INIT_HP["POPULATION_SIZE"], # Population size
eval_loop=1, # Evaluate using last N fitness scores
)
# Instantiate a mutations object (used for HPO)
mutations = Mutations(
no_mutation=0.2, # Probability of no mutation
architecture=0, # Probability of architecture mutation
new_layer_prob=0.2, # Probability of new layer mutation
parameters=0.2, # Probability of parameter mutation
activation=0, # Probability of activation function mutation
rl_hp=0.2, # Probability of RL hyperparameter mutation
mutation_sd=0.1, # Mutation strength
rand_seed=1,
device=device,
)
# Define training loop parameters
episodes_per_epoch = 10
max_episodes = LESSON["max_train_episodes"] # Total episodes
max_steps = 500 # Maximum steps to take in each episode
evo_epochs = 20 # Evolution frequency
evo_loop = 50 # Number of evaluation episodes
elite = pop[0] # Assign a placeholder "elite" agent
epsilon = 1.0 # Starting epsilon value
eps_end = 0.1 # Final epsilon value
eps_decay = 0.9998 # Epsilon decays
opp_update_counter = 0
wb = INIT_HP["WANDB"]
As part of the curriculum, we may also choose to fill the replay buffer with random experiences and train on these offline.
Fill replay buffer
# Perform buffer and agent warmups if desired
if LESSON["buffer_warm_up"]:
warm_up_opponent = Opponent(env, difficulty=LESSON["warm_up_opponent"])
memory = env.fill_replay_buffer(
memory, warm_up_opponent
) # Fill replay buffer with transitions
if LESSON["agent_warm_up"] > 0:
print("Warming up agents ...")
agent = pop[0]
# Train on randomly collected samples
for epoch in trange(LESSON["agent_warm_up"]):
experiences = memory.sample(agent.batch_size)
agent.learn(experiences)
pop = [agent.clone() for _ in pop]
elite = agent
print("Agent population warmed up.")
Self-play
In this tutorial, we use self-play as the final lesson in our curriculum. By iteratively improving our agent and making it learn to beat itself, we can allow it to discover new strategies and achieve higher performance. The weights of a pretrained agent from a previous lesson can be loaded into the population as follows:
Load pretrained weights
if LESSON["pretrained_path"] is not None:
for agent in pop:
# Load pretrained checkpoint
agent.load_checkpoint(LESSON["pretrained_path"])
# Reinit optimizer for new task
agent.lr = INIT_HP["LR"]
agent.optimizer = OptimizerWrapper(
torch.optim.Adam,
networks=agent.actor,
lr=agent.lr,
network_names=agent.optimizer.network_names,
lr_name=agent.optimizer.lr_name
)
To train against old versions of our agent, we create a pool of opponents. At training time, we randomly select an opponent from this pool. At regular intervals, we update the opponent pool by removing the oldest opponent and adding a copy of the latest version of our agent. This strikes a balance between training against an increasingly difficult opponent and providing variety in the moves an opponent might make.
Create opponent pool
if LESSON["opponent"] == "self":
# Create initial pool of opponents
opponent_pool = deque(maxlen=LESSON["opponent_pool_size"])
for _ in range(LESSON["opponent_pool_size"]):
opp = copy.deepcopy(pop[0])
opp.actor.load_state_dict(pop[0].actor.state_dict())
opp.actor.eval()
opponent_pool.append(opp)
An example lesson config for self-play training can be defined as follows:
Lesson 4
---
# Connect Four Lesson 4
# Train against random agent: 'random', weak opponent: 'weak', strong opponent: 'strong', or use self-play: 'self'
opponent: self
opponent_pool_size: 6 # Size of opponent pool for self-play
opponent_upgrade: 6000 # Epoch frequency to update opponent pool
eval_opponent: strong # 'random', 'weak' or 'strong'
pretrained_path: models/DQN/lesson3_trained_agent.pt # Path to pretrained model weights
save_path: models/DQN/lesson4_trained_agent.pt # Path to save trained model
max_train_episodes: 200 # Maximum number of training episodes in environment
## Game specific:
buffer_warm_up: false # Fill replay buffer with random experiences
warm_up_opponent: # Difficulty level of warm up experiences
agent_warm_up: 0 # Number of epochs to warm up agent by training on random experiences
block_vert_coef: 1 # How many times more likely to block vertically if playing random opponent
rewards: # Rewards for different outcomes
win: 1
vertical_win: 1
three_in_row: 0.01
opp_three_in_row: -0.01
lose: -1
play_continues: 0
It is also possible to train an agent using only self-play, without any previous lessons in the curriculum. This requires a large amount of training time, but may ultimately result in better performance than other approaches, and would avoid some of the mistakes discussed in 'The Bitter Lesson'.
Training Loop
The Connect Four training loop must take into account that the agent only takes an action every other interaction with the environment (the opponent acts on alternating turns). This must be considered when saving transitions to the replay buffer. Equally, we must wait for the outcome of the following player's turn before determining the reward for a transition. This is therefore not a true Markov decision process, but we can still train a reinforcement learning agent reasonably successfully in these non-stationary conditions. A stripped-down sketch of this transition bookkeeping follows.
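The sketch below illustrates only the bookkeeping pattern; it is not AgileRL's API, the callables `act`, `reward_fn` and `store` are placeholders, and perspective handling (channel swapping), action masks and flipped states are omitted for brevity. Each player's transition is held back until the opponent has replied, at which point its reward and done flag are known and it can be written to the replay buffer.

```python
def play_episode(env, act, reward_fn, store):
    """Illustrative turn-based transition bookkeeping (placeholder callables)."""
    env.reset()
    pending = None  # (player, state, action, next_state) awaiting the opponent's reply
    done, truncation = False, False
    player = 0
    while not (done or truncation):
        obs, _, done, truncation, _ = env.last()
        state, action = obs["observation"], act(player, obs)
        env.step(action)
        next_obs, _, done, truncation, _ = env.last()
        if pending is not None:
            # The opponent (the player who just moved) has replied, so the
            # held-back transition can now be scored and saved
            prev_player, prev_state, prev_action, prev_next_state = pending
            store(prev_state, prev_action, reward_fn(prev_player, done), prev_next_state, done)
        pending = (player, state, action, next_obs["observation"])
        if done or truncation:
            # The player who ended the game stores their final transition immediately
            store(state, action, reward_fn(player, done), next_obs["observation"], done)
        player = 1 - player
```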
At regular intervals, we evaluate the performance, or 'fitness', of the agents in the population and carry out an evolutionary step. The agents that perform best are most likely to become members of the next generation, and the hyperparameters and neural network architectures of agents in the population are mutated. This evolution allows us to optimise hyperparameters and maximise the performance of our agents in a single training run.
Training loop
if max_episodes > 0:
if wb:
wandb.init(
# set the wandb project where this run will be logged
project="AgileRL",
name="{}-EvoHPO-{}-{}Opposition-CNN-{}".format(
"connect_four_v3",
INIT_HP["ALGO"],
LESSON["opponent"],
datetime.now().strftime("%m%d%Y%H%M%S"),
),
# track hyperparameters and run metadata
config={
"algo": "Evo HPO Rainbow DQN",
"env": "connect_four_v3",
"INIT_HP": INIT_HP,
"lesson": LESSON,
},
)
total_steps = 0
total_episodes = 0
pbar = trange(int(max_episodes / episodes_per_epoch))
# Training loop
for idx_epi in pbar:
turns_per_episode = []
train_actions_hist = [0] * action_spaces[0].n
for agent in pop: # Loop through population
for episode in range(episodes_per_epoch):
env.reset() # Reset environment at start of episode
observation, env_reward, done, truncation, _ = env.last()
(
p1_state,
p1_state_flipped,
p1_action,
p1_next_state,
p1_next_state_flipped,
) = (None, None, None, None, None)
if LESSON["opponent"] == "self":
# Randomly choose opponent from opponent pool if using self-play
opponent = random.choice(opponent_pool)
else:
# Create opponent of desired difficulty
opponent = Opponent(env, difficulty=LESSON["opponent"])
# Randomly decide whether agent will go first or second
if random.random() > 0.5:
opponent_first = False
else:
opponent_first = True
score = 0
turns = 0 # Number of turns counter
for idx_step in range(max_steps):
# Player 0's turn
p0_action_mask = observation["action_mask"]
p0_state = np.moveaxis(observation["observation"], [-1], [-3])
p0_state_flipped = np.expand_dims(np.flip(p0_state, 2), 0)
p0_state = np.expand_dims(p0_state, 0)
if opponent_first:
if LESSON["opponent"] == "self":
p0_action = opponent.get_action(
p0_state, 0, p0_action_mask
)[0]
elif LESSON["opponent"] == "random":
p0_action = opponent.get_action(
p0_action_mask, p1_action, LESSON["block_vert_coef"]
)
else:
p0_action = opponent.get_action(player=0)
else:
p0_action = agent.get_action(
p0_state, epsilon, p0_action_mask
)[
0
] # Get next action from agent
train_actions_hist[p0_action] += 1
env.step(p0_action) # Act in environment
observation, env_reward, done, truncation, _ = env.last()
p0_next_state = np.moveaxis(
observation["observation"], [-1], [-3]
)
p0_next_state_flipped = np.expand_dims(
np.flip(p0_next_state, 2), 0
)
p0_next_state = np.expand_dims(p0_next_state, 0)
if not opponent_first:
score += env_reward
turns += 1
# Check if game is over (Player 0 win)
if done or truncation:
reward = env.reward(done=True, player=0)
memory.save2memoryVectEnvs(
np.concatenate(
(
p0_state,
p1_state,
p0_state_flipped,
p1_state_flipped,
)
),
[p0_action, p1_action, 6 - p0_action, 6 - p1_action],
[
reward,
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
],
np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
[done, done, done, done],
)
else: # Play continues
if p1_state is not None:
reward = env.reward(done=False, player=1)
memory.save2memoryVectEnvs(
np.concatenate((p1_state, p1_state_flipped)),
[p1_action, 6 - p1_action],
[reward, reward],
np.concatenate(
(p1_next_state, p1_next_state_flipped)
),
[done, done],
)
# Player 1's turn
p1_action_mask = observation["action_mask"]
p1_state = np.moveaxis(
observation["observation"], [-1], [-3]
)
# Swap pieces so that the agent always sees the board from the same perspective
p1_state[[0, 1], :, :] = p1_state[[1, 0], :, :]
p1_state_flipped = np.expand_dims(np.flip(p1_state, 2), 0)
p1_state = np.expand_dims(p1_state, 0)
if not opponent_first:
if LESSON["opponent"] == "self":
p1_action = opponent.get_action(
p1_state, 0, p1_action_mask
)[0]
elif LESSON["opponent"] == "random":
p1_action = opponent.get_action(
p1_action_mask,
p0_action,
LESSON["block_vert_coef"],
)
else:
p1_action = opponent.get_action(player=1)
else:
p1_action = agent.get_action(
p1_state, epsilon, p1_action_mask
)[
0
] # Get next action from agent
train_actions_hist[p1_action] += 1
env.step(p1_action) # Act in environment
observation, env_reward, done, truncation, _ = env.last()
p1_next_state = np.moveaxis(
observation["observation"], [-1], [-3]
)
p1_next_state[[0, 1], :, :] = p1_next_state[[1, 0], :, :]
p1_next_state_flipped = np.expand_dims(
np.flip(p1_next_state, 2), 0
)
p1_next_state = np.expand_dims(p1_next_state, 0)
if opponent_first:
score += env_reward
turns += 1
# Check if game is over (Player 1 win)
if done or truncation:
reward = env.reward(done=True, player=1)
memory.save2memoryVectEnvs(
np.concatenate(
(
p0_state,
p1_state,
p0_state_flipped,
p1_state_flipped,
)
),
[
p0_action,
p1_action,
6 - p0_action,
6 - p1_action,
],
[
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
reward,
],
np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
[done, done, done, done],
)
else: # Play continues
reward = env.reward(done=False, player=0)
memory.save2memoryVectEnvs(
np.concatenate((p0_state, p0_state_flipped)),
[p0_action, 6 - p0_action],
[reward, reward],
np.concatenate(
(p0_next_state, p0_next_state_flipped)
),
[done, done],
)
# Learn according to learning frequency
if (memory.counter % agent.learn_step == 0) and (
len(memory) >= agent.batch_size
):
# Sample replay buffer
# Learn according to agent's RL algorithm
experiences = memory.sample(agent.batch_size)
agent.learn(experiences)
# Stop episode if any agents have terminated
if done or truncation:
break
total_steps += idx_step + 1
total_episodes += 1
turns_per_episode.append(turns)
# Save the total episode reward
agent.scores.append(score)
if LESSON["opponent"] == "self":
if (total_episodes % LESSON["opponent_upgrade"] == 0) and (
(idx_epi + 1) > evo_epochs
):
elite_opp, _, _ = tournament._elitism(pop)
elite_opp.actor.eval()
opponent_pool.append(elite_opp)
opp_update_counter += 1
# Update epsilon for exploration
epsilon = max(eps_end, epsilon * eps_decay)
mean_turns = np.mean(turns_per_episode)
# Now evolve population if necessary
if (idx_epi + 1) % evo_epochs == 0:
# Evaluate population vs random actions
fitnesses = []
win_rates = []
eval_actions_hist = [0] * action_spaces[0].n # Eval actions histogram
eval_turns = 0 # Eval turns counter
for agent in pop:
with torch.no_grad():
rewards = []
for i in range(evo_loop):
env.reset() # Reset environment at start of episode
observation, reward, done, truncation, _ = env.last()
player = -1 # Tracker for which player's turn it is
# Create opponent of desired difficulty
opponent = Opponent(env, difficulty=LESSON["eval_opponent"])
# Randomly decide whether agent will go first or second
if random.random() > 0.5:
opponent_first = False
else:
opponent_first = True
score = 0
for idx_step in range(max_steps):
action_mask = observation["action_mask"]
if player < 0:
if opponent_first:
if LESSON["eval_opponent"] == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=0)
else:
state = np.moveaxis(
observation["observation"], [-1], [-3]
)
state = np.expand_dims(state, 0)
action = agent.get_action(state, 0, action_mask)[
0
] # Get next action from agent
eval_actions_hist[action] += 1
if player > 0:
if not opponent_first:
if LESSON["eval_opponent"] == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=1)
else:
state = np.moveaxis(
observation["observation"], [-1], [-3]
)
state[[0, 1], :, :] = state[[1, 0], :, :]  # swap piece channels to the agent's perspective
state = np.expand_dims(state, 0)
action = agent.get_action(state, 0, action_mask)[
0
] # Get next action from agent
eval_actions_hist[action] += 1
env.step(action) # Act in environment
observation, reward, done, truncation, _ = env.last()
if (player > 0 and opponent_first) or (
player < 0 and not opponent_first
):
score += reward
eval_turns += 1
if done or truncation:
break
player *= -1
rewards.append(score)
mean_fit = np.mean(rewards)
agent.fitness.append(mean_fit)
fitnesses.append(mean_fit)
eval_turns = eval_turns / len(pop) / evo_loop
pbar.set_postfix_str(
f" Train Mean Score: {np.mean(agent.scores[-episodes_per_epoch:])} Train Mean Turns: {mean_turns} Eval Mean Fitness: {np.mean(fitnesses)} Eval Best Fitness: {np.max(fitnesses)} Eval Mean Turns: {eval_turns} Total Steps: {total_steps}"
)
pbar.update(0)
# Format action histograms for visualisation
train_actions_hist = [
freq / sum(train_actions_hist) for freq in train_actions_hist
]
eval_actions_hist = [
freq / sum(eval_actions_hist) for freq in eval_actions_hist
]
train_actions_dict = {
f"train/action_{index}": action
for index, action in enumerate(train_actions_hist)
}
eval_actions_dict = {
f"eval/action_{index}": action
for index, action in enumerate(eval_actions_hist)
}
if wb:
wandb_dict = {
"global_step": total_steps,
"train/mean_score": np.mean(agent.scores[-episodes_per_epoch:]),
"train/mean_turns_per_game": mean_turns,
"train/epsilon": epsilon,
"train/opponent_updates": opp_update_counter,
"eval/mean_fitness": np.mean(fitnesses),
"eval/best_fitness": np.max(fitnesses),
"eval/mean_turns_per_game": eval_turns,
}
wandb_dict.update(train_actions_dict)
wandb_dict.update(eval_actions_dict)
wandb.log(wandb_dict)
# Tournament selection and population mutation
elite, pop = tournament.select(pop)
pop = mutations.mutation(pop)
if max_episodes > 0:
if wb:
wandb.finish()
# Save the trained agent
save_path = LESSON["save_path"]
os.makedirs(os.path.dirname(save_path), exist_ok=True)
elite.save_checkpoint(save_path)
print(f"Elite agent saved to '{save_path}'.")
Trained Model Weights
Trained model weights are provided at PettingZoo/tutorials/AgileRL/models. Take a look, play against the models, and see if you can beat them!
Watch the Trained Agents Play
The following code allows you to load the saved DQN agent from the previous training block, test its performance, and then visualise a number of episodes as a gif.
Render trained agents
import os
import imageio
import numpy as np
import torch
from agilerl.algorithms.dqn import DQN
from agilerl.utils.utils import observation_space_channels_to_first
from agilerl_dqn_curriculum import Opponent
from PIL import Image, ImageDraw, ImageFont
from pettingzoo.classic import connect_four_v3
# Define function to return image
def _label_with_episode_number(frame, episode_num, frame_no, p):
im = Image.fromarray(frame)
drawer = ImageDraw.Draw(im)
text_color = (255, 255, 255)
font = ImageFont.truetype("arial.ttf", size=45)
drawer.text(
(100, 5),
f"Episode: {episode_num+1} Frame: {frame_no}",
fill=text_color,
font=font,
)
if p == 1:
player = "Player 1"
color = (255, 0, 0)
if p == 2:
player = "Player 2"
color = (100, 255, 150)
if p is None:
player = "Self-play"
color = (255, 255, 255)
drawer.text((700, 5), f"Agent: {player}", fill=color, font=font)
return im
# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
resized_frames = []
for img in frames:
new_width = int(img.width * fraction)
new_height = int(img.height * fraction)
img_resized = img.resize((new_width, new_height))
resized_frames.append(np.array(img_resized))
return resized_frames
if __name__ == "__main__":
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
path = "./models/DQN/lesson3_trained_agent.pt" # Path to saved agent checkpoint
env = connect_four_v3.env(render_mode="rgb_array")
env.reset()
# Configure the algo input arguments
observation_spaces = [
env.observation_space(agent)["observation"] for agent in env.agents
]
action_spaces = [env.action_space(agent) for agent in env.agents]
# Instantiate an DQN object
dqn = DQN(
observation_space=observation_space_channels_to_first(observation_spaces[0]),
action_space=action_spaces[0],
device=device,
)
# Load the saved algorithm into the DQN object
dqn.load_checkpoint(path)
for opponent_difficulty in ["random", "weak", "strong", "self"]:
# Create opponent
if opponent_difficulty == "self":
opponent = dqn
else:
opponent = Opponent(env, opponent_difficulty)
# Define test loop parameters
episodes = 2 # Number of episodes to test agent on
max_steps = (
500 # Max number of steps to take in the environment in each episode
)
rewards = [] # List to collect total episodic reward
frames = [] # List to collect frames
print("============================================")
print(f"Agent: {path}")
print(f"Opponent: {opponent_difficulty}")
# Test loop for inference
for ep in range(episodes):
if ep / episodes < 0.5:
opponent_first = False
p = 1
else:
opponent_first = True
p = 2
if opponent_difficulty == "self":
p = None
env.reset() # Reset environment at start of episode
frame = env.render()
frames.append(
_label_with_episode_number(frame, episode_num=ep, frame_no=0, p=p)
)
observation, reward, done, truncation, _ = env.last()
player = -1 # Tracker for which player's turn it is
score = 0
for idx_step in range(max_steps):
action_mask = observation["action_mask"]
if player < 0:
state = np.moveaxis(observation["observation"], [-1], [-3])
state = np.expand_dims(state, 0)
if opponent_first:
if opponent_difficulty == "self":
action = opponent.get_action(
state, epsilon=0, action_mask=action_mask
)[0]
elif opponent_difficulty == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=0)
else:
action = dqn.get_action(
state, epsilon=0, action_mask=action_mask
)[
0
] # Get next action from agent
if player > 0:
state = np.moveaxis(observation["observation"], [-1], [-3])
state[[0, 1], :, :] = state[[1, 0], :, :]  # swap piece channels to the agent's perspective
state = np.expand_dims(state, 0)
if not opponent_first:
if opponent_difficulty == "self":
action = opponent.get_action(
state, epsilon=0, action_mask=action_mask
)[0]
elif opponent_difficulty == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=1)
else:
action = dqn.get_action(
state, epsilon=0, action_mask=action_mask
)[
0
] # Get next action from agent
env.step(action) # Act in environment
observation, reward, termination, truncation, _ = env.last()
# Save the frame for this step and append to frames list
frame = env.render()
frames.append(
_label_with_episode_number(
frame, episode_num=ep, frame_no=idx_step, p=p
)
)
if (player > 0 and opponent_first) or (
player < 0 and not opponent_first
):
score += reward
else:
score -= reward
# Stop episode if any agents have terminated
if truncation or termination:
break
player *= -1
print("-" * 15, f"Episode: {ep+1}", "-" * 15)
print(f"Episode length: {idx_step}")
print(f"Score: {score}")
print("============================================")
frames = resize_frames(frames, 0.5)
# Save the gif to specified path
gif_path = "./videos/"
os.makedirs(gif_path, exist_ok=True)
imageio.mimwrite(
os.path.join("./videos/", f"connect_four_{opponent_difficulty}_opp.gif"),
frames,
duration=400,
loop=True,
)
env.close()
Full Training Code
Full training code
Please note that in line 612, max_episodes is set to 10 to allow for quick testing of this tutorial code. This line can be removed, and the line below it uncommented, to use the number of episodes set in the config files.
"""This tutorial shows how to train a DQN agent on the connect four environment, using curriculum learning and self play.
Authors: Nick (https://github.com/nicku-a), Jaime (https://github.com/jaimesabalbermudez)
"""
import copy
import os
import random
from collections import deque
from typing import List, Optional, Tuple
import numpy as np
import torch
import yaml
from agilerl.algorithms import DQN
from agilerl.algorithms.core import OptimizerWrapper
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.components.data import Transition
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.algo_utils import obs_channels_to_first
from agilerl.utils.utils import create_population, observation_space_channels_to_first
from tqdm import tqdm, trange
from pettingzoo import ParallelEnv
from pettingzoo.classic import connect_four_v3
class CurriculumEnv:
"""Wrapper around environment to modify reward for curriculum learning.
:param env: Environment to learn in
:type env: PettingZoo-style environment
:param lesson: Lesson settings for curriculum learning
:type lesson: dict
"""
def __init__(self, env: ParallelEnv, lesson: dict):
self.env = env
self.lesson = lesson
def fill_replay_buffer(
self, memory: ReplayBuffer, opponent: "Opponent"
) -> ReplayBuffer:
"""Fill the replay buffer with experiences collected by taking random actions in the environment.
:param memory: Experience replay buffer
:type memory: AgileRL experience replay buffer
:param opponent: Opponent to train against
:type opponent: Opponent
:return: Filled replay buffer
:rtype: ReplayBuffer
"""
print("Filling replay buffer ...")
pbar = tqdm(total=memory.max_size)
while len(memory) < memory.max_size:
# Randomly decide whether random player will go first or second
opponent_first = random.random() > 0.5
mem_full = len(memory)
self.reset() # Reset environment at start of episode
observation, reward, done, truncation, _ = self.last()
(
p1_state,
p1_state_flipped,
p1_action,
p1_next_state,
p1_next_state_flipped,
) = (None, None, None, None, None)
done, truncation = False, False
while not (done or truncation):
# Player 0's turn
p0_action_mask = observation["action_mask"]
p0_state, p0_state_flipped = transform_and_flip(observation, player=0)
if opponent_first:
p0_action = self.env.action_space("player_0").sample(p0_action_mask)
else:
if self.lesson["warm_up_opponent"] == "random":
p0_action = opponent.get_action(
p0_action_mask, p1_action, self.lesson["block_vert_coef"]
)
else:
p0_action = opponent.get_action(player=0)
self.step(p0_action) # Act in environment
observation, env_reward, done, truncation, _ = self.last()
p0_next_state, p0_next_state_flipped = transform_and_flip(
observation, player=0
)
if done or truncation:
reward = self.reward(done=True, player=0)
transition = Transition(
obs=np.concatenate(
(p0_state, p1_state, p0_state_flipped, p1_state_flipped)
),
action=np.array(
[p0_action, p1_action, 6 - p0_action, 6 - p1_action]
),
reward=np.array(
[
reward,
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
]
),
next_obs=np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
done=np.array([done, done, done, done]),
batch_size=[4],
)
memory.add(transition.to_tensordict())
else: # Play continues
if p1_state is not None:
reward = self.reward(done=False, player=1)
transition = Transition(
obs=np.concatenate((p1_state, p1_state_flipped)),
action=np.array([p1_action, 6 - p1_action]),
reward=np.array([reward, reward]),
next_obs=np.concatenate(
(p1_next_state, p1_next_state_flipped)
),
done=np.array([done, done]),
batch_size=[2],
)
memory.add(transition.to_tensordict())
# Player 1's turn
p1_action_mask = observation["action_mask"]
p1_state, p1_state_flipped = transform_and_flip(
observation, player=1
)
if not opponent_first:
p1_action = self.env.action_space("player_1").sample(
p1_action_mask
)
else:
if self.lesson["warm_up_opponent"] == "random":
p1_action = opponent.get_action(
p1_action_mask, p0_action, LESSON["block_vert_coef"]
)
else:
p1_action = opponent.get_action(player=1)
self.step(p1_action) # Act in environment
observation, env_reward, done, truncation, _ = self.last()
p1_next_state, p1_next_state_flipped = transform_and_flip(
observation, player=1
)
if done or truncation:
reward = self.reward(done=True, player=1)
transition = Transition(
obs=np.concatenate(
(p0_state, p1_state, p0_state_flipped, p1_state_flipped)
),
action=np.array(
[p0_action, p1_action, 6 - p0_action, 6 - p1_action]
),
reward=np.array(
[
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
reward,
]
),
next_obs=np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
done=np.array([done, done, done, done]),
batch_size=[4],
)
memory.add(transition.to_tensordict())
else: # Play continues
reward = self.reward(done=False, player=0)
transition = Transition(
obs=np.concatenate((p0_state, p0_state_flipped)),
action=np.array([p0_action, 6 - p0_action]),
reward=np.array([reward, reward]),
next_obs=np.concatenate(
(p0_next_state, p0_next_state_flipped)
),
done=np.array([done, done]),
batch_size=[2],
)
memory.add(transition.to_tensordict())
pbar.update(len(memory) - mem_full)
pbar.close()
print("Replay buffer warmed up.")
return memory
def check_winnable(self, lst: List[int], piece: int) -> bool:
"""Checks if four pieces in a row represent a winnable opportunity, e.g. [1, 1, 1, 0] or [2, 0, 2, 2].
:param lst: List of pieces in row
:type lst: List
:param piece: Player piece we are checking (1 or 2)
:type piece: int
"""
return lst.count(piece) == 3 and lst.count(0) == 1
def check_vertical_win(self, player: int) -> bool:
"""Checks if a win is vertical.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
board = np.array(self.env.env.board).reshape(6, 7)
piece = player + 1
column_count = 7
row_count = 6
# Check vertical locations for win
for c in range(column_count):
for r in range(row_count - 3):
if (
board[r][c] == piece
and board[r + 1][c] == piece
and board[r + 2][c] == piece
and board[r + 3][c] == piece
):
return True
return False
def check_three_in_row(self, player: int) -> int:
"""Checks if there are three pieces in a row and a blank space next, or two pieces - blank - piece.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
board = np.array(self.env.env.board).reshape(6, 7)
piece = player + 1
# Check horizontal locations
column_count = 7
row_count = 6
three_in_row_count = 0
# Check vertical locations
for c in range(column_count):
for r in range(row_count - 3):
if self.check_winnable(board[r : r + 4, c].tolist(), piece):
three_in_row_count += 1
# Check horizontal locations
for r in range(row_count):
for c in range(column_count - 3):
if self.check_winnable(board[r, c : c + 4].tolist(), piece):
three_in_row_count += 1
# Check positively sloped diagonals
for c in range(column_count - 3):
for r in range(row_count - 3):
if self.check_winnable(
[
board[r, c],
board[r + 1, c + 1],
board[r + 2, c + 2],
board[r + 3, c + 3],
],
piece,
):
three_in_row_count += 1
# Check negatively sloped diagonals
for c in range(column_count - 3):
for r in range(3, row_count):
if self.check_winnable(
[
board[r, c],
board[r - 1, c + 1],
board[r - 2, c + 2],
board[r - 3, c + 3],
],
piece,
):
three_in_row_count += 1
return three_in_row_count
def reward(self, done: bool, player: int) -> float:
"""Processes and returns reward from environment according to lesson criteria.
:param done: Environment has terminated
:type done: bool
:param player: Player who we are checking, 0 or 1
:type player: int
"""
if done:
reward = (
self.lesson["rewards"]["vertical_win"]
if self.check_vertical_win(player)
else self.lesson["rewards"]["win"]
)
else:
agent_three_count = self.check_three_in_row(1 - player)
opp_three_count = self.check_three_in_row(player)
if (agent_three_count + opp_three_count) == 0:
reward = self.lesson["rewards"]["play_continues"]
else:
reward = (
self.lesson["rewards"]["three_in_row"] * agent_three_count
+ self.lesson["rewards"]["opp_three_in_row"] * opp_three_count
)
return reward
def last(self) -> Tuple[dict, float, bool, bool, dict]:
"""Wrapper around PettingZoo env last method."""
return self.env.last()
def step(self, action: int) -> None:
"""Wrapper around PettingZoo env step method."""
self.env.step(action)
def reset(self) -> None:
"""Wrapper around PettingZoo env reset method."""
self.env.reset()
class Opponent:
"""Connect 4 opponent to train and/or evaluate against.
:param env: Environment to learn in
:type env: PettingZoo-style environment
:param difficulty: Difficulty level of opponent, 'random', 'weak' or 'strong'
:type difficulty: str
"""
def __init__(self, env: ParallelEnv, difficulty: str):
self.env = env.env
self.difficulty = difficulty
if self.difficulty == "random":
self.get_action = self.random_opponent
elif self.difficulty == "weak":
self.get_action = self.weak_rule_based_opponent
else:
self.get_action = self.strong_rule_based_opponent
self.num_cols = 7
self.num_rows = 6
self.length = 4
self.top = [0] * self.num_cols
def update_top(self) -> None:
"""Updates self.top, a list which tracks the row on top of the highest piece in each column."""
board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
non_zeros = np.where(board != 0)
rows, cols = non_zeros
top = np.zeros(board.shape[1], dtype=int)
for col in range(board.shape[1]):
column_pieces = rows[cols == col]
if len(column_pieces) > 0:
top[col] = np.min(column_pieces) - 1
else:
top[col] = 5
full_columns = np.all(board != 0, axis=0)
top[full_columns] = 6
self.top = top
def random_opponent(
self,
action_mask: List[int],
last_opp_move: Optional[int] = None,
block_vert_coef: float = 1,
) -> int:
"""Takes move for random opponent.
If the lesson aims to randomly block vertical wins with a higher probability, this is done here too.
:param action_mask: Mask of legal actions: 1=legal, 0=illegal
:type action_mask: List
:param last_opp_move: Most recent action taken by agent against this opponent
:type last_opp_move: int
:param block_vert_coef: How many times more likely to block vertically
:type block_vert_coef: float
"""
if last_opp_move is not None:
action_mask[last_opp_move] *= block_vert_coef
action = random.choices(list(range(self.num_cols)), action_mask)[0]
return action
def weak_rule_based_opponent(self, player: int) -> int:
"""Takes move for weak rule-based opponent.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
self.update_top()
max_length = -1
best_actions = []
for action in range(self.num_cols):
possible, reward, ended, lengths = self.outcome(
action, player, return_length=True
)
if possible and lengths.sum() > max_length:
best_actions = []
max_length = lengths.sum()
if possible and lengths.sum() == max_length:
best_actions.append(action)
best_action = random.choice(best_actions)
return best_action
def strong_rule_based_opponent(self, player: int) -> int:
"""Takes move for strong rule-based opponent.
:param player: Player who we are checking, 0 or 1
:type player: int
"""
self.update_top()
winning_actions = []
for action in range(self.num_cols):
possible, reward, ended = self.outcome(action, player)
if possible and ended:
winning_actions.append(action)
if len(winning_actions) > 0:
winning_action = random.choice(winning_actions)
return winning_action
opp = 1 if player == 0 else 0
loss_avoiding_actions = []
for action in range(self.num_cols):
possible, reward, ended = self.outcome(action, opp)
if possible and ended:
loss_avoiding_actions.append(action)
if len(loss_avoiding_actions) > 0:
loss_avoiding_action = random.choice(loss_avoiding_actions)
return loss_avoiding_action
return self.weak_rule_based_opponent(player) # take best possible move
def outcome(
self, action: int, player: int, return_length: bool = False
) -> Tuple[bool, Optional[float], bool, Optional[np.ndarray]]:
"""Takes move for weak rule-based opponent.
:param action: Action to take in environment
:type action: int
:param player: Player who we are checking, 0 or 1
:type player: int
:param return_length: Return length of outcomes, defaults to False
:type return_length: bool, optional
"""
if not (self.top[action] < self.num_rows): # action column is full
return (False, None, None) + ((None,) if return_length else ())
row, col = self.top[action], action
piece = player + 1
# down, up, left, right, down-left, up-right, down-right, up-left,
directions = np.array(
[
[[-1, 0], [1, 0]],
[[0, -1], [0, 1]],
[[-1, -1], [1, 1]],
[[-1, 1], [1, -1]],
]
) # |4x2x2|
positions = np.array([row, col]).reshape(1, 1, 1, -1) + np.expand_dims(
directions, -2
) * np.arange(1, self.length).reshape(
1, 1, -1, 1
) # |4x2x3x2|
valid_positions = np.logical_and(
np.logical_and(
positions[:, :, :, 0] >= 0, positions[:, :, :, 0] < self.num_rows
),
np.logical_and(
positions[:, :, :, 1] >= 0, positions[:, :, :, 1] < self.num_cols
),
) # |4x2x3|
d0 = np.where(valid_positions, positions[:, :, :, 0], 0)
d1 = np.where(valid_positions, positions[:, :, :, 1], 0)
board = np.array(self.env.env.board).reshape(self.num_rows, self.num_cols)
board_values = np.where(valid_positions, board[d0, d1], 0)
a = (board_values == piece).astype(int)
b = np.concatenate(
(a, np.zeros_like(a[:, :, :1])), axis=-1
) # padding with zeros to compute length
lengths = np.argmin(b, -1)
ended = False
# check if winnable in any direction
for both_dir in board_values:
# |2x3|
line = np.concatenate((both_dir[0][::-1], [piece], both_dir[1]))
if "".join(map(str, [piece] * self.length)) in "".join(map(str, line)):
ended = True
break
# ended = np.any(np.greater_equal(np.sum(lengths, 1), self.length - 1))
draw = True
for c, v in enumerate(self.top):
draw &= (v == self.num_rows) if c != col else (v == (self.num_rows - 1))
ended |= draw
reward = (-1) ** (player) if ended and not draw else 0
return (True, reward, ended) + ((lengths,) if return_length else ())
def transform_and_flip(observation, player):
"""Transforms and flips observation for input to agent's neural network.
:param observation: Observation to preprocess
:type observation: dict[str, np.ndarray]
:param player: Player, 0 or 1
:type player: int
"""
state = observation["observation"]
# Pre-process dimensions for PyTorch (N, C, H, W)
state = obs_channels_to_first(state)
if player == 1:
# Swap pieces so that the agent always sees the board from the same perspective
state[[0, 1], :, :] = state[[1, 0], :, :]
state_flipped = np.expand_dims(np.flip(state, 2), 0)
state = np.expand_dims(state, 0)
return state, state_flipped
if __name__ == "__main__":
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("===== AgileRL Curriculum Learning Demo =====")
for lesson_number in range(1, 5):
# Load lesson for curriculum
with open(f"./curriculums/connect_four/lesson{lesson_number}.yaml") as file:
LESSON = yaml.safe_load(file)
# Define the network configuration
NET_CONFIG = {
"encoder_config": {
"channel_size": [128], # CNN channel size
"kernel_size": [4], # CNN kernel size
"stride_size": [1], # CNN stride size
},
"head_config": {
"hidden_size": [64, 64], # Actor head hidden size
},
}
# Define the initial hyperparameters
INIT_HP = {
"POPULATION_SIZE": 6,
# "ALGO": "Rainbow DQN", # Algorithm
"ALGO": "DQN", # Algorithm
"DOUBLE": True,
# Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
"BATCH_SIZE": 256, # Batch size
"LR": 1e-4, # Learning rate
"GAMMA": 0.99, # Discount factor
"MEMORY_SIZE": 10000, # Max memory buffer size
"LEARN_STEP": 1, # Learning frequency
"CUDAGRAPHS": False, # Use CUDA graphs
"N_STEP": 1, # Step number to calculate td error
"PER": False, # Use prioritized experience replay buffer
"ALPHA": 0.6, # Prioritized replay buffer parameter
"TAU": 0.01, # For soft update of target parameters
"BETA": 0.4, # Importance sampling coefficient
"PRIOR_EPS": 0.000001, # Minimum priority for sampling
"NUM_ATOMS": 51, # Unit number of support
"V_MIN": 0.0, # Minimum value of support
"V_MAX": 200.0, # Maximum value of support
}
# Define the connect four environment
env = connect_four_v3.env()
env.reset()
# Configure the algo input arguments
observation_spaces = [env.observation_space(agent) for agent in env.agents]
action_spaces = [env.action_space(agent) for agent in env.agents]
# Wrap the environment in the curriculum learning wrapper
env = CurriculumEnv(env, LESSON)
# Pre-process dimensions for PyTorch layers
# We only need to worry about the state dim of a single agent
# The 6x7x2 observation is converted to channels-first as input to the agent's neural network
observation_space = observation_space_channels_to_first(
observation_spaces[0]["observation"]
)
action_space = action_spaces[0]
# Mutation config for RL hyperparameters
hp_config = HyperparameterConfig(
lr=RLParameter(min=1e-4, max=1e-2),
batch_size=RLParameter(min=8, max=64, dtype=int),
learn_step=RLParameter(
min=1, max=120, dtype=int, grow_factor=1.5, shrink_factor=0.75
),
)
# Create a population ready for evolutionary hyper-parameter optimisation
pop: List[DQN] = create_population(
INIT_HP["ALGO"],
observation_space,
action_spaces[0],
NET_CONFIG,
INIT_HP,
hp_config,
population_size=INIT_HP["POPULATION_SIZE"],
device=device,
)
# Configure the replay buffer
memory = ReplayBuffer(
max_size=INIT_HP["MEMORY_SIZE"], # Max replay buffer size
device=device,
)
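        # A single replay buffer is shared across the whole population: every agent
        # adds its transitions here and samples batches from the same pool of experience.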
# Instantiate a tournament selection object (used for HPO)
tournament = TournamentSelection(
tournament_size=2, # Tournament selection size
elitism=True, # Elitism in tournament selection
population_size=INIT_HP["POPULATION_SIZE"], # Population size
eval_loop=1, # Evaluate using last N fitness scores
)
# Instantiate a mutations object (used for HPO)
mutations = Mutations(
no_mutation=0.2, # Probability of no mutation
architecture=0, # Probability of architecture mutation
new_layer_prob=0.2, # Probability of new layer mutation
parameters=0.2, # Probability of parameter mutation
activation=0, # Probability of activation function mutation
rl_hp=0.2, # Probability of RL hyperparameter mutation
mutation_sd=0.1, # Mutation strength
rand_seed=1,
device=device,
)
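        # Architecture and activation mutations are disabled (probability 0) above, so
        # evolution only applies no-mutation, network-parameter and RL-hyperparameter
        # mutations to the population.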
# Define training loop parameters
episodes_per_epoch = 10
max_episodes = LESSON["max_train_episodes"] # Total episodes
max_steps = 500 # Maximum steps to take in each episode
evo_epochs = 20 # Evolution frequency
evo_loop = 50 # Number of evaluation episodes
elite = pop[0] # Assign a placeholder "elite" agent
epsilon = 1.0 # Starting epsilon value
eps_end = 0.1 # Final epsilon value
        eps_decay = 0.9998 # Epsilon decay rate
opp_update_counter = 0
if LESSON["pretrained_path"] is not None:
for agent in pop:
# Load pretrained checkpoint
agent.load_checkpoint(LESSON["pretrained_path"])
# Reinit optimizer for new task
agent.lr = INIT_HP["LR"]
agent.optimizer = OptimizerWrapper(
torch.optim.Adam,
networks=agent.actor,
lr=agent.lr,
network_names=agent.optimizer.network_names,
lr_name=agent.optimizer.lr_name,
optimizer_kwargs={"capturable": agent.capturable},
)
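                # Note: the checkpoint restores the weights learned in the previous
                # lesson, while the optimizer is re-created with a fresh Adam state so
                # that optimizer statistics from the old lesson are not carried over.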
if LESSON["opponent"] == "self":
# Create initial pool of opponents
opponent_pool = deque(maxlen=LESSON["opponent_pool_size"])
for _ in range(LESSON["opponent_pool_size"]):
opp = copy.deepcopy(pop[0])
opp.actor.load_state_dict(pop[0].actor.state_dict())
opp.actor.eval()
opponent_pool.append(opp)
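            # The pool holds frozen copies of the current agent; an opponent is sampled
            # from it at the start of each self-play training episode, and the pool is
            # refreshed with the population elite further down in the training loop.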
# Perform buffer and agent warmups if desired
if LESSON["buffer_warm_up"]:
warm_up_opponent = Opponent(env, difficulty=LESSON["warm_up_opponent"])
memory = env.fill_replay_buffer(
memory, warm_up_opponent
) # Fill replay buffer with transitions
if LESSON["agent_warm_up"] > 0:
print("Warming up agents ...")
agent = pop[0]
# Train on randomly collected samples
for epoch in trange(LESSON["agent_warm_up"]):
experiences = memory.sample(agent.batch_size)
agent.learn(experiences)
pop = [agent.clone() for _ in pop]
elite = agent
print("Agent population warmed up.")
total_steps = 0
total_episodes = 0
pbar = trange(int(max_episodes / episodes_per_epoch))
# Training loop
for idx_epi in pbar:
turns_per_episode = []
train_actions_hist = [0] * action_spaces[0].n
for agent in pop: # Loop through population
for episode in range(episodes_per_epoch):
env.reset() # Reset environment at start of episode
observation, cumulative_reward, done, truncation, _ = env.last()
(
p1_state,
p1_state_flipped,
p1_action,
p1_next_state,
p1_next_state_flipped,
) = (None, None, None, None, None)
if LESSON["opponent"] == "self":
# Randomly choose opponent from opponent pool if using self-play
opponent = random.choice(opponent_pool)
else:
# Create opponent of desired difficulty
opponent = Opponent(env, difficulty=LESSON["opponent"])
# Randomly decide whether agent will go first or second
opponent_first = random.random() > 0.5
score = 0
turns = 0 # Number of turns counter
for idx_step in range(max_steps):
# Player 0"s turn
p0_action_mask = observation["action_mask"]
p0_state, p0_state_flipped = transform_and_flip(
observation, player=0
)
if opponent_first:
if LESSON["opponent"] == "self":
p0_action = opponent.get_action(
p0_state, 0, p0_action_mask
)[0]
elif LESSON["opponent"] == "random":
p0_action = opponent.get_action(
p0_action_mask, p1_action, LESSON["block_vert_coef"]
)
else:
p0_action = opponent.get_action(player=0)
else:
p0_action = agent.get_action(
p0_state, epsilon, p0_action_mask
)[
0
] # Get next action from agent
train_actions_hist[p0_action] += 1
env.step(p0_action) # Act in environment
observation, cumulative_reward, done, truncation, _ = env.last()
p0_next_state, p0_next_state_flipped = transform_and_flip(
observation, player=0
)
if not opponent_first:
score = cumulative_reward
turns += 1
# Check if game is over (Player 0 win)
if done or truncation:
reward = env.reward(done=True, player=0)
transition = Transition(
obs=np.concatenate(
(
p0_state,
p1_state,
p0_state_flipped,
p1_state_flipped,
)
),
action=np.array(
[p0_action, p1_action, 6 - p0_action, 6 - p1_action]
),
reward=np.array(
[
reward,
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
]
),
next_obs=np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
done=np.array([done, done, done, done]),
batch_size=[4],
)
memory.add(transition.to_tensordict())
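                            # Each terminal step is stored as four transitions: the boards
                            # seen by player 0 and player 1 plus their horizontal mirrors.
                            # Mirroring the 7-column board maps column c to column 6 - c,
                            # which is why the flipped actions are stored as 6 - action.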
else: # Play continues
if p1_state is not None:
reward = env.reward(done=False, player=1)
transition = Transition(
obs=np.concatenate((p1_state, p1_state_flipped)),
action=np.array([p1_action, 6 - p1_action]),
reward=np.array([reward, reward]),
next_obs=np.concatenate(
(p1_next_state, p1_next_state_flipped)
),
done=np.array([done, done]),
batch_size=[2],
)
memory.add(transition.to_tensordict())
# Player 1"s turn
p1_action_mask = observation["action_mask"]
p1_state, p1_state_flipped = transform_and_flip(
observation, player=1
)
if not opponent_first:
if LESSON["opponent"] == "self":
p1_action = opponent.get_action(
p1_state, 0, p1_action_mask
)[0]
elif LESSON["opponent"] == "random":
p1_action = opponent.get_action(
p1_action_mask,
p0_action,
LESSON["block_vert_coef"],
)
else:
p1_action = opponent.get_action(player=1)
else:
p1_action = agent.get_action(
p1_state, epsilon, p1_action_mask
)[
0
] # Get next action from agent
train_actions_hist[p1_action] += 1
env.step(p1_action) # Act in environment
(
observation,
cumulative_reward,
done,
truncation,
_,
) = env.last()
p1_next_state, p1_next_state_flipped = transform_and_flip(
observation, player=1
)
if opponent_first:
score = cumulative_reward
turns += 1
# Check if game is over (Player 1 win)
if done or truncation:
reward = env.reward(done=True, player=1)
transition = Transition(
obs=np.concatenate(
(
p0_state,
p1_state,
p0_state_flipped,
p1_state_flipped,
)
),
action=np.array(
[
p0_action,
p1_action,
6 - p0_action,
6 - p1_action,
]
),
reward=np.array(
[
reward,
LESSON["rewards"]["lose"],
reward,
LESSON["rewards"]["lose"],
]
),
next_obs=np.concatenate(
(
p0_next_state,
p1_next_state,
p0_next_state_flipped,
p1_next_state_flipped,
)
),
done=np.array([done, done, done, done]),
batch_size=[4],
)
memory.add(transition.to_tensordict())
else: # Play continues
reward = env.reward(done=False, player=0)
transition = Transition(
obs=np.concatenate((p0_state, p0_state_flipped)),
action=np.array([p0_action, 6 - p0_action]),
reward=np.array([reward, reward]),
next_obs=np.concatenate(
(p0_next_state, p0_next_state_flipped)
),
done=np.array([done, done]),
batch_size=[2],
)
memory.add(transition.to_tensordict())
# Learn according to learning frequency
if (memory.counter % agent.learn_step == 0) and (
len(memory) >= agent.batch_size
):
# Sample replay buffer
                            # Learn according to agent's RL algorithm
experiences = memory.sample(agent.batch_size)
agent.learn(experiences)
# Stop episode if any agents have terminated
if done or truncation:
break
total_steps += idx_step + 1
total_episodes += 1
turns_per_episode.append(turns)
# Save the total episode reward
agent.scores.append(score)
if LESSON["opponent"] == "self":
if (total_episodes % LESSON["opponent_upgrade"] == 0) and (
(idx_epi + 1) > evo_epochs
):
elite_opp, _, _ = tournament._elitism(pop)
elite_opp.actor.eval()
opponent_pool.append(elite_opp)
opp_update_counter += 1
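                            # Because the pool is a fixed-size deque, appending the current
                            # elite drops the oldest snapshot, so self-play opponents always
                            # track recent versions of the agent.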
# Update epsilon for exploration
epsilon = max(eps_end, epsilon * eps_decay)
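                # Epsilon is decayed multiplicatively once per agent per epoch, so
                # exploration anneals slowly from 1.0 towards the eps_end floor of 0.1
                # over the course of a lesson.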
mean_turns = np.mean(turns_per_episode)
# Now evolve population if necessary
if (idx_epi + 1) % evo_epochs == 0:
# Evaluate population vs random actions
fitnesses = []
win_rates = []
eval_actions_hist = [0] * action_spaces[0].n # Eval actions histogram
eval_turns = 0 # Eval turns counter
for agent in pop:
with torch.no_grad():
rewards = []
for i in range(evo_loop):
env.reset() # Reset environment at start of episode
(
observation,
cumulative_reward,
done,
truncation,
_,
) = env.last()
                            player = -1 # Tracker for which player's turn it is
# Create opponent of desired difficulty
opponent = Opponent(env, difficulty=LESSON["eval_opponent"])
# Randomly decide whether agent will go first or second
                            opponent_first = random.random() < 0.5
score = 0
for idx_step in range(max_steps):
action_mask = observation["action_mask"]
if player < 0:
if opponent_first:
if LESSON["eval_opponent"] == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=0)
else:
state = np.moveaxis(
observation["observation"], [-1], [-3]
)
state = np.expand_dims(state, 0)
action = agent.get_action(
state, 0, action_mask
)[
0
] # Get next action from agent
eval_actions_hist[action] += 1
if player > 0:
if not opponent_first:
if LESSON["eval_opponent"] == "random":
action = opponent.get_action(action_mask)
else:
action = opponent.get_action(player=1)
else:
state = np.moveaxis(
observation["observation"], [-1], [-3]
)
state[[0, 1], :, :] = state[[1, 0], :, :]
state = np.expand_dims(state, 0)
action = agent.get_action(
state, 0, action_mask
)[
0
] # Get next action from agent
eval_actions_hist[action] += 1
env.step(action) # Act in environment
(
observation,
cumulative_reward,
done,
truncation,
_,
) = env.last()
if (player > 0 and opponent_first) or (
player < 0 and not opponent_first
):
score = cumulative_reward
eval_turns += 1
if done or truncation:
break
player *= -1
rewards.append(score)
mean_fit = np.mean(rewards)
agent.fitness.append(mean_fit)
fitnesses.append(mean_fit)
eval_turns = eval_turns / len(pop) / evo_loop
pbar.set_postfix_str(
f"Train Mean Score: {np.mean(agent.scores[-episodes_per_epoch:])} "
f"Train Mean Turns: {mean_turns} "
f"Eval Mean Fitness: {np.mean(fitnesses)} "
f"Eval Best Fitness: {np.max(fitnesses)} "
f"Eval Mean Turns: {eval_turns} "
f"Total Steps: {total_steps}"
)
pbar.update(0)
# Tournament selection and population mutation
elite, pop = tournament.select(pop)
pop = mutations.mutation(pop)
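                # Tournament selection picks the parents of the next generation (keeping
                # an elite copy of the best agent), and the mutation probabilities defined
                # above are then applied to produce the new population.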
# Save the trained agent
save_path = LESSON["save_path"]
os.makedirs(os.path.dirname(save_path), exist_ok=True)
elite.save_checkpoint(save_path)
print(f"Elite agent saved to '{save_path}'.")