Tutorial: Environment Logic

Introduction

Now that we have a basic understanding of the structure of an environment repository, we can start thinking about the fun part: the environment logic!

In this tutorial, we will create a two-player game featuring a prisoner trying to escape and a guard trying to catch the prisoner. The game is played on a 7x7 grid, where:

- the prisoner starts in the top-left corner,
- the guard starts in the bottom-right corner,
- the escape door is placed randomly in the middle of the grid, and
- both the prisoner and the guard can move in any of the four cardinal directions (up, down, left, right).
Code

/custom-environment/env/custom_environment.py
import functools
import random
from copy import copy

import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete

from pettingzoo import ParallelEnv
class CustomEnvironment(ParallelEnv):
    """The metadata holds environment constants.

    The "name" metadata allows the environment to be pretty printed.
    """

    metadata = {
        "name": "custom_environment_v0",
    }
    def __init__(self):
        """The init method takes in environment arguments.

        Should define the following attributes:
        - escape x and y coordinates
        - guard x and y coordinates
        - prisoner x and y coordinates
        - timestep
        - possible_agents

        Note: as of v1.18.1, the action_spaces and observation_spaces attributes are deprecated.
        Spaces should be defined in the action_space() and observation_space() methods.
        If these methods are not overridden, spaces will be inferred from self.observation_spaces/action_spaces, raising a warning.

        These attributes should not be changed after initialization.
        """
        self.escape_y = None
        self.escape_x = None
        self.guard_y = None
        self.guard_x = None
        self.prisoner_y = None
        self.prisoner_x = None
        self.timestep = None
        self.possible_agents = ["prisoner", "guard"]
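        # `possible_agents` is the fixed roster of all agents the environment
        # could ever have; the live `agents` list is rebuilt from it in
        # reset() and emptied by step() once the episode ends.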
    def reset(self, seed=None, options=None):
        """Reset sets the environment to a starting point.

        It needs to initialize the following attributes:
        - agents
        - timestep
        - prisoner x and y coordinates
        - guard x and y coordinates
        - escape x and y coordinates
        - observations
        - infos

        And must set up the environment so that render(), step(), and observe() can be called without issues.
        """
        self.agents = copy(self.possible_agents)
        self.timestep = 0

        self.prisoner_x = 0
        self.prisoner_y = 0

        self.guard_x = 6
        self.guard_y = 6

        self.escape_x = random.randint(2, 5)
        self.escape_y = random.randint(2, 5)
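        # Each (x, y) position is flattened into a single index, x + 7 * y,
        # so every entry of an observation is an integer in [0, 48].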
        observations = {
            a: (
                self.prisoner_x + 7 * self.prisoner_y,
                self.guard_x + 7 * self.guard_y,
                self.escape_x + 7 * self.escape_y,
            )
            for a in self.agents
        }

        # Get dummy infos. Necessary for proper parallel_to_aec conversion
        infos = {a: {} for a in self.agents}

        return observations, infos
    def step(self, actions):
        """Takes in an action for every agent, passed as a dict keyed by agent name.

        Needs to update:
        - prisoner x and y coordinates
        - guard x and y coordinates
        - terminations
        - truncations
        - rewards
        - timestep
        - infos

        And any internal state used by observe() or render()
        """
        # Execute actions
        prisoner_action = actions["prisoner"]
        guard_action = actions["guard"]
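        # Action encoding: 0 = left (x - 1), 1 = right (x + 1),
        # 2 = up (y - 1), 3 = down (y + 1); moves off the grid are ignored.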
        if prisoner_action == 0 and self.prisoner_x > 0:
            self.prisoner_x -= 1
        elif prisoner_action == 1 and self.prisoner_x < 6:
            self.prisoner_x += 1
        elif prisoner_action == 2 and self.prisoner_y > 0:
            self.prisoner_y -= 1
        elif prisoner_action == 3 and self.prisoner_y < 6:
            self.prisoner_y += 1

        if guard_action == 0 and self.guard_x > 0:
            self.guard_x -= 1
        elif guard_action == 1 and self.guard_x < 6:
            self.guard_x += 1
        elif guard_action == 2 and self.guard_y > 0:
            self.guard_y -= 1
        elif guard_action == 3 and self.guard_y < 6:
            self.guard_y += 1
        # Check termination conditions
        terminations = {a: False for a in self.agents}
        rewards = {a: 0 for a in self.agents}
        if self.prisoner_x == self.guard_x and self.prisoner_y == self.guard_y:
            rewards = {"prisoner": -1, "guard": 1}
            terminations = {a: True for a in self.agents}
        elif self.prisoner_x == self.escape_x and self.prisoner_y == self.escape_y:
            rewards = {"prisoner": 1, "guard": -1}
            terminations = {a: True for a in self.agents}

        # Check truncation conditions (overwrites termination conditions)
        truncations = {a: False for a in self.agents}
        if self.timestep > 100:
            rewards = {"prisoner": 0, "guard": 0}
            truncations = {"prisoner": True, "guard": True}
        self.timestep += 1
        # Get observations
        observations = {
            a: (
                self.prisoner_x + 7 * self.prisoner_y,
                self.guard_x + 7 * self.guard_y,
                self.escape_x + 7 * self.escape_y,
            )
            for a in self.agents
        }

        # Get dummy infos (not used in this example)
        infos = {a: {} for a in self.agents}

        if any(terminations.values()) or all(truncations.values()):
            self.agents = []

        return observations, rewards, terminations, truncations, infos
    def render(self):
        """Renders the environment."""
        grid = np.full((7, 7), " ")
        grid[self.prisoner_y, self.prisoner_x] = "P"
        grid[self.guard_y, self.guard_x] = "G"
        grid[self.escape_y, self.escape_x] = "E"
        print(f"{grid} \n")
    # Observation space should be defined here.
    # lru_cache allows observation and action spaces to be memoized, reducing clock cycles required to get each agent's space.
    # If your spaces change over time, remove this line (disable caching).
    @functools.lru_cache(maxsize=None)
    def observation_space(self, agent):
        # gymnasium spaces are defined and documented here: https://gymnasium.org.cn/api/spaces/
        return MultiDiscrete([7 * 7] * 3)
    # Action space should be defined here.
    # If your spaces change over time, remove this line (disable caching).
    @functools.lru_cache(maxsize=None)
    def action_space(self, agent):
        return Discrete(4)
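With the class complete, a quick random rollout makes a handy smoke test. The snippet below is a minimal sketch, not part of the tutorial file; it assumes CustomEnvironment is importable from the module above, and it samples a random action for every live agent each step until the episode terminates or truncates:

from custom_environment import CustomEnvironment

env = CustomEnvironment()
observations, infos = env.reset()

# step() empties env.agents when the episode is over, which ends the loop.
while env.agents:
    actions = {agent: env.action_space(agent).sample() for agent in env.agents}
    observations, rewards, terminations, truncations, infos = env.step(actions)
    env.render()

For a fuller conformance check, PettingZoo also provides pettingzoo.test.parallel_api_test, which exercises the whole parallel API against an environment instance.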