# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from tensorforce import TensorforceError, util
from tensorforce.environments import Environment
class OpenAIGym(Environment):
    """
    [OpenAI Gym](https://gym.openai.com/) environment adapter (specification key: `gym`,
    `openai_gym`).

    May require:
    ```bash
    pip3 install gym
    pip3 install gym[all]
    ```

    Args:
        level (string | gym.Env): Gym id or instance
            (<span style="color:#C00000"><b>required</b></span>).
        visualize (bool): Whether to visualize interaction
            (<span style="color:#00C000"><b>default</b></span>: false).
        max_episode_steps (false | int > 0): Whether to terminate an episode after a while,
            and if so, maximum number of timesteps per episode
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        terminal_reward (float): Additional reward for early termination, if otherwise
            indistinguishable from termination due to maximum number of timesteps
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        reward_threshold (float): Gym environment argument, the reward threshold before the task is
            considered solved
            (<span style="color:#00C000"><b>default</b></span>: Gym default).
        drop_states_indices (list[int]): Drop states indices
            (<span style="color:#00C000"><b>default</b></span>: none).
        visualize_directory (string): Visualization output directory
            (<span style="color:#00C000"><b>default</b></span>: none).
        kwargs: Additional Gym environment arguments.
    """

    @classmethod
    def levels(cls):
        """Return the list of all currently registered Gym environment ids."""
        import gym

        return list(gym.envs.registry.env_specs)

    @classmethod
    def create_level(cls, level, max_episode_steps, reward_threshold, **kwargs):
        """
        Create a Gym environment instance for the given level id.

        If the requested `max_episode_steps` / `reward_threshold` differ from the registered
        specification, the environment is re-registered under a fresh version id first.

        Args:
            level (string): Gym id, possibly without a version suffix.
            max_episode_steps (false | int > 0 | None): Maximum number of timesteps per
                episode, false to disable the limit, None for the Gym default.
            reward_threshold (float | None): Reward threshold, None for the Gym default.
            kwargs: Additional arguments passed on to `gym.make`.

        Returns:
            Tuple of the environment instance and the effective maximum number of timesteps
            (false if unlimited).
        """
        import gym

        requires_register = False

        # Find level: resolve an unversioned id to a registered versioned one, preferring
        # '<level>-v0' and skipping versions with an incompatible episode-step limit.
        if level not in gym.envs.registry.env_specs:
            if max_episode_steps is None:  # interpret as false if level does not exist
                max_episode_steps = False
            env_specs = list(gym.envs.registry.env_specs)
            if level + '-v0' in gym.envs.registry.env_specs:
                env_specs.insert(0, level + '-v0')
            for name in env_specs:
                if level == name[:name.rindex('-v')]:
                    if max_episode_steps is False and \
                            gym.envs.registry.env_specs[name].max_episode_steps is not None:
                        continue
                    elif max_episode_steps != gym.envs.registry.env_specs[name].max_episode_steps:
                        continue
                    level = name
                    break
            else:
                # No registered version is compatible: re-register based on first candidate
                level = env_specs[0]
                requires_register = True
        assert level in cls.levels()

        # Check/update attributes against the registered specification
        if max_episode_steps is None:
            max_episode_steps = gym.envs.registry.env_specs[level].max_episode_steps
            if max_episode_steps is None:
                max_episode_steps = False
        elif max_episode_steps != gym.envs.registry.env_specs[level].max_episode_steps:
            if not (
                (max_episode_steps is False) and
                (gym.envs.registry.env_specs[level].max_episode_steps is None)
            ):
                requires_register = True
        if reward_threshold is None:
            reward_threshold = gym.envs.registry.env_specs[level].reward_threshold
        elif reward_threshold != gym.envs.registry.env_specs[level].reward_threshold:
            requires_register = True
        # NOTE(review): removed a dangling `elif tags != ...` clause here — `tags` was
        # never defined (its defining code had been commented out), so reaching that
        # branch raised a NameError.

        # Modified specification: re-register under the next free version id
        if requires_register:
            entry_point = gym.envs.registry.env_specs[level].entry_point
            _kwargs = dict(gym.envs.registry.env_specs[level]._kwargs)
            nondeterministic = gym.envs.registry.env_specs[level].nondeterministic
            if '-v' in level and level[level.rindex('-v') + 2:].isdigit():
                version = int(level[level.rindex('-v') + 2:])
                level = level[:level.rindex('-v') + 2]
            else:
                version = -1
            while True:
                version += 1
                if level + str(version) not in gym.envs.registry.env_specs:
                    level = level + str(version)
                    break
            gym.register(
                id=level, entry_point=entry_point, reward_threshold=reward_threshold,
                nondeterministic=nondeterministic,
                max_episode_steps=(None if max_episode_steps is False else max_episode_steps),
                kwargs=_kwargs
            )
            assert level in cls.levels()

        return gym.make(id=level, **kwargs), max_episode_steps

    def __init__(
        self, level, visualize=False, max_episode_steps=None, terminal_reward=0.0,
        reward_threshold=None, drop_states_indices=None, visualize_directory=None, **kwargs
    ):
        super().__init__()

        import gym
        import gym.wrappers

        self.level = level
        self.visualize = visualize
        self.terminal_reward = terminal_reward

        if isinstance(level, gym.Env):
            # Given an environment instance: use it directly
            self.environment = self.level
            self.level = self.level.__class__.__name__
            self.max_episode_steps = max_episode_steps
        elif isinstance(level, type) and issubclass(level, gym.Env):
            # Given an environment class: instantiate it
            self.environment = self.level(**kwargs)
            # Fixed: a class' name is `__name__`; `__class__.__name__` of a class is the
            # name of its metaclass (usually 'type'), not the environment name.
            self.level = self.level.__name__
            self.max_episode_steps = max_episode_steps
        else:
            # Given a Gym id: create via the registry
            self.environment, self.max_episode_steps = self.__class__.create_level(
                level=self.level, max_episode_steps=max_episode_steps,
                reward_threshold=reward_threshold, **kwargs
            )

        if visualize_directory is not None:
            self.environment = gym.wrappers.Monitor(
                env=self.environment, directory=visualize_directory
            )

        self.states_spec = OpenAIGym.specs_from_gym_space(
            space=self.environment.observation_space, ignore_value_bounds=True  # TODO: not ignore?
        )
        if drop_states_indices is None:
            self.drop_states_indices = None
        else:
            # Dropping state components is only supported for an atomic rank-1 state space
            assert util.is_atomic_values_spec(values_spec=self.states_spec)
            self.drop_states_indices = sorted(drop_states_indices)
            assert len(self.states_spec['shape']) == 1
            num_dropped = len(self.drop_states_indices)
            self.states_spec['shape'] = (self.states_spec['shape'][0] - num_dropped,)
        self.actions_spec = OpenAIGym.specs_from_gym_space(
            space=self.environment.action_space, ignore_value_bounds=False
        )

    def __str__(self):
        return super().__str__() + '({})'.format(self.level)

    def states(self):
        """Tensorforce states specification derived from the Gym observation space."""
        return self.states_spec

    def actions(self):
        """Tensorforce actions specification derived from the Gym action space."""
        return self.actions_spec

    def max_episode_timesteps(self):
        """Maximum number of timesteps per episode, or the superclass default if unlimited."""
        if self.max_episode_steps is False:
            return super().max_episode_timesteps()
        else:
            return self.max_episode_steps

    def close(self):
        self.environment.close()
        self.environment = None

    def reset(self):
        """Reset the environment and return the initial (flattened) state."""
        import gym.wrappers

        if isinstance(self.environment, gym.wrappers.Monitor):
            # Mark the recorder as done so resetting mid-episode does not raise
            self.environment.stats_recorder.done = True
        states = self.environment.reset()
        self.timestep = 0
        states = OpenAIGym.flatten_state(state=states, states_spec=self.states_spec)
        if self.drop_states_indices is not None:
            # Remove dropped components back-to-front so earlier indices stay valid
            for index in reversed(self.drop_states_indices):
                states = np.concatenate([states[:index], states[index + 1:]])
        return states

    def execute(self, actions):
        """
        Execute the given action(s) and return (states, terminal, reward).

        Terminal encoding: 0 = not terminal, 1 = genuine termination (plus terminal_reward),
        2 = termination due to the maximum-timestep limit (abort).
        """
        if self.visualize:
            self.environment.render()
        actions = OpenAIGym.unflatten_action(action=actions)
        states, reward, terminal, _ = self.environment.step(actions)
        self.timestep += 1
        if self.timestep == self.max_episode_steps:
            assert terminal
            terminal = 2  # maximum-timestep termination
        elif terminal:
            assert self.max_episode_steps is None or self.max_episode_steps is False or \
                self.timestep < self.max_episode_steps
            reward += self.terminal_reward
            terminal = 1  # genuine early termination
        else:
            terminal = 0
        states = OpenAIGym.flatten_state(state=states, states_spec=self.states_spec)
        if self.drop_states_indices is not None:
            for index in reversed(self.drop_states_indices):
                states = np.concatenate([states[:index], states[index + 1:]])
        return states, terminal, reward

    @staticmethod
    def specs_from_gym_space(space, ignore_value_bounds):
        """
        Translate a Gym space into a Tensorforce values specification.

        Spaces that cannot be expressed as a single atomic spec are flattened into a dict
        of scalar specs whose names encode position and original shape
        ('gymmdc{n}-{shape}', 'gymbox{n}-{shape}') or nesting ('gymtpl{n}', '{key}-...').

        Args:
            space (gym.spaces.Space): Gym space to translate.
            ignore_value_bounds (bool): Whether to ignore the Box value bounds.

        Returns:
            Values specification dict, or dict of named specifications.

        Raises:
            TensorforceError: If the space type is not supported.
        """
        import gym

        if isinstance(space, gym.spaces.Discrete):
            return dict(type='int', shape=(), num_values=space.n)

        elif isinstance(space, gym.spaces.MultiBinary):
            return dict(type='bool', shape=space.n)

        elif isinstance(space, gym.spaces.MultiDiscrete):
            if (space.nvec == space.nvec.item(0)).all():
                # Homogeneous cardinalities: single int spec
                return dict(type='int', shape=space.nvec.shape, num_values=space.nvec.item(0))
            else:
                # Heterogeneous cardinalities: one scalar spec per component, original
                # shape encoded in the name for later reassembly
                specs = dict()
                nvec = space.nvec.flatten()
                shape = '-'.join(str(x) for x in space.nvec.shape)
                for n in range(nvec.shape[0]):
                    specs['gymmdc{}-{}'.format(n, shape)] = dict(
                        type='int', shape=(), num_values=nvec[n]
                    )
                return specs

        elif isinstance(space, gym.spaces.Box):
            if ignore_value_bounds:
                return dict(type='float', shape=space.shape)
            elif (space.low == space.low.item(0)).all() and \
                    (space.high == space.high.item(0)).all():
                # Uniform bounds: single float spec
                return dict(
                    type='float', shape=space.shape, min_value=space.low.item(0),
                    max_value=space.high.item(0)
                )
            else:
                # Per-component bounds: one scalar spec per component
                specs = dict()
                low = space.low.flatten()
                high = space.high.flatten()
                shape = '-'.join(str(x) for x in space.low.shape)
                for n in range(low.shape[0]):
                    specs['gymbox{}-{}'.format(n, shape)] = dict(
                        type='float', shape=(), min_value=low[n], max_value=high[n]
                    )
                return specs

        elif isinstance(space, gym.spaces.Tuple):
            specs = dict()
            for n, inner_space in enumerate(space.spaces):
                spec = OpenAIGym.specs_from_gym_space(
                    space=inner_space, ignore_value_bounds=ignore_value_bounds
                )
                if 'type' in spec:
                    specs['gymtpl{}'.format(n)] = spec
                else:
                    for name, inner_spec in spec.items():
                        specs['gymtpl{}-{}'.format(n, name)] = inner_spec
            return specs

        elif isinstance(space, gym.spaces.Dict):
            specs = dict()
            for space_name, inner_space in space.spaces.items():
                spec = OpenAIGym.specs_from_gym_space(
                    space=inner_space, ignore_value_bounds=ignore_value_bounds
                )
                if 'type' in spec:
                    specs[space_name] = spec
                else:
                    for name, inner_spec in spec.items():
                        specs['{}-{}'.format(space_name, name)] = inner_spec
            return specs

        else:
            raise TensorforceError('Unknown Gym space.')

    @staticmethod
    def flatten_state(state, states_spec):
        """
        Flatten a Gym state (tuple/dict/array) into the layout described by states_spec,
        using the naming scheme established by `specs_from_gym_space`.

        Raises:
            TensorforceError: If a numeric state contains inf or nan.
        """
        if isinstance(state, tuple):
            states = dict()
            for n, substate in enumerate(state):
                name = 'gymtpl{}'.format(n)
                if name in states_spec:
                    spec = states_spec[name]
                else:
                    spec = None
                    for spec_name in states_spec:
                        if spec_name.startswith(name + '-'):
                            assert spec is None  # expect exactly one matching spec
                            spec = states_spec[spec_name]
                assert spec is not None
                substate = OpenAIGym.flatten_state(state=substate, states_spec=spec)
                if isinstance(substate, dict):
                    for inner_name, value in substate.items():
                        states['{}-{}'.format(name, inner_name)] = value
                else:
                    states[name] = substate
            return states

        elif isinstance(state, dict):
            states = dict()
            for state_name, substate in state.items():
                if state_name in states_spec:
                    spec = states_spec[state_name]
                else:
                    spec = None
                    for spec_name in states_spec:
                        if spec_name.startswith('{}-'.format(state_name)):
                            assert spec is None  # expect exactly one matching spec
                            spec = states_spec[spec_name]
                assert spec is not None
                substate = OpenAIGym.flatten_state(state=substate, states_spec=spec)
                if isinstance(substate, dict):
                    for inner_name, value in substate.items():
                        states['{}-{}'.format(state_name, inner_name)] = value
                else:
                    states[state_name] = substate
            return states

        elif np.isinf(state).any() or np.isnan(state).any():
            raise TensorforceError("State contains inf or nan.")

        elif any(name.startswith('gymbox0-') for name in states_spec):
            # Fixed: spec keys are 'gymbox{n}-{shape}', so the former exact-key check
            # ('gymbox0' in states_spec) could never match; also compute the shape suffix
            # from the original shape (as the spec names do), before flattening.
            states = dict()
            shape = '-'.join(str(x) for x in state.shape)
            flat = state.flatten()
            for n in range(flat.shape[0]):
                states['gymbox{}-{}'.format(n, shape)] = flat[n]
            return states

        elif any(name.startswith('gymmdc0-') for name in states_spec):
            # Same fix as for the 'gymbox' case above
            states = dict()
            shape = '-'.join(str(x) for x in state.shape)
            flat = state.flatten()
            for n in range(flat.shape[0]):
                states['gymmdc{}-{}'.format(n, shape)] = flat[n]
            return states

        else:
            return state

    @staticmethod
    def unflatten_action(action):
        """
        Reassemble a flat action dict (keys produced by `specs_from_gym_space`) into the
        nested tuple/dict/array structure the Gym action space expects.

        Raises:
            TensorforceError: If a numeric action contains inf or nan.
        """
        if not isinstance(action, dict):
            # Atomic action: only sanity-check for inf/nan
            if np.isinf(action).any() or np.isnan(action).any():
                raise TensorforceError("Action contains inf or nan.")
            return action

        elif all(name.startswith('gymmdc') for name in action) or \
                all(name.startswith('gymbox') for name in action) or \
                all(name.startswith('gymtpl') for name in action):
            space_type = next(iter(action))[:6]
            actions = list()
            n = 0
            while True:
                prefix = space_type + str(n)
                if prefix in action:
                    actions.append(OpenAIGym.unflatten_action(action=action[prefix]))
                elif any(name.startswith(prefix + '-') for name in action):
                    if space_type == 'gymtpl':
                        # Nested tuple component: strip the prefix and recurse.
                        # Fixed: this was a set comprehension which discarded the values.
                        inner_action = {
                            name[name.index('-') + 1:]: value
                            for name, value in action.items()
                            if name.startswith(prefix + '-')
                        }
                        actions.append(OpenAIGym.unflatten_action(action=inner_action))
                    else:
                        # gymmdc/gymbox names are '<prefix><n>-<shape>' with scalar
                        # values; the suffix is the shape, not a nested name.
                        name = next(nm for nm in action if nm.startswith(prefix + '-'))
                        actions.append(action[name])
                else:
                    break
                n += 1

            if space_type == 'gymmdc' or space_type == 'gymbox':
                # Reassemble the original array shape encoded in the name suffix.
                # Fixed: ndarray.reshape takes the shape positionally, not as `shape=`.
                name = next(iter(action))
                shape = tuple(int(x) for x in name[name.index('-') + 1:].split('-'))
                return np.array(object=actions).reshape(shape)
            else:
                return tuple(actions)

        else:
            # Dict action space: group names by their first component and recurse
            actions = dict()
            for name, value in action.items():
                if '-' in name:
                    name, inner_name = name.split('-', 1)
                    actions.setdefault(name, dict())[inner_name] = value
                else:
                    actions[name] = value
            for name, value in actions.items():
                if isinstance(value, dict):
                    actions[name] = OpenAIGym.unflatten_action(action=value)
            return actions