Source code for keras_gym.core.value_v

import numpy as np
from tensorflow import keras

from ..utils import check_numpy_array, check_tensor
from ..caching import NStepCache

from .base import BaseFunctionApproximator


__all__ = (
    'V',
)


class V(BaseFunctionApproximator):
    """
    A :term:`state value function` :math:`s\\mapsto v(s)`.

    Parameters
    ----------
    function_approximator : FunctionApproximator object
        The main :term:`function approximator`.
    gamma : float, optional
        The discount factor for discounting future rewards.
    bootstrap_n : positive int, optional
        The number of steps in n-step bootstrapping. It specifies the number
        of steps over which we're willing to delay bootstrapping. Large
        :math:`n` corresponds to Monte Carlo updates and :math:`n=1`
        corresponds to TD(0).
    bootstrap_with_target_model : bool, optional
        Whether to use the :term:`target_model` when constructing a
        bootstrapped target. If False (default), the primary
        :term:`predict_model` is used.

    See the usage sketch below this class for a minimal example.

    """
    def __init__(
            self, function_approximator, gamma=0.9, bootstrap_n=1,
            bootstrap_with_target_model=False):

        self.function_approximator = function_approximator
        self.env = self.function_approximator.env
        self.gamma = float(gamma)
        self.bootstrap_n = int(bootstrap_n)
        self.bootstrap_with_target_model = bool(bootstrap_with_target_model)

        self._cache = NStepCache(self.env, self.bootstrap_n, self.gamma)
        self._init_models()
        self._check_attrs()
    def __call__(self, s, use_target_model=False):
        """
        Evaluate the state value function.

        Parameters
        ----------
        s : state observation
            A single state observation.
        use_target_model : bool, optional
            Whether to use the :term:`target_model` internally. If False
            (default), the :term:`predict_model` is used.

        Returns
        -------
        V : float
            The estimated value of the state :math:`v(s)`.

        """
        assert self.env.observation_space.contains(s)
        S = np.expand_dims(s, axis=0)
        V = self.batch_eval(S, use_target_model=use_target_model)
        check_numpy_array(V, shape=(1,))
        V = float(V[0])  # np.asscalar is deprecated in modern NumPy
        return V
    def update(self, s, r, done):
        """
        Update the value function.

        Parameters
        ----------
        s : state observation
            A single state observation.
        r : float
            A single observed reward.
        done : bool
            Whether the episode has finished.

        See the episode-loop sketch below this class for typical usage.

        """
        assert self.env.observation_space.contains(s)
        self._cache.add(s, 0, r, done)  # action slot is unused for V; pass a dummy 0

        # eager updates
        while self._cache:
            S, _, Rn, In, S_next, _ = self._cache.pop()
            self.batch_update(S, Rn, In, S_next)
    def batch_update(self, S, Rn, In, S_next):
        """
        Update the value function on a batch of transitions.

        Parameters
        ----------
        S : nd array, shape: [batch_size, ...]
            A batch of state observations.
        Rn : 1d array, dtype: float, shape: [batch_size]
            A batch of partial returns. For example, in n-step bootstrapping
            this is given by:

            .. math::

                R^{(n)}_t\\ =\\ R_t + \\gamma\\,R_{t+1} + \\dots
                    + \\gamma^{n-1}\\,R_{t+n-1}

            In other words, it's the non-bootstrapped part of the n-step
            return.
        In : 1d array, dtype: float, shape: [batch_size]
            A batch of bootstrapping factors. For instance, in n-step
            bootstrapping this is given by :math:`I^{(n)}_t=\\gamma^n` if the
            episode is ongoing and :math:`I^{(n)}_t=0` otherwise. This allows
            us to write the bootstrapped target as:

            .. math::

                G^{(n)}_t\\ =\\ R^{(n)}_t + I^{(n)}_t\\,v(S_{t+n})

            A worked numerical example is given below this class.
        S_next : nd array, shape: [batch_size, ...]
            A batch of next-state observations.

        Returns
        -------
        losses : dict
            A dict of losses/metrics, of type ``{name <str>: value <float>}``.

        """
        V_next = self.batch_eval(
            S_next, use_target_model=self.bootstrap_with_target_model)
        Gn = Rn + In * V_next
        losses = self._train_on_batch([S, Gn])
        return losses
    def batch_eval(self, S, use_target_model=False):
        """
        Evaluate the state value function on a batch of state observations.

        Parameters
        ----------
        S : nd array, shape: [batch_size, ...]
            A batch of state observations.
        use_target_model : bool, optional
            Whether to use the :term:`target_model` internally. If False
            (default), the :term:`predict_model` is used.

        Returns
        -------
        V : 1d array, dtype: float, shape: [batch_size]
            The predicted state values.

        """
        model = self.target_model if use_target_model else self.predict_model
        V = model.predict_on_batch(S)
        check_numpy_array(V, ndim=2, axis_size=1, axis=1)
        V = np.squeeze(V, axis=1)  # shape: [batch_size]
        return V
    def _init_models(self):
        shape = self.env.observation_space.shape
        dtype = self.env.observation_space.dtype

        # inputs: state observation S and bootstrapped target G
        S = keras.Input(name='value_v/S', shape=shape, dtype=dtype)
        G = keras.Input(name='value_v/G', shape=(1,), dtype='float')

        # forward pass
        X = self.function_approximator.body(S)
        V = self.function_approximator.head_v(X)

        # loss function
        loss = self.function_approximator.VALUE_LOSS_FUNCTION(G, V)
        check_tensor(loss, ndim=0)

        # train model
        self.train_model = keras.Model([S, G], loss)
        self.train_model.add_loss(loss)
        self.train_model.compile(
            optimizer=self.function_approximator.optimizer)

        # predict/target models
        self.predict_model = keras.Model(S, V)
        self.target_model = self._create_target_model(self.predict_model)
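
Usage sketch (not part of the module above): constructing and evaluating a V
object requires a FunctionApproximator subclass that defines a forward-pass
body. The MLP body, the learning rate, and the choice of 'CartPole-v0' below
are illustrative assumptions, loosely following the keras_gym documentation:

    import gym
    from tensorflow import keras
    import keras_gym as km


    class MLP(km.FunctionApproximator):
        """A small multi-layer perceptron body (illustrative)."""
        def body(self, S):
            X = keras.layers.Flatten()(S)
            X = keras.layers.Dense(units=16, activation='relu')(X)
            return X


    env = gym.make('CartPole-v0')
    function_approximator = MLP(env, lr=0.01)
    v = km.V(function_approximator, gamma=0.9, bootstrap_n=1)

    s = env.reset()
    print(v(s))  # estimated value of the initial state, a single float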
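Continuing the sketch above, ``update`` is typically driven from inside an
episode loop; it caches transitions and eagerly pops n-step updates once
enough steps have accumulated. This assumes the classic four-value Gym step
API, and a random behavior policy is used purely for illustration:

    for episode in range(100):
        s = env.reset()
        done = False
        while not done:
            a = env.action_space.sample()  # any behavior policy works for V
            s_next, r, done, info = env.step(a)
            v.update(s, r, done)  # caches (s, r, done); pops n-step updates
            s = s_next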
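To make the n-step target concrete, here is a small worked example of the
quantities ``Rn``, ``In`` and the resulting target ``Gn`` that
``batch_update`` computes for a single transition (the rewards and the value
estimate are made-up numbers):

    import numpy as np

    gamma, n = 0.9, 3
    rewards = np.array([1.0, 0.0, 2.0])           # R_t, R_{t+1}, R_{t+2}

    # non-bootstrapped part: R_t + gamma*R_{t+1} + gamma^2*R_{t+2}
    Rn = np.sum(gamma ** np.arange(n) * rewards)  # 1.0 + 0.0 + 1.62 = 2.62

    In = gamma ** n  # episode still ongoing, so I = gamma^3 = 0.729
    v_next = 5.0     # hypothetical estimate of v(S_{t+n})

    Gn = Rn + In * v_next                         # 2.62 + 3.645 = 6.265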