#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import gym
import numpy as np
from enum import IntEnum
import scipy.ndimage
from rl_coach.graph_managers.graph_manager import ScheduleParameters
from rl_coach.utils import lower_under_to_upper, short_dynamic_import
try:
import roboschool
from OpenGL import GL
except ImportError:
from rl_coach.logger import failed_imports
failed_imports.append("RoboSchool")
try:
from gym_extensions.continuous import mujoco
except Exception:
from rl_coach.logger import failed_imports
failed_imports.append("GymExtensions")
try:
import pybullet_envs
except ImportError:
from rl_coach.logger import failed_imports
failed_imports.append("PyBullet")
from typing import Dict, Any, Union
from rl_coach.core_types import RunPhase, EnvironmentSteps
from rl_coach.environments.environment import Environment, EnvironmentParameters, LevelSelection
from rl_coach.spaces import DiscreteActionSpace, BoxActionSpace, ImageObservationSpace, VectorObservationSpace, \
PlanarMapsObservationSpace, TensorObservationSpace, StateSpace, RewardSpace
from rl_coach.filters.filter import NoInputFilter, NoOutputFilter
from rl_coach.filters.reward.reward_clipping_filter import RewardClippingFilter
from rl_coach.filters.observation.observation_rescale_to_size_filter import ObservationRescaleToSizeFilter
from rl_coach.filters.observation.observation_stacking_filter import ObservationStackingFilter
from rl_coach.filters.observation.observation_rgb_to_y_filter import ObservationRGBToYFilter
from rl_coach.filters.observation.observation_to_uint8_filter import ObservationToUInt8Filter
from rl_coach.filters.filter import InputFilter
import random
from rl_coach.base_parameters import VisualizationParameters
from rl_coach.logger import screen
# Parameters
class GymEnvironmentParameters(EnvironmentParameters):
def __init__(self, level=None):
super().__init__(level=level)
self.random_initialization_steps = 0
self.max_over_num_frames = 1
self.additional_simulator_parameters = {}
self.observation_space_type = None
@property
def path(self):
return 'rl_coach.environments.gym_environment:GymEnvironment'
# Generic parameters for vector environments such as mujoco, roboschool, bullet, etc.
class GymVectorEnvironment(GymEnvironmentParameters):
def __init__(self, level=None):
super().__init__(level=level)
self.frame_skip = 1
self.default_input_filter = NoInputFilter()
self.default_output_filter = NoOutputFilter()
# Roboschool
gym_roboschool_envs = ['inverted_pendulum', 'inverted_pendulum_swingup', 'inverted_double_pendulum', 'reacher',
'hopper', 'walker2d', 'half_cheetah', 'ant', 'humanoid', 'humanoid_flagrun',
'humanoid_flagrun_harder', 'pong']
roboschool_v1 = {e: "Roboschool{}".format(lower_under_to_upper(e) + '-v1') for e in gym_roboschool_envs}
# Mujoco
gym_mujoco_envs = ['inverted_pendulum', 'inverted_double_pendulum', 'reacher', 'hopper', 'walker2d', 'half_cheetah',
'ant', 'swimmer', 'humanoid', 'humanoid_standup', 'pusher', 'thrower', 'striker']
mujoco_v2 = {e: "{}".format(lower_under_to_upper(e) + '-v2') for e in gym_mujoco_envs}
mujoco_v2['walker2d'] = 'Walker2d-v2'
# Fetch
gym_fetch_envs = ['reach', 'slide', 'push', 'pick_and_place']
fetch_v1 = {e: "{}".format('Fetch' + lower_under_to_upper(e) + '-v1') for e in gym_fetch_envs}
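# The dictionaries above map short level names to the registered Gym ids. A minimal sketch of how
# they are typically used from a preset (the surrounding agent/preset wiring is assumed, not shown;
# 'my_package.my_module:MyEnvClass' is a hypothetical custom environment path):
#
#     env_params = GymVectorEnvironment(level=mujoco_v2['hopper'])    # resolves to 'Hopper-v2'
#     env_params = GymVectorEnvironment(level=fetch_v1['reach'])      # resolves to 'FetchReach-v1'
#     env_params = GymVectorEnvironment(level='my_package.my_module:MyEnvClass')  # custom env, see GymEnvironment below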
"""
Atari Environment Components
"""
AtariInputFilter = InputFilter(is_a_reference_filter=True)
AtariInputFilter.add_reward_filter('clipping', RewardClippingFilter(-1.0, 1.0))
AtariInputFilter.add_observation_filter('observation', 'rescaling',
ObservationRescaleToSizeFilter(ImageObservationSpace(np.array([84, 84, 3]),
high=255)))
AtariInputFilter.add_observation_filter('observation', 'to_grayscale', ObservationRGBToYFilter())
AtariInputFilter.add_observation_filter('observation', 'to_uint8', ObservationToUInt8Filter(0, 255))
AtariInputFilter.add_observation_filter('observation', 'stacking', ObservationStackingFilter(4))
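# The filters above are applied in insertion order: the RGB frame is rescaled to 84x84, converted to
# grayscale, cast to uint8 in [0, 255], and the last 4 frames are stacked, so the agent observes an
# 84x84x4 uint8 tensor (the standard DQN preprocessing).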
AtariOutputFilter = NoOutputFilter()
class Atari(GymEnvironmentParameters):
def __init__(self, level=None):
super().__init__(level=level)
self.frame_skip = 4
self.max_over_num_frames = 2
self.random_initialization_steps = 30
self.default_input_filter = AtariInputFilter
self.default_output_filter = AtariOutputFilter
gym_atari_envs = ['air_raid', 'alien', 'amidar', 'assault', 'asterix', 'asteroids', 'atlantis',
'bank_heist', 'battle_zone', 'beam_rider', 'berzerk', 'bowling', 'boxing', 'breakout', 'carnival',
'centipede', 'chopper_command', 'crazy_climber', 'demon_attack', 'double_dunk',
'elevator_action', 'enduro', 'fishing_derby', 'freeway', 'frostbite', 'gopher', 'gravitar',
'hero', 'ice_hockey', 'jamesbond', 'journey_escape', 'kangaroo', 'krull', 'kung_fu_master',
'montezuma_revenge', 'ms_pacman', 'name_this_game', 'phoenix', 'pitfall', 'pong', 'pooyan',
'private_eye', 'qbert', 'riverraid', 'road_runner', 'robotank', 'seaquest', 'skiing',
'solaris', 'space_invaders', 'star_gunner', 'tennis', 'time_pilot', 'tutankham', 'up_n_down',
'venture', 'video_pinball', 'wizard_of_wor', 'yars_revenge', 'zaxxon']
atari_deterministic_v4 = {e: "{}".format(lower_under_to_upper(e) + 'Deterministic-v4') for e in gym_atari_envs}
atari_no_frameskip_v4 = {e: "{}".format(lower_under_to_upper(e) + 'NoFrameskip-v4') for e in gym_atari_envs}
# default atari schedule used in the DeepMind papers
atari_schedule = ScheduleParameters()
atari_schedule.improve_steps = EnvironmentSteps(50000000)
atari_schedule.steps_between_evaluation_periods = EnvironmentSteps(250000)
atari_schedule.evaluation_steps = EnvironmentSteps(135000)
atari_schedule.heatup_steps = EnvironmentSteps(1)
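# A minimal sketch of how the Atari parameters and schedule above are typically combined in a
# preset (DQN is used here only for illustration; any suitable agent parameters work the same way):
#
#     from rl_coach.agents.dqn_agent import DQNAgentParameters
#     from rl_coach.graph_managers.basic_rl_graph_manager import BasicRLGraphManager
#
#     graph_manager = BasicRLGraphManager(agent_params=DQNAgentParameters(),
#                                         env_params=Atari(level=atari_deterministic_v4['breakout']),
#                                         schedule_params=atari_schedule,
#                                         vis_params=VisualizationParameters())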
class MaxOverFramesAndFrameskipEnvWrapper(gym.Wrapper):
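    """
    A gym wrapper that repeats the chosen action for `frameskip` consecutive frames, sums the
    rewards collected over those frames, and returns the pixel-wise maximum over the last
    `max_over_num_frames` observations. Taking the max removes the flickering caused by Atari
    sprites that are rendered only on alternating frames.
    """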
def __init__(self, env, frameskip=4, max_over_num_frames=2):
super().__init__(env)
self.max_over_num_frames = max_over_num_frames
self.observations_stack = []
self.frameskip = frameskip
self.first_frame_to_max_over = self.frameskip - self.max_over_num_frames
def reset(self):
return self.env.reset()
def step(self, action):
total_reward = 0.0
done = None
info = None
self.observations_stack = []
for i in range(self.frameskip):
observation, reward, done, info = self.env.step(action)
if i >= self.first_frame_to_max_over:
self.observations_stack.append(observation)
total_reward += reward
if done:
# deal with last state in episode
if not self.observations_stack:
self.observations_stack.append(observation)
break
max_over_frames_observation = np.max(self.observations_stack, axis=0)
return max_over_frames_observation, total_reward, done, info
# Environment
class ObservationSpaceType(IntEnum):
Tensor = 0
Image = 1
Vector = 2
class GymEnvironment(Environment):
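    """
    An environment wrapper that exposes OpenAI Gym environments (native, package-based, or loaded
    from a custom module path) through Coach's Environment interface, translating Gym observation
    and action spaces into Coach spaces and adding Atari-specific handling such as frame skipping,
    frame maxing, random no-op starts and life-loss termination.
    """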
def __init__(self,
level: LevelSelection,
frame_skip: int,
visualization_parameters: VisualizationParameters,
target_success_rate: float=1.0,
additional_simulator_parameters: Dict[str, Any] = {},
seed: Union[None, int] = None,
human_control: bool=False,
custom_reward_threshold: Union[int, float]=None,
random_initialization_steps: int=1,
max_over_num_frames: int=1,
observation_space_type: ObservationSpaceType=None,
**kwargs):
"""
:param level: (str)
A string representing the gym level to run. This can also be a LevelSelection object.
For example, BreakoutDeterministic-v0
:param frame_skip: (int)
The number of frames to skip between any two actions given by the agent. The action will be repeated
for all the skipped frames.
:param visualization_parameters: (VisualizationParameters)
The parameters used for visualizing the environment, such as the render flag, storing videos etc.
:param additional_simulator_parameters: (Dict[str, Any])
Any additional parameters that the user can pass to the Gym environment. These parameters should be
accepted by the __init__ function of the implemented Gym environment.
:param seed: (int)
A seed to use for the random number generator when running the environment.
:param human_control: (bool)
A flag that allows controlling the environment using the keyboard keys.
:param custom_reward_threshold: (float)
            Allows defining a custom reward threshold that will be used to decide when the agent succeeded in passing the environment.
If not set, this value will be taken from the Gym environment definition.
:param random_initialization_steps: (int)
The number of random steps that will be taken in the environment after each reset.
This is a feature presented in the DQN paper, which improves the variability of the episodes the agent sees.
:param max_over_num_frames: (int)
This value will be used for merging multiple frames into a single frame by taking the maximum value for each
of the pixels in the frame. This is particularly used in Atari games, where the frames flicker, and objects
can be seen in one frame but disappear in the next.
        :param observation_space_type: (ObservationSpaceType)
            The type of observation space to generate. Should be one of ObservationSpaceType. If not specified,
            the observation space is inferred from the number of dimensions of the observation: a 1D observation
            maps to a Vector space, and a 3D observation maps to an Image space if it has 1 or 3 channels, or to
            a PlanarMaps space otherwise.
"""
super().__init__(level, seed, frame_skip, human_control, custom_reward_threshold,
visualization_parameters, target_success_rate)
self.random_initialization_steps = random_initialization_steps
self.max_over_num_frames = max_over_num_frames
self.additional_simulator_parameters = additional_simulator_parameters
# hide warnings
gym.logger.set_level(40)
"""
load and initialize environment
environment ids can be defined in 3 ways:
1. Native gym environments like BreakoutDeterministic-v0 for example
2. Custom gym environments written and installed as python packages.
           These environments should have a python module with a class inheriting gym.Env, implementing the
           relevant functions (_reset, _step, _render) and defining the observation and action spaces.
For example: my_environment_package:MyEnvironmentClass will run an environment defined in the
MyEnvironmentClass class
3. Custom gym environments written as an independent module which is not installed.
           These environments should have a python module with a class inheriting gym.Env, implementing the
           relevant functions (_reset, _step, _render) and defining the observation and action spaces.
For example: path_to_my_environment.sub_directory.my_module:MyEnvironmentClass will run an
environment defined in the MyEnvironmentClass class which is located in the module in the relative path
path_to_my_environment.sub_directory.my_module
"""
if ':' in self.env_id:
# custom environments
if '/' in self.env_id or '.' in self.env_id:
                # environment in an absolute path module written as a unix path or in a relative path module
# written as a python import path
env_class = short_dynamic_import(self.env_id)
else:
# environment in a python package
env_class = gym.envs.registration.load(self.env_id)
# instantiate the environment
try:
self.env = env_class(**self.additional_simulator_parameters)
except:
screen.error("Failed to instantiate Gym environment class %s with arguments %s" %
(env_class, self.additional_simulator_parameters), crash=False)
raise
else:
self.env = gym.make(self.env_id)
# for classic control we want to use the native renderer because otherwise we will get 2 renderer windows
environment_to_always_use_with_native_rendering = ['classic_control', 'mujoco', 'robotics']
self.native_rendering = self.native_rendering or \
any([env in str(self.env.unwrapped.__class__)
for env in environment_to_always_use_with_native_rendering])
if self.native_rendering:
if hasattr(self, 'renderer'):
self.renderer.close()
# seed
if self.seed is not None:
self.env.seed(self.seed)
np.random.seed(self.seed)
random.seed(self.seed)
# frame skip and max between consecutive frames
self.is_mujoco_env = 'mujoco' in str(self.env.unwrapped.__class__)
self.is_roboschool_env = 'roboschool' in str(self.env.unwrapped.__class__)
self.is_atari_env = 'Atari' in str(self.env.unwrapped.__class__)
if self.is_atari_env:
            self.env.unwrapped.frameskip = 1  # unwrapped accesses the Atari env beneath gym's TimeLimit wrapper
if self.env_id == "SpaceInvadersDeterministic-v4" and self.frame_skip == 4:
screen.warning("Warning: The frame-skip for Space Invaders was automatically updated from 4 to 3. "
"This is following the DQN paper where it was noticed that a frame-skip of 3 makes the "
"laser rays disappear. To force frame-skip of 4, please use SpaceInvadersNoFrameskip-v4.")
self.frame_skip = 3
self.env = MaxOverFramesAndFrameskipEnvWrapper(self.env,
frameskip=self.frame_skip,
max_over_num_frames=self.max_over_num_frames)
else:
self.env.unwrapped.frameskip = self.frame_skip
self.state_space = StateSpace({})
# observations
if not isinstance(self.env.observation_space, gym.spaces.dict.Dict):
state_space = {'observation': self.env.observation_space}
else:
state_space = self.env.observation_space.spaces
for observation_space_name, observation_space in state_space.items():
if observation_space_type == ObservationSpaceType.Tensor:
# we consider arbitrary input tensor which does not necessarily represent images
self.state_space[observation_space_name] = TensorObservationSpace(
shape=np.array(observation_space.shape),
low=observation_space.low,
high=observation_space.high
)
elif observation_space_type == ObservationSpaceType.Image or len(observation_space.shape) == 3:
                # we assume gym image observations (with an arbitrary number of channels) have values
                # within 0-255, and that the channel dimension is the last dimension
if observation_space.shape[-1] in [1, 3]:
self.state_space[observation_space_name] = ImageObservationSpace(
shape=np.array(observation_space.shape),
high=255,
channels_axis=-1
)
else:
# For any number of channels other than 1 or 3, use the generic PlanarMaps space
self.state_space[observation_space_name] = PlanarMapsObservationSpace(
shape=np.array(observation_space.shape),
low=0,
high=255,
channels_axis=-1
)
elif observation_space_type == ObservationSpaceType.Vector or len(observation_space.shape) == 1:
self.state_space[observation_space_name] = VectorObservationSpace(
shape=observation_space.shape[0],
low=observation_space.low,
high=observation_space.high
)
            else:
                screen.error("Unsupported observation space type %s for environment %s" %
                             (observation_space_type, self.env_id), crash=True)
if 'desired_goal' in state_space.keys():
self.goal_space = self.state_space['desired_goal']
# actions
if type(self.env.action_space) == gym.spaces.box.Box:
self.action_space = BoxActionSpace(
shape=self.env.action_space.shape,
low=self.env.action_space.low,
high=self.env.action_space.high
)
elif type(self.env.action_space) == gym.spaces.discrete.Discrete:
actions_description = []
if hasattr(self.env.unwrapped, 'get_action_meanings'):
actions_description = self.env.unwrapped.get_action_meanings()
self.action_space = DiscreteActionSpace(
num_actions=self.env.action_space.n,
descriptions=actions_description
)
        else:
            screen.error((
                "Unsupported action space {} for environment {}. "
                "Expected gym.spaces.Box or gym.spaces.Discrete."
            ).format(self.env.action_space, self.env_id), crash=True)
if self.human_control:
# TODO: add this to the action space
# map keyboard keys to actions
self.key_to_action = {}
if hasattr(self.env.unwrapped, 'get_keys_to_action'):
self.key_to_action = self.env.unwrapped.get_keys_to_action()
else:
screen.error("Error: Environment {} does not support human control.".format(self.env), crash=True)
# render
if self.is_rendered:
image = self.get_rendered_image()
scale = 1
if self.human_control:
scale = 2
if not self.native_rendering:
self.renderer.create_screen(image.shape[1]*scale, image.shape[0]*scale)
# the info is only updated after the first step
self.state_space['measurements'] = VectorObservationSpace(shape=len(self.info.keys()))
if self.env.spec and custom_reward_threshold is None:
self.reward_success_threshold = self.env.spec.reward_threshold
self.reward_space = RewardSpace(1, reward_success_threshold=self.reward_success_threshold)
self.target_success_rate = target_success_rate
def _wrap_state(self, state):
if not isinstance(self.env.observation_space, gym.spaces.Dict):
return {'observation': state}
return state
def _update_state(self):
if self.is_atari_env and hasattr(self, 'current_ale_lives') \
and self.current_ale_lives != self.env.unwrapped.ale.lives():
if self.phase == RunPhase.TRAIN or self.phase == RunPhase.HEATUP:
# signal termination for life loss
self.done = True
elif self.phase == RunPhase.TEST and not self.done:
# the episode is not terminated in evaluation, but we need to press fire again
self._press_fire()
self._update_ale_lives()
# TODO: update the measurements
if self.state and "desired_goal" in self.state.keys():
self.goal = self.state['desired_goal']
def _take_action(self, action):
if type(self.action_space) == BoxActionSpace:
action = self.action_space.clip_action_to_space(action)
self.state, self.reward, self.done, self.info = self.env.step(action)
self.state = self._wrap_state(self.state)
def _random_noop(self):
        # simulate a random initial environment state by stepping for a random number of steps
        # between 0 and random_initialization_steps
step_count = 0
random_initialization_steps = random.randint(0, self.random_initialization_steps)
while self.action_space is not None and (self.state is None or step_count < random_initialization_steps):
step_count += 1
self.step(self.action_space.default_action)
def _press_fire(self):
fire_action = 1
if self.is_atari_env and self.env.unwrapped.get_action_meanings()[fire_action] == 'FIRE':
self.current_ale_lives = self.env.unwrapped.ale.lives()
self.step(fire_action)
if self.done:
self.reset_internal_state()
def _update_ale_lives(self):
if self.is_atari_env:
self.current_ale_lives = self.env.unwrapped.ale.lives()
def _restart_environment_episode(self, force_environment_reset=False):
# prevent reset of environment if there are ale lives left
if (self.is_atari_env and self.env.unwrapped.ale.lives() > 0) \
and not force_environment_reset:
self.step(self.action_space.default_action)
else:
self.state = self.env.reset()
self.state = self._wrap_state(self.state)
self._update_ale_lives()
if self.is_atari_env:
self._random_noop()
self._press_fire()
# initialize the number of lives
self._update_ale_lives()
def _render(self):
self.env.render(mode='human')
def get_rendered_image(self):
image = self.env.render(mode='rgb_array')
return image
def get_target_success_rate(self) -> float:
return self.target_success_rate
def close(self) -> None:
"""
Clean up to close rendering windows.
:return: None
"""
self.env.close()