AgileRL:实现 MADDPG

本教程展示了如何在《太空侵略者》(Space Invaders) Atari 环境中训练 MADDPG 智能体。

什么是 MADDPG?

MADDPG (Multi-Agent Deep Deterministic Policy Gradients,多智能体深度确定性策略梯度) 扩展了 DDPG (Deep Deterministic Policy Gradients) 算法,使多个智能体能够在复杂环境中进行合作式或竞争式训练;它采用“分散式执行者 (actor)、集中式评论者 (critic)”的架构,从而提高学习过程的稳定性和收敛性。有关 MADDPG 的更多信息,请查阅 AgileRL 文档。

我可以使用它吗?

|      | 动作空间 | 观察空间 |
|------|----------|----------|
| 离散 | ✔️       | ✔️       |
| 连续 | ✔️       | ✔️       |

环境设置

要遵循本教程,你需要安装以下依赖项。建议使用新创建的虚拟环境以避免依赖冲突。

agilerl==2.2.1; python_version >= '3.10' and python_version < '3.12'
pettingzoo[classic,atari,mpe]>=1.23.1
AutoROM>=0.6.1
SuperSuit>=3.9.0
torch>=2.0.1
numpy>=1.24.2
tqdm>=4.65.0
fastrand==1.3.0
gymnasium>=0.28.1
imageio>=2.31.1
Pillow>=9.5.0
PyYAML>=5.4.1

代码

使用 MADDPG 训练多个智能体

以下代码应该可以顺利运行。注释旨在帮助你理解如何在 PettingZoo 中使用 AgileRL。如果你有任何问题,请随时在 Discord 服务器中提问。

"""This tutorial shows how to train an MADDPG agent on the space invaders atari environment.

Authors: Michael (https://github.com/mikepratt1), Nick (https://github.com/nicku-a), Jaime (https://github.com/jaimesabalbermudez)
"""

import os
from copy import deepcopy

import numpy as np
import supersuit as ss
import torch
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.components.multi_agent_replay_buffer import MultiAgentReplayBuffer
from agilerl.utils.algo_utils import obs_channels_to_first
from agilerl.utils.utils import create_population, observation_space_channels_to_first
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv
from tqdm import trange

from pettingzoo.atari import space_invaders_v2

