Source code for tensorforce.core.layers.misc

# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from collections import Counter

import tensorflow as tf

from tensorforce import TensorforceError, util
import tensorforce.core
from tensorforce.core import Module, parameter_modules
from tensorforce.core.layers import Layer


class Activation(Layer):
    """
    Activation layer (specification key: `activation`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        nonlinearity ('crelu' | 'elu' | 'leaky-relu' | 'none' | 'relu' | 'selu' | 'sigmoid' |
            'softmax' | 'softplus' | 'softsign' | 'swish' | 'tanh'): Nonlinearity
            (<span style="color:#C00000"><b>required</b></span>).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(self, name, nonlinearity, input_spec=None, summary_labels=None):
        super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)

        # Nonlinearity
        if nonlinearity not in (
            'crelu', 'elu', 'leaky-relu', 'none', 'relu', 'selu', 'sigmoid', 'softmax',
            'softplus', 'softsign', 'swish', 'tanh'
        ):
            raise TensorforceError.value(
                name='activation', argument='nonlinearity', value=nonlinearity
            )
        self.nonlinearity = nonlinearity

    def default_input_spec(self):
        return dict(type='float', shape=None)

    def tf_apply(self, x):
        if self.nonlinearity == 'crelu':
            x = tf.nn.crelu(features=x)

        elif self.nonlinearity == 'elu':
            x = tf.nn.elu(features=x)

        elif self.nonlinearity == 'leaky-relu':
            # Note: the leak coefficient alpha is fixed at 0.2 and not configurable.
            x = tf.nn.leaky_relu(features=x, alpha=0.2)

        elif self.nonlinearity == 'none':
            pass

        elif self.nonlinearity == 'relu':
            x = tf.nn.relu(features=x)
            x = self.add_summary(
                label='relu', name='relu', tensor=tf.math.zero_fraction(value=x), pass_tensors=x
            )

        elif self.nonlinearity == 'selu':
            x = tf.nn.selu(features=x)

        elif self.nonlinearity == 'sigmoid':
            x = tf.sigmoid(x=x)

        elif self.nonlinearity == 'softmax':
            x = tf.nn.softmax(logits=x)

        elif self.nonlinearity == 'softplus':
            x = tf.nn.softplus(features=x)

        elif self.nonlinearity == 'softsign':
            x = tf.nn.softsign(features=x)

        elif self.nonlinearity == 'swish':
            # Swish: x * sigmoid(x), see https://arxiv.org/abs/1710.05941
            x = tf.sigmoid(x=x) * x

        elif self.nonlinearity == 'tanh':
            x = tf.nn.tanh(x=x)

        return x
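
# Usage sketch (illustrative only, not part of the module): activation layers are
# typically given as layer specifications inside a network config; the dense size
# below is an arbitrary example value.
#
#     network = [
#         dict(type='dense', size=64),
#         dict(type='activation', nonlinearity='relu')
#     ]
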
class Block(Layer):
    """
    Block of layers (specification key: `block`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        layers (iter[specification]): Layers configuration, see [layers](../modules/layers.html)
            (<span style="color:#C00000"><b>required</b></span>).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
    """

    def __init__(self, name, layers, input_spec=None):
        # TODO: handle internal states and combine with layered network
        if len(layers) == 0:
            raise TensorforceError.value(
                name='block', argument='layers', value=layers, hint='zero length'
            )

        self._input_spec = input_spec
        self.layers = layers

        super().__init__(name=name, input_spec=input_spec, summary_labels=None)

    def default_input_spec(self):
        layer_counter = Counter()
        for n, layer_spec in enumerate(self.layers):
            if 'name' in layer_spec:
                layer_spec = dict(layer_spec)
                layer_name = layer_spec.pop('name')
            else:
                if isinstance(layer_spec.get('type'), str):
                    layer_type = layer_spec['type']
                else:
                    layer_type = 'layer'
                layer_name = layer_type + str(layer_counter[layer_type])
                layer_counter[layer_type] += 1

            self.layers[n] = self.add_module(
                name=layer_name, module=layer_spec, modules=tensorforce.core.layer_modules,
                input_spec=self._input_spec
            )
            self._input_spec = self.layers[n].output_spec

        return self.layers[0].default_input_spec()

    def get_output_spec(self, input_spec):
        for layer in self.layers:
            input_spec = layer.get_output_spec(input_spec=input_spec)
        return input_spec

    def tf_apply(self, x):
        for layer in self.layers:
            x = layer.apply(x=x)
        return x
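
# Usage sketch (illustrative only): a block bundles a sub-stack of layers into a
# single named layer, e.g. so it can be shared via the `reuse` layer further below;
# the layer sizes are example values.
#
#     network = [
#         dict(type='block', name='mlp-block', layers=[
#             dict(type='dense', size=32),
#             dict(type='activation', nonlinearity='tanh')
#         ])
#     ]
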
class Dropout(Layer):
    """
    Dropout layer (specification key: `dropout`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        rate (parameter, 0.0 <= float < 1.0): Dropout rate
            (<span style="color:#C00000"><b>required</b></span>).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(self, name, rate, input_spec=None, summary_labels=None):
        super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)

        # Rate
        self.rate = self.add_module(
            name='rate', module=rate, modules=parameter_modules, dtype='float', min_value=0.0,
            max_value=1.0
        )

    def default_input_spec(self):
        return dict(type='float', shape=None)

    def set_input_spec(self, spec):
        super().set_input_spec(spec=spec)

        if spec['type'] != 'float':
            raise TensorforceError.value(
                name='dropout', argument='input_spec', value=spec['type'], hint='!= float'
            )

    def tf_apply(self, x):
        rate = self.rate.value()

        def no_dropout():
            return x

        def apply_dropout():
            dropout = tf.nn.dropout(x=x, rate=rate)
            return self.add_summary(
                label='dropout', name='dropout', tensor=tf.math.zero_fraction(value=dropout),
                pass_tensors=dropout
            )

        # Skip dropout when the 'deterministic' tensor is false, or when the rate is zero.
        skip_dropout = tf.math.logical_not(x=Module.retrieve_tensor(name='deterministic'))
        zero = tf.constant(value=0.0, dtype=util.tf_dtype(dtype='float'))
        skip_dropout = tf.math.logical_or(x=skip_dropout, y=tf.math.equal(x=rate, y=zero))

        return self.cond(pred=skip_dropout, true_fn=no_dropout, false_fn=apply_dropout)
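
# Usage sketch (illustrative only): the rate is a parameter module, so it may be a
# plain constant as below, or a full parameter specification such as a decaying
# schedule; the value is an example choice.
#
#     dropout = dict(type='dropout', rate=0.1)
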
class Function(Layer):
    """
    Custom TensorFlow function layer (specification key: `function`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        function (lambda[x -> x]): TensorFlow function
            (<span style="color:#C00000"><b>required</b></span>).
        output_spec (specification): Output tensor specification containing type and/or shape
            information (<span style="color:#00C000"><b>default</b></span>: same as input).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
        l2_regularization (float >= 0.0): Scalar controlling L2 regularization
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    # (requires function as first argument)
    def __init__(
        self, name, function, output_spec=None, input_spec=None, summary_labels=None,
        l2_regularization=None
    ):
        self.output_spec = output_spec

        super().__init__(
            name=name, input_spec=input_spec, summary_labels=summary_labels,
            l2_regularization=l2_regularization
        )

        self.function = function

    def default_input_spec(self):
        return dict(type=None, shape=None)

    def get_output_spec(self, input_spec):
        if self.output_spec is not None:
            input_spec.update(self.output_spec)
        return input_spec

    def tf_apply(self, x):
        return self.function(x)
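
# Usage sketch (illustrative only): since `function` is a Python callable, this layer
# can only be specified programmatically, not via a JSON config; the lambda below is
# an example value.
#
#     scale = dict(type='function', function=(lambda x: 2.0 * x))
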
class Register(Layer):
    """
    Tensor registration layer, which is useful when defining more complex network architectures
    which do not follow the sequential layer-stack pattern, for instance, when handling multiple
    inputs (specification key: `register`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        tensor (string): Name under which tensor will be registered
            (<span style="color:#C00000"><b>required</b></span>).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(self, name, tensor, input_spec=None, summary_labels=None):
        if not isinstance(tensor, str):
            raise TensorforceError.type(name='register', argument='tensor', dtype=type(tensor))

        self.tensor = tensor

        super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)

        Module.register_tensor(name=self.tensor, spec=self.input_spec, batched=True)

        self.output_spec = None

    def default_input_spec(self):
        return dict(type=None, shape=None)

    def tf_apply(self, x):
        # Temporarily pop the current scope so the tensor is updated under the global scope.
        last_scope = Module.global_scope.pop()
        Module.update_tensor(name=self.tensor, tensor=x)
        Module.global_scope.append(last_scope)
        return x
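
# Usage sketch (illustrative only): registers the current tensor under a global name
# so that a later `retrieve` layer can access it; the tensor name is an example value.
#
#     register = dict(type='register', tensor='embedding')
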
class Reshape(Layer):
    """
    Reshape layer (specification key: `reshape`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        shape (int | iter[int]): New shape
            (<span style="color:#C00000"><b>required</b></span>).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(self, name, shape, input_spec=None, summary_labels=None):
        if isinstance(shape, int):
            self.shape = (shape,)
        else:
            self.shape = tuple(shape)

        super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)

    def default_input_spec(self):
        return dict(type=None, shape=None)

    def get_output_spec(self, input_spec):
        # The new shape must preserve the total number of elements.
        if util.product(xs=input_spec['shape']) != util.product(xs=self.shape):
            raise TensorforceError.value(name='Reshape', argument='shape', value=self.shape)
        input_spec['shape'] = self.shape
        return input_spec

    def tf_apply(self, x):
        x = tf.reshape(tensor=x, shape=((-1,) + self.shape))
        return x
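
# Usage sketch (illustrative only): flattening an 8x8x3 input into a vector; note
# that get_output_spec above requires the element count (here 8 * 8 * 3 = 192) to
# be preserved.
#
#     flatten = dict(type='reshape', shape=(192,))
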
class Retrieve(Layer):
    """
    Tensor retrieval layer, which is useful when defining more complex network architectures
    which do not follow the sequential layer-stack pattern, for instance, when handling multiple
    inputs (specification key: `retrieve`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        tensors (iter[string]): Names of global tensors to retrieve, for instance, state names or
            previously registered global tensor names
            (<span style="color:#C00000"><b>required</b></span>).
        aggregation ('concat' | 'product' | 'stack' | 'sum'): Aggregation type in case of multiple
            tensors (<span style="color:#00C000"><b>default</b></span>: 'concat').
        axis (int >= 0): Aggregation axis, excluding batch axis
            (<span style="color:#00C000"><b>default</b></span>: 0).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
        summary_labels ('all' | iter[string]): Labels of summaries to record
            (<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
    """

    def __init__(
        self, name, tensors, aggregation='concat', axis=0, input_spec=None, summary_labels=None
    ):
        if not isinstance(tensors, str) and not util.is_iterable(x=tensors):
            raise TensorforceError.type(name='retrieve', argument='tensors', dtype=type(tensors))
        elif util.is_iterable(x=tensors) and len(tensors) == 0:
            raise TensorforceError.value(
                name='retrieve', argument='tensors', value=tensors, hint='zero length'
            )
        if aggregation not in ('concat', 'product', 'stack', 'sum'):
            raise TensorforceError.value(
                name='retrieve', argument='aggregation', value=aggregation,
                hint='not in {concat,product,stack,sum}'
            )

        self.tensors = (tensors,) if isinstance(tensors, str) else tuple(tensors)
        self.aggregation = aggregation
        self.axis = axis

        super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)

        self.input_spec = None

    def default_input_spec(self):
        return dict(type=None, shape=None)

    def get_output_spec(self, input_spec):
        if len(self.tensors) == 1:
            return Module.get_tensor_spec(name=self.tensors[0])

        # Get tensor types and shapes
        dtypes = list()
        shapes = list()
        for tensor in self.tensors:
            # Tensor specification
            if tensor == '*':
                spec = input_spec
            else:
                spec = Module.get_tensor_spec(name=tensor)
            dtypes.append(spec['type'])
            shapes.append(spec['shape'])

        # Check tensor types
        if all(dtype == dtypes[0] for dtype in dtypes):
            dtype = dtypes[0]
        else:
            raise TensorforceError.value(name='retrieve', argument='tensor types', value=dtypes)

        if self.aggregation == 'concat':
            if any(len(shape) != len(shapes[0]) for shape in shapes):
                raise TensorforceError.value(
                    name='retrieve', argument='tensor shapes', value=shapes
                )
            elif any(
                shape[n] != shapes[0][n] for shape in shapes for n in range(len(shape))
                if n != self.axis
            ):
                raise TensorforceError.value(
                    name='retrieve', argument='tensor shapes', value=shapes
                )
            shape = tuple(
                sum(shape[n] for shape in shapes) if n == self.axis else shapes[0][n]
                for n in range(len(shapes[0]))
            )

        elif self.aggregation == 'stack':
            if any(len(shape) != len(shapes[0]) for shape in shapes):
                raise TensorforceError.value(
                    name='retrieve', argument='tensor shapes', value=shapes
                )
            elif any(shape[n] != shapes[0][n] for shape in shapes for n in range(len(shape))):
                raise TensorforceError.value(
                    name='retrieve', argument='tensor shapes', value=shapes
                )
            shape = tuple(
                len(shapes) if n == self.axis else shapes[0][n - int(n > self.axis)]
                for n in range(len(shapes[0]) + 1)
            )

        else:
            # Check and unify tensor shapes
            for shape in shapes:
                if len(shape) != len(shapes[0]):
                    raise TensorforceError.value(
                        name='retrieve', argument='tensor shapes', value=shapes
                    )
                if any(x != y and x != 1 and y != 1 for x, y in zip(shape, shapes[0])):
                    raise TensorforceError.value(
                        name='retrieve', argument='tensor shapes', value=shapes
                    )
            shape = tuple(max(shape[n] for shape in shapes) for n in range(len(shapes[0])))

        # TODO: missing num_values, min/max_value
        return dict(type=dtype, shape=shape)

    def tf_apply(self, x):
        if len(self.tensors) == 1:
            # Fixed: compare the single tensor name, not the tuple itself, to '*'.
            if self.tensors[0] == '*':
                return x
            else:
                last_scope = Module.global_scope.pop()
                x = Module.retrieve_tensor(name=self.tensors[0])
                Module.global_scope.append(last_scope)
                return x

        tensors = list()
        for tensor in self.tensors:
            if tensor == '*':
                tensors.append(x)
            else:
                last_scope = Module.global_scope.pop()
                tensors.append(Module.retrieve_tensor(name=tensor))
                Module.global_scope.append(last_scope)

        shape = self.output_spec['shape']
        for n, tensor in enumerate(tensors):
            # Expand missing trailing dimensions so all tensors match the output rank.
            for axis in range(util.rank(x=tensor), len(shape)):
                tensor = tf.expand_dims(input=tensor, axis=axis)
            tensors[n] = tensor

        if self.aggregation == 'concat':
            x = tf.concat(values=tensors, axis=(self.axis + 1))

        elif self.aggregation == 'product':
            x = tf.stack(values=tensors, axis=(self.axis + 1))
            x = tf.reduce_prod(input_tensor=x, axis=(self.axis + 1))

        elif self.aggregation == 'stack':
            x = tf.stack(values=tensors, axis=(self.axis + 1))

        elif self.aggregation == 'sum':
            x = tf.stack(values=tensors, axis=(self.axis + 1))
            x = tf.reduce_sum(input_tensor=x, axis=(self.axis + 1))

        return x
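
# Usage sketch (illustrative only): together, `register` and `retrieve` allow
# non-sequential architectures; here two example state inputs are processed
# separately and then concatenated (state/tensor names and sizes are example
# values).
#
#     network = [
#         [
#             dict(type='retrieve', tensors=['image']),
#             dict(type='dense', size=32),
#             dict(type='register', tensor='image-embedding')
#         ],
#         [
#             dict(type='retrieve', tensors=['image-embedding', 'velocity'],
#              aggregation='concat'),
#             dict(type='dense', size=32)
#         ]
#     ]
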
class Reuse(Layer):
    """
    Reuse layer (specification key: `reuse`).

    Args:
        name (string): Layer name
            (<span style="color:#00C000"><b>default</b></span>: internally chosen).
        layer (string): Name of a previously defined layer
            (<span style="color:#C00000"><b>required</b></span>).
        is_trainable (bool): Whether reused layer variables are kept trainable
            (<span style="color:#00C000"><b>default</b></span>: true).
        input_spec (specification): Input tensor specification
            (<span style="color:#00C000"><b>internal use</b></span>).
    """

    def __init__(self, name, layer, is_trainable=True, input_spec=None):
        self.layer = layer
        self.is_trainable = is_trainable

        super().__init__(
            name=name, input_spec=input_spec, summary_labels=None, l2_regularization=0.0
        )

    def default_input_spec(self):
        if self.layer not in Layer.layers:
            raise TensorforceError.value(name='reuse', argument='layer', value=self.layer)
        self.layer = Layer.layers[self.layer]

        return dict(self.layer.input_spec)

    def get_output_spec(self, input_spec):
        return self.layer.get_output_spec(input_spec=input_spec)

    def tf_apply(self, x):
        return self.layer.apply(x=x)

    def get_variables(self, only_trainable=False, only_saved=False):
        variables = super().get_variables(only_trainable=only_trainable, only_saved=only_saved)

        if only_trainable and self.is_trainable:
            # Include the reused layer's variables among the trainable ones.
            variables.extend(self.layer.get_variables(
                only_trainable=only_trainable, only_saved=only_saved
            ))
        elif only_saved:
            # The reused layer's variables are saved by their original owner.
            pass
        else:
            variables.extend(self.layer.get_variables(
                only_trainable=only_trainable, only_saved=only_saved
            ))

        return variables

    def get_available_summaries(self):
        summaries = super().get_available_summaries()
        summaries.update(self.layer.get_available_summaries())
        return sorted(summaries)
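
# Usage sketch (illustrative only): applies a previously defined dense layer a
# second time with shared variables; 'dense0' is an example name and must match
# the name under which the first layer was created.
#
#     network = [
#         dict(type='dense', size=32, name='dense0'),
#         dict(type='activation', nonlinearity='relu'),
#         dict(type='reuse', layer='dense0')
#     ]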