# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from tensorforce import TensorforceError
from tensorforce.environments import Environment
class OpenAIGym(Environment):
    """
    [OpenAI Gym](https://gym.openai.com/) environment adapter (specification key: `gym`,
    `openai_gym`).

    May require:
    ```bash
    pip install gym[all]
    ```

    Args:
        level (string): Gym id
            (<span style="color:#C00000"><b>required</b></span>).
        visualize (bool): Whether to visualize interaction
            (<span style="color:#00C000"><b>default</b></span>: false).
        max_episode_timesteps (false | int > 0): Whether to terminate an episode after a while,
            and if so, maximum number of timesteps per episode
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        terminal_reward (float): Additional reward for early termination, if otherwise
            indistinguishable from termination due to maximum number of timesteps
            (<span style="color:#00C000"><b>default</b></span>: 0.0).
        reward_threshold (float): Gym environment argument, the reward threshold before the task is
            considered solved
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        tags (dict): Gym environment argument, a set of arbitrary key-value tags on this
            environment, including simple property=True tags
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        monitor_directory (string): Monitor output directory
            (<span style="color:#00C000"><b>default</b></span>: none).
        kwargs: Additional Gym environment arguments.
    """

    @classmethod
    def levels(cls):
        """Return the list of environment ids currently present in the Gym registry."""
        import gym

        return list(gym.envs.registry.env_specs)

    def __init__(
        self, level, visualize=False, max_episode_timesteps=None, terminal_reward=0.0,
        reward_threshold=None, tags=None, monitor_directory=None, **kwargs
    ):
        import gym
        import gym.wrappers

        # The registry dict is mutated in place by gym.register(), so one alias stays valid.
        env_specs = gym.envs.registry.env_specs

        # Find level: accept the exact id, the id without its '-v0' suffix, or the id without
        # any version suffix (the first registry entry with that base name is used).
        if level not in env_specs:
            if level + '-v0' in env_specs:
                level = level + '-v0'
            else:
                for name in env_specs:
                    if level == name[:name.rindex('-v')]:
                        level = name
                        break

        assert level in self.__class__.levels()

        # Specification of the resolved level; all defaults below are read from it.
        spec = env_specs[level]

        self._max_episode_timesteps = max_episode_timesteps
        self.terminal_reward = terminal_reward

        # Check/update attributes: any value differing from the registered specification
        # requires registering a modified copy of the environment.
        requires_register = False

        if self._max_episode_timesteps is None:
            self._max_episode_timesteps = spec.max_episode_steps
            if self._max_episode_timesteps is None:
                # Internally, False encodes "no time limit"
                self._max_episode_timesteps = False
        elif self._max_episode_timesteps != spec.max_episode_steps:
            # False vs. None both mean "no limit", so that combination is not a modification
            if not (
                (self._max_episode_timesteps is False) and
                (spec.max_episode_steps is None)
            ):
                requires_register = True

        if reward_threshold is None:
            reward_threshold = spec.reward_threshold
        elif reward_threshold != spec.reward_threshold:
            requires_register = True

        if tags is None:
            # Copy, so that removing the key below does not modify the registered spec
            tags = dict(spec.tags)
            if 'wrapper_config.TimeLimit.max_episode_steps' in tags and \
                    max_episode_timesteps is not None:
                tags.pop('wrapper_config.TimeLimit.max_episode_steps')
        elif tags != spec.tags:
            requires_register = True

        # Modified specification
        if requires_register:
            entry_point = spec._entry_point
            _kwargs = dict(spec._kwargs)
            nondeterministic = spec.nondeterministic

            if self._max_episode_timesteps is False:
                max_episode_steps = None
            else:
                max_episode_steps = self._max_episode_timesteps

            # Split off a numeric version suffix (if any), then count upwards to the first
            # id which is not yet registered.
            if '-v' in level and level[level.rindex('-v') + 2:].isdigit():
                version = int(level[level.rindex('-v') + 2:])
                level = level[:level.rindex('-v') + 2]
            else:
                version = -1
            while True:
                version += 1
                if level + str(version) not in env_specs:
                    level = level + str(version)
                    break

            gym.register(
                id=level, entry_point=entry_point, reward_threshold=reward_threshold,
                kwargs=_kwargs, nondeterministic=nondeterministic, tags=tags,
                max_episode_steps=max_episode_steps
            )
            assert level in self.__class__.levels()

        self.level = level
        self.visualize = visualize

        self.create_gym(**kwargs)

        if monitor_directory is not None:
            self.environment = gym.wrappers.Monitor(
                env=self.environment, directory=monitor_directory
            )

        self.states_spec = OpenAIGym.specs_from_gym_space(
            space=self.environment.observation_space, ignore_value_bounds=True  # TODO: not ignore?
        )
        self.actions_spec = OpenAIGym.specs_from_gym_space(
            space=self.environment.action_space, ignore_value_bounds=False
        )

    def create_gym(self, **kwargs):
        """Create the Gym environment for `self.level` and store it as `self.environment`."""
        import gym

        self.environment = gym.make(id=self.level, **kwargs)

    def __str__(self):
        """Return the parent string representation followed by the level id in parentheses."""
        return super().__str__() + '({})'.format(self.level)

    def states(self):
        """Return the states specification derived from the Gym observation space."""
        return self.states_spec

    def actions(self):
        """Return the actions specification derived from the Gym action space."""
        return self.actions_spec

    def max_episode_timesteps(self):
        """
        Return the maximum number of timesteps per episode, deferring to the parent class if no
        limit is configured (internally encoded as False).
        """
        if self._max_episode_timesteps is False:
            return super().max_episode_timesteps()
        else:
            return self._max_episode_timesteps

    def close(self):
        """Close the Gym environment and drop the reference to it."""
        self.environment.close()
        self.environment = None

    def reset(self):
        """Reset the Gym environment and the timestep counter, and return the flattened state."""
        import gym.wrappers

        if isinstance(self.environment, gym.wrappers.Monitor):
            # Mark the monitored episode as done before resetting
            self.environment.stats_recorder.done = True
        states = self.environment.reset()
        self.timestep = 0
        return OpenAIGym.flatten_state(state=states)

    def execute(self, actions):
        """
        Advance the Gym environment by one step.

        Args:
            actions: Action(s) in the flattened format described by `actions()`.

        Returns:
            Tuple `(states, terminal, reward)`, where `terminal` is 0 if the episode continues,
            1 if the environment terminated before the timestep limit (in which case
            `terminal_reward` is added to the reward), and 2 if the timestep limit was reached.
        """
        if self.visualize:
            self.environment.render()
        actions = OpenAIGym.unflatten_action(action=actions)
        states, reward, terminal, _ = self.environment.step(actions)
        self.timestep += 1
        if self.timestep == self._max_episode_timesteps:
            assert terminal
            terminal = 2
        elif terminal:
            assert self._max_episode_timesteps is False or \
                self.timestep < self._max_episode_timesteps
            reward += self.terminal_reward
            terminal = 1
        else:
            terminal = 0
        return OpenAIGym.flatten_state(state=states), terminal, reward

    @staticmethod
    def specs_from_gym_space(space, ignore_value_bounds):
        """
        Translate a Gym space into a specification dict.

        Simple spaces yield a single spec (a dict containing a 'type' key). Composite or
        non-uniform spaces yield a flat dict of named specs, using the name prefixes 'gymmdc'
        (MultiDiscrete), 'gymbox' (Box), 'gymtpl' (Tuple), and joining nested names with '-'.

        Args:
            space: Gym space instance.
            ignore_value_bounds (bool): Whether to omit min/max values for Box spaces.

        Returns:
            A spec dict, or a dict mapping names to spec dicts.

        Raises:
            TensorforceError: If the space type is not supported.
        """
        import gym

        if isinstance(space, gym.spaces.Discrete):
            return dict(type='int', shape=(), num_values=space.n)

        elif isinstance(space, gym.spaces.MultiBinary):
            return dict(type='bool', shape=space.n)

        elif isinstance(space, gym.spaces.MultiDiscrete):
            num_discrete_space = len(space.nvec)
            if (space.nvec == space.nvec[0]).all():
                # All components share the same number of values: one combined spec
                return dict(type='int', shape=num_discrete_space, num_values=space.nvec[0])
            else:
                specs = dict()
                for n in range(num_discrete_space):
                    specs['gymmdc{}'.format(n)] = dict(
                        type='int', shape=(), num_values=space.nvec[n]
                    )
                return specs

        elif isinstance(space, gym.spaces.Box):
            if ignore_value_bounds:
                return dict(type='float', shape=space.shape)
            elif (space.low == space.low[0]).all() and (space.high == space.high[0]).all():
                return dict(
                    type='float', shape=space.shape, min_value=space.low[0],
                    max_value=space.high[0]
                )
            else:
                # Differing bounds: one scalar spec per flattened element
                specs = dict()
                low = space.low.flatten()
                high = space.high.flatten()
                for n in range(low.shape[0]):
                    specs['gymbox{}'.format(n)] = dict(
                        type='float', shape=(), min_value=low[n], max_value=high[n]
                    )
                return specs

        elif isinstance(space, gym.spaces.Tuple):
            specs = dict()
            for n, inner_space in enumerate(space.spaces):
                spec = OpenAIGym.specs_from_gym_space(
                    space=inner_space, ignore_value_bounds=ignore_value_bounds
                )
                if 'type' in spec:
                    specs['gymtpl{}'.format(n)] = spec
                else:
                    for name, inner_spec in spec.items():
                        specs['gymtpl{}-{}'.format(n, name)] = inner_spec
            return specs

        elif isinstance(space, gym.spaces.Dict):
            specs = dict()
            for space_name, inner_space in space.spaces.items():
                spec = OpenAIGym.specs_from_gym_space(
                    space=inner_space, ignore_value_bounds=ignore_value_bounds
                )
                if 'type' in spec:
                    specs[space_name] = spec
                else:
                    for name, inner_spec in spec.items():
                        specs['{}-{}'.format(space_name, name)] = inner_spec
            return specs

        else:
            raise TensorforceError('Unknown Gym space.')

    @staticmethod
    def flatten_state(state):
        """
        Flatten a (possibly nested) Gym state into the flat naming scheme used by
        `specs_from_gym_space`.

        Tuples become dicts with keys 'gymtpl<n>', dicts keep their keys, and nested names are
        joined with '-'. Any other value is returned unchanged.

        Raises:
            TensorforceError: If a non-tuple, non-dict state contains inf or nan.
        """
        if isinstance(state, tuple):
            states = dict()
            for n, inner_state in enumerate(state):
                inner_state = OpenAIGym.flatten_state(state=inner_state)
                if isinstance(inner_state, dict):
                    for name, value in inner_state.items():
                        states['gymtpl{}-{}'.format(n, name)] = value
                else:
                    states['gymtpl{}'.format(n)] = inner_state
            return states

        elif isinstance(state, dict):
            states = dict()
            for state_name, inner_state in state.items():
                inner_state = OpenAIGym.flatten_state(state=inner_state)
                if isinstance(inner_state, dict):
                    for name, value in inner_state.items():
                        states['{}-{}'.format(state_name, name)] = value
                else:
                    states[state_name] = inner_state
            return states

        elif np.isinf(state).any() or np.isnan(state).any():
            raise TensorforceError("State contains inf or nan.")

        else:
            return state

    @staticmethod
    def unflatten_action(action):
        """
        Rebuild a (possibly nested) Gym action from the flat naming scheme used by
        `specs_from_gym_space`.

        A non-dict action is returned unchanged. A dict whose keys all start with 'gymmdc',
        'gymbox' or 'gymtpl' becomes a tuple ordered by the index following the prefix. Any
        other dict becomes a dict in which '<name>-<inner>' keys are grouped into nested dicts
        that are unflattened recursively.

        Raises:
            TensorforceError: If a non-dict action contains inf or nan.
        """
        if not isinstance(action, dict):
            if np.isinf(action).any() or np.isnan(action).any():
                raise TensorforceError("Action contains inf or nan.")
            return action

        elif all(name.startswith('gymmdc') for name in action) or \
                all(name.startswith('gymbox') for name in action) or \
                all(name.startswith('gymtpl') for name in action):
            space_type = next(iter(action))[:6]
            actions = list()
            n = 0
            while True:
                component = space_type + str(n)
                # The trailing '-' keeps e.g. 'gymtpl1' from also matching 'gymtpl10-...'
                nested_prefix = component + '-'
                if any(name.startswith(nested_prefix) for name in action):
                    # Nested component: strip the prefix and keep the values (dict, not set)
                    inner_action = {
                        name[len(nested_prefix):]: value for name, value in action.items()
                        if name.startswith(nested_prefix)
                    }
                    actions.append(OpenAIGym.unflatten_action(action=inner_action))
                elif component in action:
                    actions.append(OpenAIGym.unflatten_action(action=action[component]))
                else:
                    break
                n += 1
            return tuple(actions)

        else:
            actions = dict()
            for name, value in action.items():
                if '-' in name:
                    name, inner_name = name.split('-', 1)
                    if name not in actions:
                        actions[name] = dict()
                    actions[name][inner_name] = value
                else:
                    actions[name] = value
            # Replacing values of existing keys does not change the dict size during iteration
            for name, value in actions.items():
                if isinstance(value, dict):
                    actions[name] = OpenAIGym.unflatten_action(action=value)
            return actions