# Source code for rl_coach.environments.doom_environment

#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

try:
    import vizdoom
except ImportError:
    from rl_coach.logger import failed_imports
    failed_imports.append("ViZDoom")

import os
from enum import Enum
from os import path, environ
from typing import Union, List

import numpy as np

from rl_coach.base_parameters import VisualizationParameters
from rl_coach.environments.environment import Environment, EnvironmentParameters, LevelSelection
from rl_coach.filters.action.full_discrete_action_space_map import FullDiscreteActionSpaceMap
from rl_coach.filters.filter import InputFilter, OutputFilter
from rl_coach.filters.observation.observation_rescale_to_size_filter import ObservationRescaleToSizeFilter
from rl_coach.filters.observation.observation_rgb_to_y_filter import ObservationRGBToYFilter
from rl_coach.filters.observation.observation_stacking_filter import ObservationStackingFilter
from rl_coach.filters.observation.observation_to_uint8_filter import ObservationToUInt8Filter
from rl_coach.spaces import MultiSelectActionSpace, ImageObservationSpace, \
    VectorObservationSpace, StateSpace


# enum of the available levels and their path
class DoomLevel(Enum):
    """
    The Doom levels that can be selected, each mapped to the file name of its scenario config.

    DoomEnvironment looks a level up by member name and joins the member value (a ``.cfg``
    file name) onto a scenarios directory. Levels marked ``COACH_LOCAL`` are loaded from the
    ``doom`` directory located next to this module instead of the ViZDoom scenarios directory.
    """
    BASIC = "basic.cfg"
    DEFEND = "defend_the_center.cfg"
    DEATHMATCH = "deathmatch.cfg"
    MY_WAY_HOME = "my_way_home.cfg"
    TAKE_COVER = "take_cover.cfg"
    HEALTH_GATHERING = "health_gathering.cfg"
    HEALTH_GATHERING_SUPREME_COACH_LOCAL = "D2_navigation.cfg"  # from https://github.com/IntelVCL/DirectFuturePrediction/tree/master/maps
    DEFEND_THE_LINE = "defend_the_line.cfg"
    DEADLY_CORRIDOR = "deadly_corridor.cfg"
    BATTLE_COACH_LOCAL = "D3_battle.cfg"  # from https://github.com/IntelVCL/DirectFuturePrediction/tree/master/maps

# Maps an action description (the button name, e.g. 'ATTACK') to the integer code of the
# keyboard key that triggers it. DoomEnvironment uses it in human control mode to fill
# key_to_action for every available button that has an entry here.
# Printable keys use ord(<char>); the remaining codes are raw integers.
# NOTE(review): the raw integers look like pygame 1.x key constants - confirm against the renderer.
# Several actions deliberately share a key (e.g. MOVE_FORWARD / MOVE_UP on "w").
key_map = {
    'NO-OP': 96,  # `
    'ATTACK': 13,  # enter
    'CROUCH': 306,  # ctrl
    'DROP_SELECTED_ITEM': ord("t"),
    'DROP_SELECTED_WEAPON': ord("t"),  # same key as DROP_SELECTED_ITEM
    'JUMP': 32,  # spacebar
    'LAND': ord("l"),
    'LOOK_DOWN': 274,  # down arrow
    'LOOK_UP': 273,  # up arrow
    'MOVE_BACKWARD': ord("s"),
    'MOVE_DOWN': ord("s"),  # same key as MOVE_BACKWARD
    'MOVE_FORWARD': ord("w"),
    'MOVE_LEFT': 276,  # presumably left arrow (follows the 273/274 arrow codes) - TODO confirm
    'MOVE_RIGHT': 275,  # presumably right arrow (follows the 273/274 arrow codes) - TODO confirm
    'MOVE_UP': ord("w"),  # same key as MOVE_FORWARD
    'RELOAD': ord("r"),
    'SELECT_NEXT_WEAPON': ord("q"),
    'SELECT_PREV_WEAPON': ord("e"),
    'SELECT_WEAPON0': ord("0"),
    'SELECT_WEAPON1': ord("1"),
    'SELECT_WEAPON2': ord("2"),
    'SELECT_WEAPON3': ord("3"),
    'SELECT_WEAPON4': ord("4"),
    'SELECT_WEAPON5': ord("5"),
    'SELECT_WEAPON6': ord("6"),
    'SELECT_WEAPON7': ord("7"),
    'SELECT_WEAPON8': ord("8"),
    'SELECT_WEAPON9': ord("9"),
    'SPEED': 304,  # shift
    'STRAFE': 9,  # tab
    'TURN180': ord("u"),
    'TURN_LEFT': ord("a"),  # "a" key (not the left arrow)
    'TURN_RIGHT': ord("d"),  # "d" key (not the right arrow)
    'USE': ord("f"),
}


