Source code for tensorforce.execution.runner

# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from collections import OrderedDict
import time
from tqdm import tqdm

import numpy as np

from tensorforce import Agent, Environment, TensorforceError, util
from tensorforce.environments import RemoteEnvironment


[docs]class Runner(object): """ Tensorforce runner utility. Args: agent (specification | Agent object): Agent specification or object, the latter is not closed automatically as part of `runner.close()` (<span style="color:#C00000"><b>required</b></span>). environment (specification | Environment object): Environment specification or object, the latter is not closed automatically as part of `runner.close()` (<span style="color:#C00000"><b>required</b></span>, or alternatively `environments`, invalid for "socket-client" remote mode). max_episode_timesteps (int > 0): Maximum number of timesteps per episode, overwrites the environment default if defined (<span style="color:#00C000"><b>default</b></span>: environment default, invalid for "socket-client" remote mode). evaluation (bool): Whether to run the (last if multiple) environment in evaluation mode (<span style="color:#00C000"><b>default</b></span>: no evaluation). num_parallel (int > 0): Number of environment instances to execute in parallel (<span style="color:#00C000"><b>default</b></span>: no parallel execution, implicitly specified by `environments`). environments (list[specification | Environment object]): Environment specifications or objects to execute in parallel, the latter are not closed automatically as part of `runner.close()` (<span style="color:#00C000"><b>default</b></span>: no parallel execution, alternatively specified via `environment` and `num_parallel`, invalid for "socket-client" remote mode). remote ("multiprocessing" | "socket-client"): Communication mode for remote environment execution of parallelized environment execution, "socket-client" mode requires a corresponding "socket-server" running (<span style="color:#00C000"><b>default</b></span>: local execution). blocking (bool): Whether remote environment calls should be blocking, only valid if remote mode given (<span style="color:#00C000"><b>default</b></span>: not blocking, invalid unless "multiprocessing" or "socket-client" remote mode). host (str, iter[str]): Socket server hostname(s) or IP address(es) (<span style="color:#C00000"><b>required</b></span> only for "socket-client" remote mode). port (int, iter[int]): Socket server port(s), increasing sequence if single host and port given (<span style="color:#C00000"><b>required</b></span> only for "socket-client" remote mode). """ def __init__( self, agent, environment=None, max_episode_timesteps=None, evaluation=False, num_parallel=None, environments=None, remote=None, blocking=False, host=None, port=None ): if environment is None and environments is None: assert num_parallel is not None and remote == 'socket-client' environments = [None for _ in range(num_parallel)] elif environment is None: assert environments is not None assert num_parallel is None or num_parallel == len(environments) if not util.is_iterable(x=environments): raise TensorforceError.type( name='parallel-runner', argument='environments', value=environments ) elif len(environments) == 0: raise TensorforceError.value( name='parallel-runner', argument='environments', value=environments ) num_parallel = len(environments) environments = list(environments) elif num_parallel is None: assert environments is None num_parallel = 1 environments = [environment] else: assert environments is None assert not isinstance(environment, Environment) environments = [environment for _ in range(num_parallel)] if port is None or isinstance(port, int): if isinstance(host, str): port = [port + n for n in range(num_parallel)] else: port = [port for _ in range(num_parallel)] else: assert len(port) == num_parallel if host is None or isinstance(host, str): host = [host for _ in range(num_parallel)] else: assert len(host) == num_parallel self.environments = list() self.is_environment_external = isinstance(environments[0], Environment) environment = Environment.create( environment=environments[0], max_episode_timesteps=max_episode_timesteps, remote=remote, blocking=blocking, host=host[0], port=port[0] ) self.is_environment_remote = isinstance(environment, RemoteEnvironment) states = environment.states() actions = environment.actions() self.environments.append(environment) for n, environment in enumerate(environments[1:], start=1): assert isinstance(environment, Environment) == self.is_environment_external environment = Environment.create( environment=environment, max_episode_timesteps=max_episode_timesteps, remote=remote, blocking=blocking, host=host[n], port=port[n] ) assert isinstance(environment, RemoteEnvironment) == self.is_environment_remote assert environment.states() == states assert environment.actions() == actions self.environments.append(environment) self.evaluation = evaluation self.is_agent_external = isinstance(agent, Agent) if num_parallel - int(self.evaluation) > 1: self.agent = Agent.create( agent=agent, environment=environment, parallel_interactions=(num_parallel - int(self.evaluation)) ) else: self.agent = Agent.create(agent=agent, environment=environment) def close(self): if hasattr(self, 'tqdm'): self.tqdm.close() if not self.is_agent_external: self.agent.close() if not self.is_environment_external: for environment in self.environments: environment.close() # TODO: make average reward another possible criteria for runner-termination
[docs] def run( self, # General num_episodes=None, num_timesteps=None, num_updates=None, # Parallel batch_agent_calls=False, sync_timesteps=False, sync_episodes=False, num_sleep_secs=0.001, # Callback callback=None, callback_episode_frequency=None, callback_timestep_frequency=None, # Tqdm use_tqdm=True, mean_horizon=1, # Evaluation evaluation=False, save_best_agent=None, evaluation_callback=None ): """ Run experiment. Args: num_episodes (int > 0): Number of episodes to run experiment (<span style="color:#00C000"><b>default</b></span>: no episode limit). num_timesteps (int > 0): Number of timesteps to run experiment (<span style="color:#00C000"><b>default</b></span>: no timestep limit). num_updates (int > 0): Number of agent updates to run experiment (<span style="color:#00C000"><b>default</b></span>: no update limit). batch_agent_calls (bool): Whether to batch agent calls for parallel environment execution (<span style="color:#00C000"><b>default</b></span>: separate call per environment). sync_timesteps (bool): Whether to synchronize parallel environment execution on timestep-level, implied by batch_agent_calls (<span style="color:#00C000"><b>default</b></span>: not synchronized unless batch_agent_calls). sync_episodes (bool): Whether to synchronize parallel environment execution on episode-level (<span style="color:#00C000"><b>default</b></span>: not synchronized). num_sleep_secs (float): Sleep duration if no environment is ready (<span style="color:#00C000"><b>default</b></span>: one milliseconds). callback ((Runner, parallel) -> bool): Callback function taking the runner instance plus parallel index and returning a boolean value indicating whether execution should continue (<span style="color:#00C000"><b>default</b></span>: callback always true). callback_episode_frequency (int): Episode interval between callbacks (<span style="color:#00C000"><b>default</b></span>: every episode). callback_timestep_frequency (int): Timestep interval between callbacks (<span style="color:#00C000"><b>default</b></span>: not specified). use_tqdm (bool): Whether to display a tqdm progress bar for the experiment run (<span style="color:#00C000"><b>default</b></span>: display progress bar). mean_horizon (int): Number of episodes progress bar values and evaluation score are averaged over (<span style="color:#00C000"><b>default</b></span>: not averaged). evaluation (bool): Whether to run in evaluation mode, only valid if a single environment (<span style="color:#00C000"><b>default</b></span>: no evaluation). save_best_agent (string): Directory to save the best version of the agent according to the evaluation score (<span style="color:#00C000"><b>default</b></span>: best agent is not saved). evaluation_callback (int | Runner -> float): Callback function taking the runner instance and returning an evaluation score (<span style="color:#00C000"><b>default</b></span>: cumulative evaluation reward averaged over mean_horizon episodes). """ # General if num_episodes is None: self.num_episodes = float('inf') else: self.num_episodes = num_episodes if num_timesteps is None: self.num_timesteps = float('inf') else: self.num_timesteps = num_timesteps if num_updates is None: self.num_updates = float('inf') else: self.num_updates = num_updates # Parallel self.batch_agent_calls = batch_agent_calls self.sync_timesteps = sync_timesteps or self.batch_agent_calls self.sync_episodes = sync_episodes self.num_sleep_secs = num_sleep_secs # Callback assert callback_episode_frequency is None or callback_timestep_frequency is None if callback_episode_frequency is None and callback_timestep_frequency is None: callback_episode_frequency = 1 if callback_episode_frequency is None: self.callback_episode_frequency = float('inf') else: self.callback_episode_frequency = callback_episode_frequency if callback_timestep_frequency is None: self.callback_timestep_frequency = float('inf') else: self.callback_timestep_frequency = callback_timestep_frequency if callback is None: self.callback = (lambda r, p: True) elif util.is_iterable(x=callback): def sequential_callback(runner, parallel): result = True for fn in callback: x = fn(runner, parallel) if isinstance(result, bool): result = result and x return result self.callback = sequential_callback else: def boolean_callback(runner, parallel): result = callback(runner, parallel) if isinstance(result, bool): return result else: return True self.callback = boolean_callback # Experiment statistics self.episode_rewards = list() self.episode_timesteps = list() self.episode_seconds = list() self.episode_agent_seconds = list() if self.is_environment_remote: self.episode_env_seconds = list() if self.evaluation or evaluation: self.evaluation_rewards = list() self.evaluation_timesteps = list() self.evaluation_seconds = list() self.evaluation_agent_seconds = list() if self.is_environment_remote: self.evaluation_env_seconds = list() if len(self.environments) == 1: # for tqdm self.episode_rewards = self.evaluation_rewards self.episode_timesteps = self.evaluation_timesteps self.episode_seconds = self.evaluation_seconds self.episode_agent_seconds = self.evaluation_agent_seconds if self.is_environment_remote: self.episode_env_seconds = self.evaluation_env_seconds else: # for tqdm self.evaluation_rewards = self.episode_rewards self.evaluation_timesteps = self.episode_timesteps self.evaluation_seconds = self.episode_seconds self.evaluation_agent_seconds = self.episode_agent_seconds if self.is_environment_remote: self.evaluation_env_seconds = self.episode_env_seconds # Timestep/episode/update counter self.timesteps = 0 self.episodes = 0 self.updates = 0 # Tqdm if use_tqdm: if hasattr(self, 'tqdm'): self.tqdm.close() assert self.num_episodes != float('inf') or self.num_timesteps != float('inf') inner_callback = self.callback if self.num_episodes != float('inf'): # Episode-based tqdm (default option if both num_episodes and num_timesteps set) assert self.num_episodes != float('inf') bar_format = ( '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}, reward={postfix[0]:.2f}, ts/ep=' '{postfix[1]}, sec/ep={postfix[2]:.2f}, ms/ts={postfix[3]:.1f}, agent=' '{postfix[4]:.1f}%]' ) postfix = [0.0, 0, 0.0, 0.0, 0.0] if self.is_environment_remote: bar_format = bar_format[:-1] + ', comm={postfix[5]:.1f}%]' postfix.append(0.0) self.tqdm = tqdm( desc='Episodes', total=self.num_episodes, bar_format=bar_format, initial=self.episodes, postfix=postfix ) self.tqdm_last_update = self.episodes def tqdm_callback(runner, parallel): mean_reward = float(np.mean(runner.evaluation_rewards[-mean_horizon:])) mean_ts_per_ep = int(np.mean(runner.episode_timesteps[-mean_horizon:])) mean_sec_per_ep = float(np.mean(runner.episode_seconds[-mean_horizon:])) mean_agent_sec = float(np.mean(runner.episode_agent_seconds[-mean_horizon:])) mean_ms_per_ts = mean_sec_per_ep * 1000.0 / mean_ts_per_ep mean_rel_agent = mean_agent_sec * 100.0 / mean_sec_per_ep runner.tqdm.postfix[0] = mean_reward runner.tqdm.postfix[1] = mean_ts_per_ep runner.tqdm.postfix[2] = mean_sec_per_ep runner.tqdm.postfix[3] = mean_ms_per_ts runner.tqdm.postfix[4] = mean_rel_agent if runner.is_environment_remote: mean_env_sec = float(np.mean(runner.episode_env_seconds[-mean_horizon:])) mean_rel_comm = (mean_agent_sec + mean_env_sec) * 100.0 / mean_sec_per_ep runner.tqdm.postfix[5] = mean_rel_comm runner.tqdm.update(n=(runner.episodes - runner.tqdm_last_update)) runner.tqdm_last_update = runner.episodes return inner_callback(runner, parallel) else: # Timestep-based tqdm self.tqdm = tqdm( desc='Timesteps', total=self.num_timesteps, initial=self.timesteps, postfix=dict(mean_reward='n/a') ) self.tqdm_last_update = self.timesteps def tqdm_callback(runner, parallel): # sum_timesteps_reward = sum(runner.timestep_rewards[num_mean_reward:]) # num_timesteps = min(num_mean_reward, runner.evaluation_timestep) # mean_reward = sum_timesteps_reward / num_episodes runner.tqdm.set_postfix(mean_reward='n/a') runner.tqdm.update(n=(runner.timesteps - runner.tqdm_last_update)) runner.tqdm_last_update = runner.timesteps return inner_callback(runner, parallel) self.callback = tqdm_callback # Evaluation if evaluation and (self.evaluation or len(self.environments) > 1): raise TensorforceError.unexpected() self.evaluation_run = self.evaluation or evaluation self.save_best_agent = save_best_agent if evaluation_callback is None: self.evaluation_callback = (lambda r: None) else: self.evaluation_callback = evaluation_callback if self.save_best_agent is not None: inner_evaluation_callback = self.evaluation_callback def mean_reward_callback(runner): result = inner_evaluation_callback(runner) if result is None: return float(np.mean(runner.evaluation_rewards[-mean_horizon:])) else: return result self.evaluation_callback = mean_reward_callback self.best_evaluation_score = None # Episode statistics self.episode_reward = [0.0 for _ in self.environments] self.episode_timestep = [0 for _ in self.environments] # if self.batch_agent_calls: # self.episode_agent_second = 0.0 # self.episode_start = time.time() if self.evaluation_run: self.episode_agent_second = [0.0 for _ in self.environments[:-1]] self.episode_start = [time.time() for _ in self.environments[:-1]] else: self.episode_agent_second = [0.0 for _ in self.environments] self.episode_start = [time.time() for _ in self.environments] self.evaluation_agent_second = 0.0 self.evaluation_start = time.time() # Values self.terminate = 0 self.prev_terminals = [-1 for _ in self.environments] self.states = [None for _ in self.environments] self.terminals = [None for _ in self.environments] self.rewards = [None for _ in self.environments] if self.evaluation_run: self.evaluation_internals = self.agent.initial_internals() # Required if agent was previously stopped mid-episode self.agent.reset() # Reset environments for environment in self.environments: environment.start_reset() # Runner loop while any(terminal <= 0 for terminal in self.prev_terminals): self.terminals = [None for _ in self.terminals] if self.batch_agent_calls: # Retrieve observations (only if not already terminated) while any(terminal is None for terminal in self.terminals): for n in range(len(self.environments)): if self.terminals[n] is not None: # Already received continue elif self.prev_terminals[n] <= 0: # Receive if not terminal observation = self.environments[n].receive_execute() if observation is None: continue self.states[n], self.terminals[n], self.rewards[n] = observation else: # Terminal self.states[n] = None self.terminals[n] = self.prev_terminals[n] self.rewards[n] = None self.handle_observe_joint() self.handle_act_joint() # Parallel environments loop no_environment_ready = True for n in range(len(self.environments)): if self.prev_terminals[n] > 0: # Continue if episode terminated (either sync_episodes or finished) self.terminals[n] = self.prev_terminals[n] continue elif self.batch_agent_calls: # Handled before parallel environments loop pass elif self.sync_timesteps: # Wait until environment is ready while True: observation = self.environments[n].receive_execute() if observation is not None: break else: # Check whether environment is ready, otherwise continue observation = self.environments[n].receive_execute() if observation is None: self.terminals[n] = self.prev_terminals[n] continue no_environment_ready = False if not self.batch_agent_calls: self.states[n], self.terminals[n], self.rewards[n] = observation # Check whether evaluation environment if self.evaluation_run and n == (len(self.environments) - 1): if self.terminals[n] == -1: # Initial act self.handle_act_evaluation() else: # Observe self.handle_observe_evaluation() if self.terminals[n] == 0: # Act self.handle_act_evaluation() else: # Terminal self.handle_terminal_evaluation() else: if self.terminals[n] == -1: # Initial act self.handle_act(parallel=n) else: # Observe self.handle_observe(parallel=n) if self.terminals[n] == 0: # Act self.handle_act(parallel=n) else: # Terminal self.handle_terminal(parallel=n) self.prev_terminals = list(self.terminals) # Sync_episodes: Reset if all episodes terminated if self.sync_episodes and all(terminal > 0 for terminal in self.terminals): num_episodes_left = self.num_episodes - self.episodes num_noneval_environments = len(self.environments) - int(self.evaluation_run) for n in range(min(num_noneval_environments, num_episodes_left)): self.prev_terminals[n] = -1 self.environments[n].start_reset() if self.evaluation_run and num_episodes_left > 0: self.prev_terminals[-1] = -1 self.environments[-1].start_reset() # Sleep if no environment was ready if no_environment_ready: time.sleep(self.num_sleep_secs)
def handle_act(self, parallel): if self.batch_agent_calls: self.environments[parallel].start_execute(actions=self.actions[parallel]) else: agent_start = time.time() actions = self.agent.act(states=self.states[parallel], parallel=parallel) self.episode_agent_second[parallel] += time.time() - agent_start self.environments[parallel].start_execute(actions=actions) # Update episode statistics self.episode_timestep[parallel] += 1 # Maximum number of timesteps or timestep callback (after counter increment!) self.timesteps += 1 if (( self.episode_timestep[parallel] % self.callback_timestep_frequency == 0 and not self.callback(self, parallel) ) or self.timesteps >= self.num_timesteps): self.terminate = 2 def handle_act_joint(self): parallel = [ n for n in range(len(self.environments) - int(self.evaluation_run)) if self.terminals[n] <= 0 ] if len(parallel) > 0: agent_start = time.time() self.actions = self.agent.act( states=[self.states[p] for p in parallel], parallel=parallel ) agent_second = (time.time() - agent_start) / len(parallel) for p in parallel: self.episode_agent_second[p] += agent_second self.actions = [ self.actions[parallel.index(n)] if n in parallel else None for n in range(len(self.environments)) ] if self.evaluation_run and self.terminals[-1] <= 0: agent_start = time.time() self.actions[-1], self.evaluation_internals = self.agent.act( states=self.states[-1], internals=self.evaluation_internals, evaluation=True ) self.episode_agent_second[-1] += time.time() - agent_start def handle_act_evaluation(self): if self.batch_agent_calls: actions = self.actions[-1] else: agent_start = time.time() actions, self.evaluation_internals = self.agent.act( states=self.states[-1], internals=self.evaluation_internals, evaluation=True ) self.evaluation_agent_second += time.time() - agent_start self.environments[-1].start_execute(actions=actions) # Update episode statistics self.episode_timestep[-1] += 1 # Maximum number of timesteps or timestep callback (after counter increment!) if self.evaluation_run and len(self.environments) == 1: self.timesteps += 1 if (( self.episode_timestep[-1] % self.callback_timestep_frequency == 0 and not self.callback(self, parallel) ) or self.timesteps >= self.num_timesteps): self.terminate = 2 def handle_observe(self, parallel): # Update episode statistics self.episode_reward[parallel] += self.rewards[parallel] # Not terminal but finished if self.terminals[parallel] == 0 and self.terminate == 2: self.terminals[parallel] = 2 # Observe unless batch_agent_calls if not self.batch_agent_calls: agent_start = time.time() updated = self.agent.observe( terminal=self.terminals[parallel], reward=self.rewards[parallel], parallel=parallel ) self.episode_agent_second[parallel] += time.time() - agent_start self.updates += int(updated) # Maximum number of updates (after counter increment!) if self.updates >= self.num_updates: self.terminate = 2 def handle_observe_joint(self): parallel = [ n for n in range(len(self.environments) - int(self.evaluation_run)) if self.prev_terminals[n] <= 0 and self.terminals[n] >= 0 ] if len(parallel) > 0: agent_start = time.time() updated = self.agent.observe( terminal=[self.terminals[p] for p in parallel], reward=[self.rewards[p] for p in parallel], parallel=parallel ) agent_second = (time.time() - agent_start) / len(parallel) for p in parallel: self.episode_agent_second[p] += agent_second self.updates += updated def handle_observe_evaluation(self): # Update episode statistics self.episode_reward[-1] += self.rewards[-1] # Reset agent if terminal if self.terminals[-1] > 0 or self.terminate == 2: agent_start = time.time() self.evaluation_agent_second += time.time() - agent_start def handle_terminal(self, parallel): # Update experiment statistics self.episode_rewards.append(self.episode_reward[parallel]) self.episode_timesteps.append(self.episode_timestep[parallel]) self.episode_seconds.append(time.time() - self.episode_start[parallel]) self.episode_agent_seconds.append(self.episode_agent_second[parallel]) if self.is_environment_remote: self.episode_env_seconds.append(self.environments[parallel].episode_seconds) # Maximum number of episodes or episode callback (after counter increment!) self.episodes += 1 if self.terminate == 0 and (( self.episodes % self.callback_episode_frequency == 0 and not self.callback(self, parallel) ) or self.episodes >= self.num_episodes): self.terminate = 1 # Reset episode statistics self.episode_reward[parallel] = 0.0 self.episode_timestep[parallel] = 0 self.episode_agent_second[parallel] = 0.0 self.episode_start[parallel] = time.time() # Reset environment if self.terminate == 0 and not self.sync_episodes: self.terminals[parallel] = -1 self.environments[parallel].start_reset() def handle_terminal_evaluation(self): # Update experiment statistics self.evaluation_rewards.append(self.episode_reward[-1]) self.evaluation_timesteps.append(self.episode_timestep[-1]) self.evaluation_seconds.append(time.time() - self.evaluation_start) self.evaluation_agent_seconds.append(self.evaluation_agent_second) if self.is_environment_remote: self.evaluation_env_seconds.append(self.environments[-1].episode_seconds) # Evaluation callback if self.save_best_agent is not None: evaluation_score = self.evaluation_callback(self) assert isinstance(evaluation_score, float) if self.best_evaluation_score is None: self.best_evaluation_score = evaluation_score elif evaluation_score > self.best_evaluation_score: self.best_evaluation_score = evaluation_score self.agent.save( directory=self.save_best_agent, filename='best-model', append_timestep=False ) else: self.evaluation_callback(self) # Maximum number of episodes or episode callback (after counter increment!) if self.evaluation_run and len(self.environments) == 1: self.episodes += 1 if self.terminate == 0 and (( self.episodes % self.callback_episode_frequency == 0 and not self.callback(self, 0) ) or self.episodes >= self.num_episodes): self.terminate = 1 # Reset episode statistics self.episode_reward[-1] = 0.0 self.episode_timestep[-1] = 0 self.evaluation_agent_second = 0.0 self.evaluation_start = time.time() # Reset environment if self.terminate == 0 and not self.sync_episodes: self.terminals[-1] = 0 self.environments[-1].start_reset() self.evaluation_internals = self.agent.initial_internals()