# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import tensorflow as tf
from tensorforce import TensorforceError, util
from tensorforce.core import layer_modules, Module
from tensorforce.core.distributions import Distribution
class Categorical(Distribution):
    """
    Categorical distribution, for discrete integer actions (specification key: `categorical`).

    Args:
        name (string): Distribution name
            (<span style="color:#0000C0"><b>internal use</b></span>).
        action_spec (specification): Action specification
            (<span style="color:#0000C0"><b>internal use</b></span>).
        embedding_shape (iter[int > 0]): Embedding shape
            (<span style="color:#0000C0"><b>internal use</b></span>).
        advantage_based (bool): Whether to compute action values as state value plus advantage
            (<span style="color:#00C000"><b>default</b></span>: false).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(
        self, name, action_spec, embedding_shape, advantage_based=False, summary_labels=None
    ):
        """
        Creates the linear sub-modules producing the action values (and, if `advantage_based`,
        the state value), and registers the `<name>-probabilities` and `<name>-values` tensors.

        Raises:
            TensorforceError: if `advantage_based` is combined with a non-flat embedding, if the
                embedding rank is not within 1..3, or if a non-flat embedding shape is
                incompatible with the action shape.
        """
        super().__init__(
            name=name, action_spec=action_spec, embedding_shape=embedding_shape,
            summary_labels=summary_labels
        )

        input_spec = dict(type='float', shape=self.embedding_shape)
        num_values = self.action_spec['num_values']

        # Stays None unless an explicit state-value head is requested (flat embedding only)
        self.state_value = None

        if len(self.embedding_shape) == 1:
            # Flat embedding: one linear layer yields the values of all action components at once
            action_size = util.product(xs=self.action_spec['shape'])
            self.action_values = self.add_module(
                name='action_values', module='linear', modules=layer_modules,
                size=(action_size * num_values), input_spec=input_spec
            )
            if advantage_based:
                self.state_value = self.add_module(
                    name='states_value', module='linear', modules=layer_modules,
                    size=action_size, input_spec=input_spec
                )

        else:
            # Non-flat embedding: leading embedding dimensions have to line up with action shape
            if advantage_based:
                raise TensorforceError.invalid(
                    name=name, argument='advantage_based', condition='embedding shape'
                )

            if len(self.embedding_shape) < 1 or len(self.embedding_shape) > 3:
                raise TensorforceError.value(
                    name=name, argument='embedding_shape', value=self.embedding_shape,
                    hint='invalid rank'
                )

            if self.embedding_shape[:-1] == self.action_spec['shape'][:-1]:
                # Last action dimension is produced by the linear layer
                size = self.action_spec['shape'][-1]
            elif self.embedding_shape[:-1] == self.action_spec['shape']:
                # Embedding already covers the full action shape
                size = 1
            else:
                raise TensorforceError.value(
                    name=name, argument='embedding_shape', value=self.embedding_shape,
                    hint='not flattened and incompatible with action shape'
                )

            self.action_values = self.add_module(
                name='action_values', module='linear', modules=layer_modules,
                size=(size * num_values), input_spec=input_spec
            )

        # Both registered tensors share the spec: action shape plus a trailing num_values axis
        Module.register_tensor(
            name=(self.name + '-probabilities'),
            spec=dict(type='float', shape=(self.action_spec['shape'] + (num_values,))),
            batched=True
        )
        Module.register_tensor(
            name=(self.name + '-values'),
            spec=dict(type='float', shape=(self.action_spec['shape'] + (num_values,))),
            batched=True
        )

    def tf_parametrize(self, x, mask):
        """
        Computes the distribution parameters for an embedding.

        Args:
            x: Embedding tensor passed to the linear sub-modules.
            mask: Condition tensor selecting valid actions; entries where it is false get the
                minimal float as action value (presumably boolean and shaped like the action
                values -- confirm against the caller).

        Returns:
            Tuple `(logits, probabilities, states_value, action_values)`; logits, probabilities
            and action values are reshaped to `(-1,) + action shape + (num_values,)`, the states
            value to `(-1,) + action shape`.
        """
        epsilon = tf.constant(value=util.epsilon, dtype=util.tf_dtype(dtype='float'))
        shape = (-1,) + self.action_spec['shape'] + (self.action_spec['num_values'],)

        # Action values
        action_values = self.action_values.apply(x=x)
        action_values = tf.reshape(tensor=action_values, shape=shape)

        if self.state_value is None:
            # Implicit states value (TODO: experimental)
            states_value = tf.reduce_logsumexp(input_tensor=action_values, axis=-1)

        else:
            # Explicit states value and advantage-based action values
            states_value = self.state_value.apply(x=x)
            states_value = tf.reshape(tensor=states_value, shape=shape[:-1])
            # Center the advantages *before* adding the state value. Subtracting the mean of
            # the sum (state value + advantage) instead would cancel the state value entirely,
            # since mean(V + A) = V + mean(A).
            advantages = action_values - tf.math.reduce_mean(
                input_tensor=action_values, axis=-1, keepdims=True
            )
            action_values = tf.expand_dims(input=states_value, axis=-1) + advantages

        # Masking (TODO: before or after states_value?)
        # Invalid actions receive the minimal float value, so they are never the argmax
        min_float = tf.fill(
            dims=tf.shape(input=action_values), value=util.tf_dtype(dtype='float').min
        )
        action_values = tf.where(condition=mask, x=action_values, y=min_float)

        # Softmax for corresponding probabilities
        probabilities = tf.nn.softmax(logits=action_values, axis=-1)

        # "Normalized" logits (probabilities clipped from below to avoid log(0))
        logits = tf.math.log(x=tf.maximum(x=probabilities, y=epsilon))

        Module.update_tensor(name=(self.name + '-probabilities'), tensor=probabilities)
        Module.update_tensor(name=(self.name + '-values'), tensor=action_values)

        return logits, probabilities, states_value, action_values

    def tf_sample(self, parameters, temperature):
        """
        Samples an action from the distribution.

        Args:
            parameters: Tuple as returned by `tf_parametrize`.
            temperature: Sampling temperature dividing the logits; if below `util.epsilon`, the
                action with maximal action value is returned instead of a sample (presumably a
                scalar float tensor -- confirm against the caller).

        Returns:
            Integer action tensor (dtype `util.tf_dtype('int')`).
        """
        logits, probabilities, _, action_values = parameters

        # Average the probabilities over all action dimensions for the summary
        summary_probs = probabilities
        for _ in range(len(self.action_spec['shape'])):
            summary_probs = tf.math.reduce_mean(input_tensor=summary_probs, axis=1)

        logits, probabilities = self.add_summary(
            label=('distributions', 'categorical'), name='probabilities', tensor=summary_probs,
            pass_tensors=(logits, probabilities), enumerate_last_rank=True
        )

        one = tf.constant(value=1.0, dtype=util.tf_dtype(dtype='float'))
        epsilon = tf.constant(value=util.epsilon, dtype=util.tf_dtype(dtype='float'))

        # Deterministic: maximum likelihood action
        definite = tf.argmax(input=action_values, axis=-1)
        definite = tf.dtypes.cast(x=definite, dtype=util.tf_dtype(dtype='int'))

        # Set logits to minimal value (for actions with probability below epsilon)
        min_float = tf.fill(dims=tf.shape(input=logits), value=util.tf_dtype(dtype='float').min)
        logits = logits / temperature
        logits = tf.where(condition=(probabilities < epsilon), x=min_float, y=logits)

        # Non-deterministic: sample action using Gumbel distribution
        # (uniform noise is kept within (epsilon, 1 - epsilon) so both logarithms are finite)
        uniform_distribution = tf.random.uniform(
            shape=tf.shape(input=logits), minval=epsilon, maxval=(one - epsilon),
            dtype=util.tf_dtype(dtype='float')
        )
        gumbel_distribution = -tf.math.log(x=-tf.math.log(x=uniform_distribution))
        sampled = tf.argmax(input=(logits + gumbel_distribution), axis=-1)
        sampled = tf.dtypes.cast(x=sampled, dtype=util.tf_dtype(dtype='int'))

        return tf.where(condition=(temperature < epsilon), x=definite, y=sampled)

    def tf_log_probability(self, parameters, action):
        """
        Returns the log-probability (normalized logit) of the given action.

        Args:
            parameters: Tuple as returned by `tf_parametrize`.
            action: Integer action tensor indexing the last axis of the logits.
        """
        logits, _, _, _ = parameters

        # Cast the indices to int32 if the configured int dtype is neither int32 nor int64
        if util.tf_dtype(dtype='int') not in (tf.int32, tf.int64):
            action = tf.dtypes.cast(x=action, dtype=tf.int32)

        # Select the logit of the chosen action along the last axis
        logits = tf.gather(
            params=logits, indices=tf.expand_dims(input=action, axis=-1), batch_dims=-1
        )

        return tf.squeeze(input=logits, axis=-1)

    def tf_entropy(self, parameters):
        """
        Returns the entropy, computed as `-sum(probabilities * logits)` over the last axis.

        Args:
            parameters: Tuple as returned by `tf_parametrize`.
        """
        logits, probabilities, _, _ = parameters

        return -tf.reduce_sum(input_tensor=(probabilities * logits), axis=-1)

    def tf_kl_divergence(self, parameters1, parameters2):
        """
        Returns the KL divergence of the first from the second distribution, computed as
        `sum(probabilities1 * (logits1 - logits2))` over the last axis.

        Args:
            parameters1: Tuple as returned by `tf_parametrize` (first distribution).
            parameters2: Tuple as returned by `tf_parametrize` (second distribution).
        """
        logits1, probabilities1, _, _ = parameters1
        logits2, _, _, _ = parameters2

        log_prob_ratio = logits1 - logits2

        return tf.reduce_sum(input_tensor=(probabilities1 * log_prob_ratio), axis=-1)

    def tf_action_value(self, parameters, action=None):
        """
        Returns the action values, either for all actions or only for the given action.

        Args:
            parameters: Tuple as returned by `tf_parametrize`.
            action: Optional integer action tensor indexing the last axis of the action values;
                if None, the values of all actions are returned.
        """
        _, _, _, action_values = parameters

        if action is not None:
            # Cast the indices to int32 if the configured int dtype is neither int32 nor int64
            if util.tf_dtype(dtype='int') not in (tf.int32, tf.int64):
                action = tf.dtypes.cast(x=action, dtype=tf.int32)

            # Select the value of the chosen action along the last axis
            action = tf.expand_dims(input=action, axis=-1)
            action_values = tf.gather(params=action_values, indices=action, batch_dims=-1)
            action_values = tf.squeeze(input=action_values, axis=-1)

        return action_values

    def tf_states_value(self, parameters):
        """
        Returns the states value contained in the distribution parameters.

        Args:
            parameters: Tuple as returned by `tf_parametrize`.
        """
        _, _, states_value, _ = parameters

        return states_value