# Default input filter for Doom; DoomEnvironmentParameters uses it as default_input_filter.
# Four filters are registered on the 'observation' input, in this order:
#   'rescaling'    - rescale to an image space of shape [60, 76, 3] with a high value of 255
#   'to_grayscale' - ObservationRGBToYFilter
#   'to_uint8'     - ObservationToUInt8Filter(0, 255)
#   'stacking'     - ObservationStackingFilter(3)
# NOTE(review): the exact meaning of is_a_reference_filter is defined by InputFilter - confirm there.
DoomInputFilter = InputFilter(is_a_reference_filter=True)
DoomInputFilter.add_observation_filter('observation', 'rescaling',
                                       ObservationRescaleToSizeFilter(ImageObservationSpace(np.array([60, 76, 3]),
                                                                                            high=255)))
DoomInputFilter.add_observation_filter('observation', 'to_grayscale', ObservationRGBToYFilter())
DoomInputFilter.add_observation_filter('observation', 'to_uint8', ObservationToUInt8Filter(0, 255))
DoomInputFilter.add_observation_filter('observation', 'stacking', ObservationStackingFilter(3))


# Default output filter for Doom; DoomEnvironmentParameters uses it as default_output_filter.
# It registers a single action filter, 'to_discrete', backed by FullDiscreteActionSpaceMap.
# NOTE(review): presumably this exposes the multi-select action space as a discrete one - confirm
# against FullDiscreteActionSpaceMap.
DoomOutputFilter = OutputFilter(is_a_reference_filter=True)
DoomOutputFilter.add_action_filter('to_discrete', FullDiscreteActionSpaceMap())


class DoomEnvironmentParameters(EnvironmentParameters):
    """
    Parameters describing a DoomEnvironment.

    On top of what EnvironmentParameters stores for the given level, this sets the Doom
    default input / output filters and a default camera list holding only the
    OBSERVATION camera.
    """

    def __init__(self, level=None):
        """
        :param level: the level to run, forwarded unchanged to EnvironmentParameters
        """
        super().__init__(level=level)

        # module-level reference filters, assigned as-is (no copy is made here)
        self.default_input_filter = DoomInputFilter
        self.default_output_filter = DoomOutputFilter

        # DoomEnvironment is looked up when the constructor runs, so it may be defined
        # further down in this module
        default_camera = DoomEnvironment.CameraTypes.OBSERVATION
        self.cameras = [default_camera]

    @property
    def path(self):
        """
        :return: the '<module path>:<class name>' string identifying the environment class
        """
        environment_class_path = 'rl_coach.environments.doom_environment:DoomEnvironment'
        return environment_class_path