if __name__ == "__main__":
    # Script entry point: build a vectorised Space Invaders environment, create a
    # single MADDPG agent, train it from a multi-agent replay buffer, and save a
    # checkpoint to ./models/MADDPG/MADDPG_trained_agent.pt.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Define the network configuration
    NET_CONFIG = {
        "encoder_config": {
            "channel_size": [32, 32],  # CNN channel size
            "kernel_size": [1, 1],  # CNN kernel size
            "stride_size": [2, 2],  # CNN stride size
        },
        "head_config": {"hidden_size": [32, 32]},  # Actor head hidden size
    }

    # Define the initial hyperparameters
    INIT_HP = {
        "POPULATION_SIZE": 1,
        "ALGO": "MADDPG",  # Algorithm
        # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
        "CHANNELS_LAST": True,
        "BATCH_SIZE": 32,  # Batch size
        "O_U_NOISE": True,  # Ornstein Uhlenbeck action noise
        "EXPL_NOISE": 0.1,  # Action noise scale
        "MEAN_NOISE": 0.0,  # Mean action noise
        "THETA": 0.15,  # Rate of mean reversion in OU noise
        "DT": 0.01,  # Timestep for OU noise
        "LR_ACTOR": 0.001,  # Actor learning rate
        "LR_CRITIC": 0.001,  # Critic learning rate
        "GAMMA": 0.95,  # Discount factor
        "MEMORY_SIZE": 100000,  # Max memory buffer size
        "LEARN_STEP": 100,  # Learning frequency
        "TAU": 0.01,  # For soft update of target parameters
    }

    num_envs = 8
    # Define the space invaders environment as a parallel environment
    env = space_invaders_v2.parallel_env()

    # Environment processing for image based observations
    env = ss.frame_skip_v0(env, 4)
    env = ss.clip_reward_v0(env, lower_bound=-1, upper_bound=1)
    env = ss.color_reduction_v0(env, mode="B")
    env = ss.resize_v1(env, x_size=84, y_size=84)
    env = ss.frame_stack_v1(env, 4)
    env = AsyncPettingZooVecEnv([lambda: env for _ in range(num_envs)])

    env.reset()

    # Configure the multi-agent algo input arguments
    observation_spaces = [env.single_observation_space(agent) for agent in env.agents]
    action_spaces = [env.single_action_space(agent) for agent in env.agents]
    if INIT_HP["CHANNELS_LAST"]:
        observation_spaces = [
            observation_space_channels_to_first(obs) for obs in observation_spaces
        ]

    # Append the agent IDs to the initial hyperparameter dictionary
    INIT_HP["AGENT_IDS"] = env.agents

    # Mutation config for RL hyperparameters
    hp_config = HyperparameterConfig(
        lr_actor=RLParameter(min=1e-4, max=1e-2),
        lr_critic=RLParameter(min=1e-4, max=1e-2),
        batch_size=RLParameter(min=8, max=512, dtype=int),
        learn_step=RLParameter(
            min=20, max=200, dtype=int, grow_factor=1.5, shrink_factor=0.75
        ),
    )

    # Create a population ready for evolutionary hyper-parameter optimisation.
    # Only the first (and, with POPULATION_SIZE == 1, only) member is trained here.
    agent = create_population(
        INIT_HP["ALGO"],
        observation_spaces,
        action_spaces,
        NET_CONFIG,
        INIT_HP,
        hp_config,
        population_size=INIT_HP["POPULATION_SIZE"],
        num_envs=num_envs,
        device=device,
    )[0]

    # Configure the multi-agent replay buffer
    field_names = ["state", "action", "reward", "next_state", "done"]
    memory = MultiAgentReplayBuffer(
        INIT_HP["MEMORY_SIZE"],
        field_names=field_names,
        agent_ids=INIT_HP["AGENT_IDS"],
        device=device,
    )

    # Define training loop parameters
    agent_ids = deepcopy(env.agents)
    max_steps = 20000  # Max steps (default: 2000000)
    learning_delay = 500  # Steps before starting learning
    training_steps = 10000  # Frequency at which we evaluate training score
    eval_steps = None  # Evaluation steps per episode - go until done
    eval_loop = 1  # Number of evaluation episodes

    total_steps = 0

    # TRAINING LOOP
    print("Training...")
    pbar = trange(max_steps, unit="step")
    while np.less(agent.steps[-1], max_steps):
        state, info = env.reset()  # Reset environment at start of episode
        scores = np.zeros((num_envs, len(agent_ids)))
        completed_episode_scores = []
        steps = 0
        if INIT_HP["CHANNELS_LAST"]:
            state = {
                agent_id: obs_channels_to_first(s) for agent_id, s in state.items()
            }

        for idx_step in range(training_steps // num_envs):
            # Get next action from agent
            cont_actions, discrete_action = agent.get_action(
                obs=state, training=True, infos=info
            )
            if agent.discrete_actions:
                action = discrete_action
            else:
                action = cont_actions

            # Act in environment using the actions chosen by the agent, so that
            # the transition stored below corresponds to the action actually taken
            next_state, reward, termination, truncation, info = env.step(action)
            scores += np.array(list(reward.values())).transpose()
            total_steps += num_envs
            steps += num_envs

            # Image processing if necessary for the environment
            if INIT_HP["CHANNELS_LAST"]:
                next_state = {
                    agent_id: obs_channels_to_first(ns)
                    for agent_id, ns in next_state.items()
                }

            # Save experiences to replay buffer
            memory.save_to_memory(
                state,
                cont_actions,
                reward,
                next_state,
                termination,
                is_vectorised=True,
            )

            # Learn according to learning frequency
            # Handle learn steps > num_envs
            if agent.learn_step > num_envs:
                learn_step = agent.learn_step // num_envs
                if (
                    idx_step % learn_step == 0
                    and len(memory) >= agent.batch_size
                    and memory.counter > learning_delay
                ):
                    # Sample replay buffer
                    experiences = memory.sample(agent.batch_size)
                    # Learn according to agent's RL algorithm
                    agent.learn(experiences)
            # Handle num_envs > learn step; learn multiple times per step in env
            elif len(memory) >= agent.batch_size and memory.counter > learning_delay:
                for _ in range(num_envs // agent.learn_step):
                    # Sample replay buffer
                    experiences = memory.sample(agent.batch_size)
                    # Learn according to agent's RL algorithm
                    agent.learn(experiences)

            state = next_state

            # Calculate scores and reset noise for finished episodes
            reset_noise_indices = []
            term_array = np.array(list(termination.values())).transpose()
            trunc_array = np.array(list(truncation.values())).transpose()
            for idx, (d, t) in enumerate(zip(term_array, trunc_array)):
                if np.any(d) or np.any(t):
                    completed_episode_scores.append(scores[idx])
                    agent.scores.append(scores[idx])
                    scores[idx] = 0
                    reset_noise_indices.append(idx)
            agent.reset_action_noise(reset_noise_indices)

        # Advance the bar by the steps actually taken in this window
        pbar.update(steps)

        agent.steps[-1] += steps

        # Evaluate population
        fitness = agent.test(
            env,
            swap_channels=INIT_HP["CHANNELS_LAST"],
            max_steps=eval_steps,
            loop=eval_loop,
            sum_scores=False,
        )

        print(f"--- Global steps {total_steps} ---")
        print(f"Steps {agent.steps[-1]}")
        print("Scores:")
        if completed_episode_scores:
            # Per-agent mean over every episode that finished in this window
            mean_scores = np.mean(np.array(completed_episode_scores), axis=0)
            for idx, sub_agent in enumerate(agent_ids):
                print(f"    {sub_agent} score: {mean_scores[idx]}")
        else:
            # No episode finished in this window, so there is nothing to average
            print("    0 completed episodes")
        print("Fitness")
        for idx, sub_agent in enumerate(agent_ids):
            print(f"    {sub_agent} fitness: {fitness[idx]}")
        print("Previous 5 fitness avgs")
        for idx, sub_agent in enumerate(agent_ids):
            print(
                f"  {sub_agent} fitness average: {np.mean(agent.fitness[-5:], axis=0)[idx]}"
            )

        # Update step counter
        agent.steps.append(agent.steps[-1])

    # Save the trained algorithm
    path = "./models/MADDPG"
    filename = "MADDPG_trained_agent.pt"
    os.makedirs(path, exist_ok=True)
    save_path = os.path.join(path, filename)
    agent.save_checkpoint(save_path)

    pbar.close()
    env.close()

观看训练好的智能体进行游戏

以下代码允许你加载上一个训练代码块保存的 MADDPG 算法,测试其性能,然后将若干回合 (episode) 的游戏过程可视化为 GIF。

import os

import imageio
import numpy as np
import supersuit as ss
import torch
from agilerl.algorithms.maddpg import MADDPG
from agilerl.utils.algo_utils import obs_channels_to_first
from agilerl.utils.utils import observation_space_channels_to_first
from PIL import Image, ImageDraw

from pettingzoo.atari import space_invaders_v2


# Define function to return image
def _label_with_episode_number(frame, episode_num):
    """Return *frame* as a PIL image captioned with its 1-based episode number.

    Args:
        frame: Image data accepted by ``Image.fromarray``.
        episode_num: Zero-based episode index; the caption shows ``episode_num + 1``.

    Returns:
        The PIL image built from *frame* with "Episode: N" drawn near its
        top-left corner.
    """
    image = Image.fromarray(frame)

    # White text on dark frames, black text on bright ones, to keep it legible
    caption_colour = (255, 255, 255) if np.mean(frame) < 128 else (0, 0, 0)

    width, height = image.size[0], image.size[1]
    ImageDraw.Draw(image).text(
        (width / 20, height / 18),
        f"Episode: {episode_num+1}",
        fill=caption_colour,
    )

    return image


if __name__ == "__main__":
    # Script entry point: load the MADDPG checkpoint written by the training
    # script, play a number of episodes with it, print the rewards, and save
    # the rendered frames as ./videos/space_invaders.gif.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Configure the environment
    env = space_invaders_v2.parallel_env(render_mode="rgb_array")
    channels_last = True  # Needed for environments that use images as observations
    if channels_last:
        # Environment processing for image based observations
        env = ss.frame_skip_v0(env, 4)
        env = ss.clip_reward_v0(env, lower_bound=-1, upper_bound=1)
        env = ss.color_reduction_v0(env, mode="B")
        env = ss.resize_v1(env, x_size=84, y_size=84)
        env = ss.frame_stack_v1(env, 4)

    env.reset()

    observation_spaces = [env.observation_space(agent) for agent in env.agents]
    action_spaces = [env.action_space(agent) for agent in env.agents]

    # Pre-process image dimensions for pytorch convolutional layers
    if channels_last:
        observation_spaces = [
            observation_space_channels_to_first(space) for space in observation_spaces
        ]

    # Record the number of agents and the agent IDs
    n_agents = env.num_agents
    agent_ids = env.agents

    # Instantiate an MADDPG object
    maddpg = MADDPG(
        observation_spaces=observation_spaces,
        action_spaces=action_spaces,
        agent_ids=agent_ids,
        device=device,
    )

    # Load the saved algorithm into the MADDPG object
    path = "./models/MADDPG/MADDPG_trained_agent.pt"
    maddpg.load_checkpoint(path)

    # Define test loop parameters
    episodes = 10  # Number of episodes to test agent on
    max_steps = 500  # Max number of steps to take in the environment in each episode

    rewards = []  # List to collect total episodic reward
    frames = []  # List to collect frames
    indi_agent_rewards = {
        agent_id: [] for agent_id in agent_ids
    }  # Dictionary to collect individual agent rewards

    # Test loop for inference
    for ep in range(episodes):
        state, info = env.reset()
        agent_reward = {agent_id: 0 for agent_id in agent_ids}
        for _ in range(max_steps):
            if channels_last:
                state = {
                    agent_id: obs_channels_to_first(s) for agent_id, s in state.items()
                }

            # Get next action from agent; the env info is forwarded the same way
            # the training script does it
            cont_actions, discrete_action = maddpg.get_action(
                state, training=False, infos=info
            )

            if maddpg.discrete_actions:
                action = discrete_action
            else:
                action = cont_actions

            # Save the frame for this step and append to frames list
            frame = env.render()
            frames.append(_label_with_episode_number(frame, episode_num=ep))

            # Take action in environment; a[0] takes the first entry of each
            # agent's action, as this environment is not vectorised
            action = {agent_id: a[0] for agent_id, a in action.items()}
            state, reward, termination, truncation, info = env.step(action)

            # Save agent's reward for this step in this episode
            for agent_id, r in reward.items():
                agent_reward[agent_id] += r

            # Stop episode if any agents have terminated
            if any(truncation.values()) or any(termination.values()):
                break

        # Total score for the episode is the sum of every agent's reward
        score = sum(agent_reward.values())
        rewards.append(score)

        # Record agent specific episodic reward for each agent
        for agent_id in agent_ids:
            indi_agent_rewards[agent_id].append(agent_reward[agent_id])

        print("-" * 15, f"Episode: {ep}", "-" * 15)
        print("Episodic Reward: ", rewards[-1])
        for agent_id, reward_list in indi_agent_rewards.items():
            print(f"{agent_id} reward: {reward_list[-1]}")

    env.close()

    # Save the gif to specified path
    gif_path = "./videos/"
    os.makedirs(gif_path, exist_ok=True)
    imageio.mimwrite(
        os.path.join(gif_path, "space_invaders.gif"), frames, duration=10
    )