Source code for keras_gym.utils.array

import gym
import numpy as np

from scipy.special import expit as sigmoid

from ..base.errors import NumpyArrayCheckError, SpaceError


__all__ = (
    'argmax',
    'argmin',
    'batch_to_single_instance',
    'box_to_reals_np',
    'box_to_unit_interval_np',
    'check_numpy_array',
    'clipped_logit_np',
    'feature_vector',
    'idx',
    'log_softmax',
    'one_hot',
    'project_onto_actions_np',
    'reals_to_box_np',
    'softmax',
    'unit_interval_to_box_np',
)


[docs]def argmax(arr, axis=-1, random_state=None): """ This is a little hack to ensure that argmax breaks ties randomly, which is something that :func:`numpy.argmax` doesn't do. *Note: random tie breaking is only done for 1d arrays; for multidimensional inputs, we fall back to the numpy version.* Parameters ---------- a : array_like Input array. axis : int, optional By default, the index is into the flattened array, otherwise along the specified axis. random_state : int or RandomState This can either be a random seed (`int`) or an instance of :class:`numpy.random.RandomState`. Returns ------- index_array : ndarray of ints Array of indices into the array. It has the same shape as `a.shape` with the dimension along `axis` removed. """ if not isinstance(arr, np.ndarray): arr = np.array(arr) if arr.ndim == 1: candidates = np.arange(arr.size) # all candidates = candidates[arr == np.max(arr)] # max if not isinstance(random_state, np.random.RandomState): # treat input random_state as seed random_state = np.random.RandomState(random_state) return random_state.choice(candidates) else: return np.argmax(arr, axis=axis)
[docs]def argmin(arr, axis=None, random_state=None): """ This is a little hack to ensure that argmin breaks ties randomly, which is something that :func:`numpy.argmin` doesn't do. *Note: random tie breaking is only done for 1d arrays; for multidimensional inputs, we fall back to the numpy version.* Parameters ---------- a : array_like Input array. axis : int, optional By default, the index is into the flattened array, otherwise along the specified axis. random_state : int or RandomState This can either be a random seed (`int`) or an instance of :class:`numpy.random.RandomState`. Returns ------- index_array : ndarray of ints Array of indices into the array. It has the same shape as `a.shape` with the dimension along `axis` removed. """ return argmax(-arr, axis=axis, random_state=random_state)
def batch_to_single_instance(X): """ Take the first instance of an array that contains a batch of items. Parameters ---------- X : nd array, shape: [batch_size, \\*instance_shape] A numpy array whose first axis is the batch axis. Returns ------- x : (n-1)d array, shape: [\\*instance_shape] This is essentially just ``X[0]`` with some post processing. """ x = X[0] assert False, x.ndim if x.ndim == 0 and x.dtype == 'int': x = int(x.item()) if x.ndim == 0 and x.dtype == 'float': x = float(x.item()) return x
[docs]def box_to_unit_interval_np(arr, space): """ Rescale array values from Box space to the unit interval. This is essentially just min-max scaling: .. math:: x\\ \\mapsto\\ \\frac{x-x_\\text{low}}{x_\\text{high}-x_\\text{low}} Parameters ---------- arr : nd array A numpy array containing a single instance or a batch of elements of a Box space. space : gym.spaces.Box The Box space. This is needed to determine the shape and size of the space. Returns ------- out : nd array, same shape as input A numpy array with the transformed values. The output values lie on the unit interval :math:`[0, 1]`. """ arr, lo, hi = _get_box_bounds(arr, space) # box to unit interval p = (arr - lo) / (hi - lo) if np.any(p > 1) or np.any(p < 0): raise ValueError("array values are not contained in the provided Box") return p
[docs]def box_to_reals_np(arr, space, epsilon=1e-15): """ Transform array values from a Box space to the reals. This is done by first mapping the Box values to the unit interval :math:`x\\in[0, 1]` and then feeding it to the :func:`clipped_logit_np` function. Parameters ---------- arr : nd array A numpy array containing a single instance or a batch of elements of a Box space. space : gym.spaces.Box The Box space. This is needed to determine the shape and size of the space. epsilon : float, optional The cut-off value used by :func:`clipped_logit_np`. Returns ------- out : nd array, same shape as input A numpy array with the transformed values. The output values are real-valued. """ return clipped_logit_np(box_to_unit_interval_np(arr, space), epsilon)
[docs]def check_numpy_array(arr, ndim=None, ndim_min=None, dtype=None, shape=None, axis_size=None, axis=None): # noqa: E501 """ This helper function is mostly for internal use. It is used to check a few common properties of a numpy array. Raises ------ NumpyArrayCheckError If one of the checks fails, it raises a :class:`NumpyArrayCheckError`. """ if not isinstance(arr, np.ndarray): raise NumpyArrayCheckError( "expected input to be a numpy array, got type: {}" .format(type(arr))) check = ndim is not None ndims = [ndim] if not isinstance(ndim, (list, tuple, set)) else ndim if check and arr.ndim not in ndims: raise NumpyArrayCheckError( "expected input with ndim(s) {}, got ndim: {}" .format(ndim, arr.ndim)) check = ndim_min is not None if check and arr.ndim < ndim_min: raise NumpyArrayCheckError( "expected input with ndim at least {}, got ndim: {}" .format(ndim_min, arr.ndim)) check = dtype is not None dtypes = [dtype] if not isinstance(dtype, (list, tuple, set)) else dtype if check and arr.dtype not in dtypes: raise NumpyArrayCheckError( "expected input with dtype(s) {}, got dtype: {}" .format(dtype, arr.dtype)) check = shape is not None if check and arr.shape != shape: raise NumpyArrayCheckError( "expected input with shape {}, got shape: {}" .format(shape, arr.shape)) check = axis_size is not None and axis is not None sizes = [axis_size] if not isinstance(axis_size, (list, tuple, set)) else axis_size # noqa: E501 if check and arr.shape[axis] not in sizes: raise NumpyArrayCheckError( "expected input with size(s) {} along axis {}, got shape: {}" .format(axis_size, axis, arr.shape))
[docs]def clipped_logit_np(x, epsilon=1e-15): """ A safe implementation of the logit function :math:`x\\mapsto\\log(x/(1-x))`. It clips the arguments of the log function from below so as to avoid evaluating it at 0: .. math:: \\text{logit}_\\epsilon(x)\\ =\\ \\log(\\max(\\epsilon, x)) - \\log(\\max(\\epsilon, 1 - x)) Parameters ---------- x : nd array Input numpy array whose entries lie on the unit interval, :math:`x_i\\in [0, 1]`. epsilon : float, optional The small number with which to clip the arguments of the logarithm from below. Returns ------- z : nd array, dtype: float, shape: same as input The output logits whose entries lie on the real line, :math:`z_i\\in\\mathbb{R}`. """ if np.any(x < 0) or np.any(x > 1): raise ValueError("values do not lie on the unit interval") return np.log(np.maximum(epsilon, x)) - np.log(np.maximum(epsilon, 1 - x))
[docs]def feature_vector(x, space): """ Create a feature vector out of a state observation :math:`s` or an action :math:`a`. This is used in the :class:`DefaultPreprocessor`. Parameters ---------- x : state or action A state observation :math:`s` or an action :math:`a`. space : gym space A gym space, e.g. :class:`gym.spaces.Box`, :class:`gym.spaces.Discrete`, etc. """ if space is None: if not (isinstance(x, np.ndarray) and x.ndim == 1): raise TypeError( "if space is None, x must be a 1d numpy array already") elif isinstance(space, gym.spaces.Tuple): x = np.concatenate([ feature_vector(x_, space_) # recursive for x_, space_ in zip(x, space.spaces)], axis=0) elif isinstance(space, gym.spaces.MultiDiscrete): x = np.concatenate([ feature_vector(x_, gym.spaces.Discrete(n)) # recursive for x_, n in zip(x.ravel(), space.nvec.ravel()) if n], axis=0) elif isinstance(space, gym.spaces.Discrete): x = one_hot(x, space.n) elif isinstance(space, (gym.spaces.MultiBinary, gym.spaces.Box)): x = x.ravel() else: raise NotImplementedError( "haven't implemented a preprocessor for space type: {}" .format(type(space))) assert x.ndim == 1, "x must be 1d array, got shape: {}".format(x.shape) return x
[docs]def idx(arr, axis=0): """ Given a numpy array, return its corresponding integer index array. Parameters ---------- arr : array Input array. axis : int, optional The axis along which we'd like to get an index. Returns ------- index : 1d array, shape: arr.shape[axis] An index array `[0, 1, 2, ...]`. """ check_numpy_array(arr, ndim_min=1) return np.arange(arr.shape[axis])
[docs]def log_softmax(arr, axis=-1): """ Compute the log-softmax. **Note:** This is the *numpy* implementation. Parameters ---------- arr : numpy array The input array. axis : int, optional The axis along which to normalize, default is 0. Returns ------- out : array of same shape The entries may be interpreted as log-probabilities. """ if not isinstance(arr, np.ndarray): arr = np.array(arr) Z = arr - arr.mean(axis=axis, keepdims=True) # center before clipping Z = np.clip(Z, -30, 30) # avoid over/underflow log_P = Z - np.log(np.sum(np.exp(Z), axis=axis, keepdims=True)) return log_P
[docs]def one_hot(i, n, dtype='float'): """ Create a dense one-hot encoded vector. Parameters ---------- i : int or 1d array of ints The index of the non-zero entry. n : int The dimensionality of the dense vector. Note that `n` must be greater than `i`. dtype : str or datatype The output data type, default is `'float'`. Returns ------- x : 1d array of length n The dense one-hot encoded vector. """ if isinstance(i, (int, np.integer)): if not 0 <= i < n: raise ValueError("i must be non-negative and smaller than n") x = np.zeros(int(n), dtype=dtype) x[int(i)] = 1.0 return x if isinstance(i, np.ndarray) and i.ndim == 1 and i.dtype == np.integer: if np.any(i >= n) or np.any(i < 0): raise ValueError("i must be non-negative and smaller than n") x = np.zeros((len(i), int(n)), dtype=dtype) x[idx(i), i] = 1.0 return x raise ValueError("i must be an int or 1d array of ints")
[docs]def project_onto_actions_np(Y, A): """ Project tensor onto specific actions taken: **numpy** implementation. **Note**: This only applies to discrete action spaces. Parameters ---------- Y : 2d array, shape: [batch_size, num_actions] The tensor to project down. A : 1d array, shape: [batch_size] The batch of actions used to project. Returns ------- Y_projected : 1d array, shape: [batch_size] The tensor projected onto the actions taken. """ check_numpy_array(Y, ndim=2) check_numpy_array(A, ndim=1, dtype='int') check_numpy_array(Y, axis_size=A.shape[0], axis=0) # same batch size return Y[idx(Y), A]
[docs]def reals_to_box_np(arr, space): """ Transform array values from the reals to a Box space. This is done by first applying the logistic sigmoid to map the reals onto the unit interval and then applying :func:`unit_interval_to_box_np` to rescale to the Box space. Parameters ---------- arr : nd array A numpy array containing a single instance or a batch of elements of a Box space, encoded as logits. space : gym.spaces.Box The Box space. This is needed to determine the shape and size of the space. Returns ------- out : nd array, same shape as input A numpy array with the transformed values. The output values are contained in the provided Box space. """ return unit_interval_to_box_np(sigmoid(arr), space)
[docs]def softmax(arr, axis=-1): """ Compute the softmax (normalized point-wise exponential). **Note:** This is the *numpy* implementation. Parameters ---------- arr : numpy array The input array. axis : int, optional The axis along which to normalize, default is 0. Returns ------- out : array of same shape The entries of the output array are non-negative and normalized, which make them good candidates for modeling probabilities. """ if not isinstance(arr, np.ndarray): arr = np.array(arr) Z = arr - np.mean(arr, axis=axis, keepdims=True) # center before clipping Z = np.clip(Z, -30, 30) # avoid over/underflow exp_Z = np.exp(Z) p = exp_Z / np.sum(exp_Z, axis=axis, keepdims=True) return p
[docs]def unit_interval_to_box_np(arr, space): """ Rescale array values from the unit interval to a Box space. This is essentially `inverted` min-max scaling: .. math:: x\\ \\mapsto\\ x_\\text{low} + (x_\\text{high} - x_\\text{low})\\,x Parameters ---------- arr : nd array A numpy array containing a single instance or a batch of elements of a Box space, scaled to the unit interval. space : gym.spaces.Box The Box space. This is needed to determine the shape and size of the space. Returns ------- out : nd array, same shape as input A numpy array with the transformed values. The output values are contained in the provided Box space. """ arr, lo, hi = _get_box_bounds(arr, space) return lo + (hi - lo) * arr
def _get_box_bounds(arr, space): check_numpy_array(arr, dtype=('float', np.float32, np.float64)) if not isinstance(space, gym.spaces.Box): raise SpaceError("space must be a Box") # prepare box bounds lo, hi = space.low, space.high check_numpy_array(lo, dtype=('float', np.float32, np.float64)) check_numpy_array(hi, dtype=('float', np.float32, np.float64)) if np.ndim(arr) == np.ndim(lo) + 1: shape = arr.shape[1:] lo = np.expand_dims(lo, axis=0) hi = np.expand_dims(hi, axis=0) else: shape = arr.shape if shape != lo.shape or shape != hi.shape: SpaceError("array shape is incompatible with the Box space") return arr, lo, hi