class DoomEnvironment(Environment):
    """
    An Environment backed by a ViZDoom ``DoomGame`` instance.

    The state is a dictionary holding the game variables under 'measurements' plus one image
    per requested camera, keyed by the camera's state name.
    """

    class CameraTypes(Enum):
        # each value is (key used in the state dictionary, attribute name on the ViZDoom game state)
        OBSERVATION = ("observation", "screen_buffer")
        DEPTH = ("depth", "depth_buffer")
        LABELS = ("labels", "labels_buffer")
        MAP = ("map", "automap_buffer")

    def __init__(self, level: LevelSelection, seed: int, frame_skip: int, human_control: bool,
                 custom_reward_threshold: Union[int, float], visualization_parameters: VisualizationParameters,
                 cameras: List[CameraTypes], target_success_rate: float=1.0, **kwargs):
        """
        :param level: (str)
            A string representing the doom level to run. This can also be a LevelSelection object.
            This should be one of the levels defined in the DoomLevel enum. For example, HEALTH_GATHERING.
            The name is matched case-insensitively (it is upper-cased before the lookup), so the given
            object has to provide ``upper()``.

        :param seed: (int)
            A seed to use for the random number generator when running the environment.

        :param frame_skip: (int)
            The number of frames to skip between any two actions given by the agent. The action will be repeated
            for all the skipped frames.

        :param human_control: (bool)
            A flag that allows controlling the environment using the keyboard keys.

        :param custom_reward_threshold: (float)
            Allows defining a custom reward that will be used to decide when the agent succeeded in passing
            the environment.

        :param visualization_parameters: (VisualizationParameters)
            The parameters used for visualizing the environment, such as the render flag, storing videos etc.

        :param cameras: (List[CameraTypes])
            A list of camera types to use as observation in the state returned from the environment.
            Each camera should be an enum from CameraTypes, and there are several options like an RGB observation,
            a depth map, a segmentation map, and a top down map of the environment.

        :param target_success_rate: (float)
            Stop experiment if given target success rate was achieved.

        :param kwargs:
            Any additional keyword arguments are accepted and ignored.

        :raises KeyError: if the upper-cased level is not the name of a DoomLevel member.
        """
        super().__init__(level, seed, frame_skip, human_control, custom_reward_threshold, visualization_parameters,
                         target_success_rate)

        self.cameras = cameras

        # load the emulator with the required level
        self.level = DoomLevel[level.upper()]

        # Choose the scenarios directory from the resolved member name and not from the raw argument:
        # the lookup above is case-insensitive, so a lower-case '..._coach_local' level must still be
        # served from the local 'doom' directory.
        if 'COACH_LOCAL' in self.level.name:
            self.scenarios_dir = path.join(os.path.dirname(os.path.realpath(__file__)), 'doom')
        elif 'VIZDOOM_ROOT' in environ:
            self.scenarios_dir = path.join(environ['VIZDOOM_ROOT'], 'scenarios')
        else:
            # fall back to the scenarios shipped inside the installed vizdoom package
            self.scenarios_dir = path.join(os.path.dirname(os.path.realpath(vizdoom.__file__)), 'scenarios')

        self.game = vizdoom.DoomGame()
        self.game.load_config(path.join(self.scenarios_dir, self.level.value))
        self.game.set_window_visible(False)
        self.game.add_game_args("+vid_forcesurface 1")

        self.wait_for_explicit_human_action = True
        if self.human_control:
            self.game.set_screen_resolution(vizdoom.ScreenResolution.RES_640X480)
        elif self.is_rendered:
            self.game.set_screen_resolution(vizdoom.ScreenResolution.RES_320X240)
        else:
            # lower resolution since we actually take only 76x60 and we don't need to render
            self.game.set_screen_resolution(vizdoom.ScreenResolution.RES_160X120)

        self.game.set_render_hud(False)
        self.game.set_render_crosshair(False)
        self.game.set_render_decals(False)
        self.game.set_render_particles(False)

        # enable the buffer of every requested camera for which the game exposes a
        # 'set_<buffer name>_enabled' setter; cameras without such a setter are left untouched
        for camera in self.cameras:
            enable_buffer = getattr(self.game, 'set_{}_enabled'.format(camera.value[1]), None)
            if enable_buffer is not None:
                enable_buffer(True)
        self.game.init()

        # actions
        # the descriptions are 'NO-OP' followed by the available button names (the part after the
        # dot in the button's string form), and the whole list is then reversed
        actions_description = ['NO-OP']
        actions_description += [str(action).split(".")[1] for action in self.game.get_available_buttons()]
        actions_description = actions_description[::-1]
        self.action_space = MultiSelectActionSpace(self.game.get_available_buttons_size(),
                                                   max_simultaneous_selected_actions=1,
                                                   descriptions=actions_description,
                                                   allow_no_action_to_be_selected=True)

        # human control
        if self.human_control:
            # TODO: add this to the action space
            # map keyboard keys to actions
            # NOTE(review): key_to_action is not created here - presumably by the base class; confirm
            for idx, action in enumerate(self.action_space.descriptions):
                if action in key_map:
                    self.key_to_action[(key_map[action],)] = idx

        # states
        self.state_space = StateSpace({
            "measurements": VectorObservationSpace(self.game.get_state().game_variables.shape[0],
                                                   measurements_names=[str(m) for m in
                                                                       self.game.get_available_game_variables()])
        })
        # every camera is declared as a 3 channel image at the screen resolution
        for camera in self.cameras:
            self.state_space[camera.value[0]] = ImageObservationSpace(
                shape=np.array([self.game.get_screen_height(), self.game.get_screen_width(), 3]),
                high=255)

        # seed
        if seed is not None:
            self.game.set_seed(seed)
        self.reset_internal_state()

        # render
        if self.is_rendered:
            image = self.get_rendered_image()
            self.renderer.create_screen(image.shape[1], image.shape[0])

        self.target_success_rate = target_success_rate

    def _update_state(self) -> None:
        """
        Refresh the state, reward and done flag from the game.

        The measurements and camera images are replaced only when the game returns a state that has a
        screen buffer; otherwise the previous ones are kept. The reward and the done flag are always
        refreshed.
        """
        # extract all data from the current state
        state = self.game.get_state()
        if state is not None and state.screen_buffer is not None:
            self.measurements = state.game_variables
            self.state = {'measurements': self.measurements}
            for camera in self.cameras:
                observation = getattr(state, camera.value[1])
                if observation.ndim == 3:
                    # move the first axis to the end
                    # (assumes the buffer is channels-first - TODO confirm against the ViZDoom screen format)
                    self.state[camera.value[0]] = np.transpose(observation, (1, 2, 0))
                elif observation.ndim == 2:
                    # single channel buffer: replicate it into 3 identical channels
                    self.state[camera.value[0]] = np.repeat(np.expand_dims(observation, -1), 3, axis=-1)

        self.reward = self.game.get_last_reward()
        self.done = self.game.is_episode_finished()

    def _take_action(self, action) -> None:
        """
        Pass the action to the game as a list, together with the frame skip value.

        :param action: an iterable with the values of the action to perform
        """
        self.game.make_action(list(action), self.frame_skip)

    def _restart_environment_episode(self, force_environment_reset=False) -> None:
        """
        Start a new episode in the game.

        :param force_environment_reset: not used by this environment
        """
        self.game.new_episode()

    def get_rendered_image(self) -> np.ndarray:
        """
        Return a numpy array containing the image that will be rendered to the screen.
        This can be different from the observation. For example, mujoco's observation is a measurements vector.
        Here, the images of all the cameras are stacked vertically, in the order of the cameras list.

        :return: numpy array containing the image that will be rendered to the screen
        """
        camera_images = [self.state[camera.value[0]] for camera in self.cameras]
        return np.vstack(camera_images)

    def get_target_success_rate(self) -> float:
        """
        :return: the target success rate that was given to the constructor
        """
        return self.target_success_rate