# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import tensorflow as tf
from tensorforce import util
from tensorforce.core import parameter_modules
from tensorforce.core.layers import Layer
class PreprocessingLayer(Layer):
def tf_reset(self):
raise NotImplementedError
[docs]class Clipping(Layer):
"""
Clipping layer (specification key: `clipping`).
Args:
name (string): Layer name
(<span style="color:#00C000"><b>default</b></span>: internally chosen).
upper (parameter, float): Upper clipping value
(<span style="color:#C00000"><b>required</b></span>).
lower (parameter, float): Lower clipping value
(<span style="color:#00C000"><b>default</b></span>: negative upper value).
input_spec (specification): Input tensor specification
(<span style="color:#00C000"><b>internal use</b></span>).
summary_labels ('all' | iter[string]): Labels of summaries to record
(<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
"""
def __init__(self, name, upper, lower=None, input_spec=None, summary_labels=None):
super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)
if lower is None:
self.lower = None
self.upper = self.add_module(
name='upper', module=lower, modules=parameter_modules, dtype='float', min_value=0.0
)
else:
self.lower = self.add_module(
name='lower', module=lower, modules=parameter_modules, dtype='float'
)
self.upper = self.add_module(
name='upper', module=upper, modules=parameter_modules, dtype='float'
)
def default_input_spec(self):
return dict(type='float', shape=None)
def tf_apply(self, x):
upper = self.upper.value()
if self.lower is None:
lower = -upper
else:
lower = self.lower.value()
assertion = tf.debugging.assert_greater_equal(
x=upper, y=lower, message="Incompatible lower and upper clipping bound."
)
with tf.control_dependencies(control_inputs=(assertion,)):
return tf.clip_by_value(t=x, clip_value_min=lower, clip_value_max=upper)
[docs]class Deltafier(PreprocessingLayer):
"""
Deltafier layer computing the difference between the current and the previous input; can only
be used as preprocessing layer (specification key: `deltafier`).
Args:
name (string): Layer name
(<span style="color:#00C000"><b>default</b></span>: internally chosen).
concatenate (False | int >= 0): Whether to concatenate instead of replace deltas with
input, and if so, concatenation axis
(<span style="color:#00C000"><b>default</b></span>: false).
input_spec (specification): Input tensor specification
(<span style="color:#00C000"><b>internal use</b></span>).
summary_labels ('all' | iter[string]): Labels of summaries to record
(<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
"""
def __init__(self, name, concatenate=False, input_spec=None, summary_labels=None):
self.concatenate = concatenate
super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)
@classmethod
def output_spec(cls, concatenate=False, input_spec=None, **kwargs):
input_spec = super().output_spec(input_spec=input_spec)
if concatenate is not False:
input_spec['shape'] = tuple(
2 * dims if axis == concatenate else dims
for axis, dims in enumerate(input_spec['shape'])
)
return input_spec
def default_input_spec(self):
return dict(type='float', shape=None)
def get_output_spec(self, input_spec):
if self.concatenate is not False:
input_spec['shape'] = tuple(
2 * dims if axis == self.concatenate else dims
for axis, dims in enumerate(input_spec['shape'])
)
return input_spec
def tf_initialize(self):
super().tf_initialize()
self.has_previous = self.add_variable(
name='has-previous', dtype='bool', shape=(), is_trainable=False, initializer='zeros'
)
self.previous = self.add_variable(
name='previous', dtype='float', shape=((1,) + self.input_spec['shape']),
is_trainable=False, initializer='zeros'
)
def tf_reset(self):
assignment = self.has_previous.assign(
value=tf.constant(value=False, dtype=util.tf_dtype(dtype='bool')), read_value=False
)
return assignment
def tf_apply(self, x):
assertion = tf.debugging.assert_equal(
x=tf.shape(input=x)[0], y=1,
message="Deltafier preprocessor currently not compatible with batched Agent.act."
)
def first_delta():
assignment = self.has_previous.assign(
value=tf.constant(value=True, dtype=util.tf_dtype(dtype='bool')), read_value=False
)
with tf.control_dependencies(control_inputs=(assignment,)):
return tf.concat(values=(tf.zeros_like(input=x[:1]), x[1:] - x[:-1]), axis=0)
def later_delta():
return x - tf.concat(values=(self.previous, x[:-1]), axis=0)
with tf.control_dependencies(control_inputs=(assertion,)):
delta = self.cond(pred=self.has_previous, true_fn=later_delta, false_fn=first_delta)
assignment = self.previous.assign(value=x[-1:], read_value=False)
with tf.control_dependencies(control_inputs=(assignment,)):
if self.concatenate is False:
return util.identity_operation(x=delta)
else:
return tf.concat(values=(x, delta), axis=(self.concatenate + 1))
[docs]class Image(Layer):
"""
Image preprocessing layer (specification key: `image`).
Args:
name (string): Layer name
(<span style="color:#00C000"><b>default</b></span>: internally chosen).
height (int): Height of resized image
(<span style="color:#00C000"><b>default</b></span>: no resizing or relative to width).
width (int): Width of resized image
(<span style="color:#00C000"><b>default</b></span>: no resizing or relative to height).
grayscale (bool | iter[float]): Turn into grayscale image, optionally using given weights
(<span style="color:#00C000"><b>default</b></span>: false).
input_spec (specification): Input tensor specification
(<span style="color:#00C000"><b>internal use</b></span>).
summary_labels ('all' | iter[string]): Labels of summaries to record
(<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
"""
def __init__(
self, name, height=None, width=None, grayscale=False, input_spec=None, summary_labels=None
):
self.height = height
self.width = width
self.grayscale = grayscale
super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)
@classmethod
def output_spec(cls, height=None, width=None, grayscale=False, input_spec=None, **kwargs):
input_spec = super().output_spec(input_spec=input_spec)
if height is not None:
if width is None:
width = round(height * input_spec['shape'][1] / input_spec['shape'][0])
input_spec['shape'] = (height, width, input_spec['shape'][2])
elif width is not None:
height = round(width * input_spec['shape'][0] / input_spec['shape'][1])
input_spec['shape'] = (height, width, input_spec['shape'][2])
if not isinstance(grayscale, bool) or grayscale:
input_spec['shape'] = input_spec['shape'][:2] + (1,)
return input_spec
def default_input_spec(self):
return dict(type='float', shape=(0, 0, 0))
def get_output_spec(self, input_spec):
if self.height is not None:
if self.width is None:
self.width = round(self.height * input_spec['shape'][1] / input_spec['shape'][0])
input_spec['shape'] = (self.height, self.width, input_spec['shape'][2])
elif self.width is not None:
self.height = round(self.width * input_spec['shape'][0] / input_spec['shape'][1])
input_spec['shape'] = (self.height, self.width, input_spec['shape'][2])
if not isinstance(self.grayscale, bool) or self.grayscale:
input_spec['shape'] = input_spec['shape'][:2] + (1,)
return input_spec
def tf_apply(self, x):
if self.height is not None:
x = tf.image.resize(images=x, size=(self.height, self.width))
if not isinstance(self.grayscale, bool):
weights = tf.constant(
value=self.grayscale, dtype=util.tf_dtype(dtype='float'),
shape=(1, 1, 1, len(self.grayscale))
)
x = tf.reduce_sum(input_tensor=(x * weights), axis=3, keepdims=True)
elif self.grayscale:
x = tf.image.rgb_to_grayscale(images=x)
return x
[docs]class Sequence(PreprocessingLayer):
"""
Sequence layer stacking the current and previous inputs; can only be used as preprocessing
layer (specification key: `sequence`).
Args:
name (string): Layer name
(<span style="color:#00C000"><b>default</b></span>: internally chosen).
length (int > 0): Number of inputs to concatenate
(<span style="color:#C00000"><b>required</b></span>).
axis (int >= 0): Concatenation axis, excluding batch axis
(<span style="color:#00C000"><b>default</b></span>: last axis).
concatenate (bool): Whether to concatenate inputs at given axis, otherwise introduce new
sequence axis
(<span style="color:#00C000"><b>default</b></span>: true).
input_spec (specification): Input tensor specification
(<span style="color:#00C000"><b>internal use</b></span>).
summary_labels ('all' | iter[string]): Labels of summaries to record
(<span style="color:#00C000"><b>default</b></span>: inherit value of parent module).
"""
def __init__(
self, name, length, axis=-1, concatenate=True, input_spec=None, summary_labels=None
):
assert length > 1
self.length = length
self.axis = axis
self.concatenate = concatenate
super().__init__(name=name, input_spec=input_spec, summary_labels=summary_labels)
@classmethod
def output_spec(cls, length, axis=-1, concatenate=True, input_spec=None, **kwargs):
input_spec = super().output_spec(input_spec=input_spec)
if concatenate:
if axis == -1:
axis = len(input_spec['shape']) - 1
input_spec['shape'] = tuple(
length * dims if axis == axis else dims
for axis, dims in enumerate(input_spec['shape'])
)
else:
if axis == -1:
axis = len(input_spec['shape'])
shape = input_spec['shape']
input_spec['shape'] = shape[:axis] + (length,) + shape[axis:]
return input_spec
def default_input_spec(self):
return dict(type=None, shape=None)
def get_output_spec(self, input_spec):
if self.concatenate:
if self.axis == -1:
self.axis = len(input_spec['shape']) - 1
input_spec['shape'] = tuple(
self.length * dims if axis == self.axis else dims
for axis, dims in enumerate(input_spec['shape'])
)
else:
if self.axis == -1:
self.axis = len(input_spec['shape'])
shape = input_spec['shape']
input_spec['shape'] = shape[:self.axis] + (self.length,) + shape[self.axis:]
return input_spec
def tf_initialize(self):
super().tf_initialize()
self.has_previous = self.add_variable(
name='has-previous', dtype='bool', shape=(), is_trainable=False, initializer='zeros'
)
shape = self.input_spec['shape']
if self.concatenate:
shape = (1,) + shape[:self.axis] + (shape[self.axis] * (self.length - 1),) + \
shape[self.axis + 1:]
else:
shape = (1,) + shape[:self.axis] + (self.length - 1,) + shape[self.axis:]
self.previous = self.add_variable(
name='previous', dtype='float', shape=shape, is_trainable=False, initializer='zeros'
)
def tf_reset(self):
assignment = self.has_previous.assign(
value=tf.constant(value=False, dtype=util.tf_dtype(dtype='bool')), read_value=False
)
return assignment
def tf_apply(self, x):
assertion = tf.debugging.assert_equal(
x=tf.shape(input=x)[0], y=1,
message="Sequence preprocessor currently not compatible with batched Agent.act."
)
def first_timestep():
assignment = self.has_previous.assign(
value=tf.constant(value=True, dtype=util.tf_dtype(dtype='bool')), read_value=False
)
with tf.control_dependencies(control_inputs=(assignment,)):
if self.concatenate:
current = x
else:
current = tf.expand_dims(input=x, axis=(self.axis + 1))
multiples = tuple(
self.length if dims == self.axis + 1 else 1
for dims in range(util.rank(x=current))
)
return tf.tile(input=current, multiples=multiples)
def other_timesteps():
if self.concatenate:
current = x
else:
current = tf.expand_dims(input=x, axis=(self.axis + 1))
return tf.concat(values=(self.previous, current), axis=(self.axis + 1))
with tf.control_dependencies(control_inputs=(assertion,)):
xs = self.cond(
pred=self.has_previous, true_fn=other_timesteps, false_fn=first_timestep
)
if self.concatenate:
begin = tuple(
self.input_spec['shape'][dims - 1] if dims == self.axis + 1 else 0
for dims in range(util.rank(x=xs))
)
else:
begin = tuple(1 if dims == self.axis + 1 else 0 for dims in range(util.rank(x=xs)))
assignment = self.previous.assign(
value=tf.slice(input_=xs, begin=begin, size=self.previous.shape), read_value=False
)
with tf.control_dependencies(control_inputs=(assignment,)):
return util.identity_operation(x=xs)