Source code for keras_gym.core.value_q

from abc import abstractmethod

import numpy as np
import tensorflow as tf
from tensorflow import keras

from ..base.errors import ActionSpaceError
from ..utils import (
    one_hot, check_numpy_array, check_tensor, project_onto_actions_np)
from ..caching import NStepCache

from .base import BaseFunctionApproximator


__all__ = (
    'QTypeI',
    'QTypeII',
)


class BaseQ(BaseFunctionApproximator):
    UPDATE_STRATEGIES = ('sarsa', 'q_learning', 'double_q_learning')

    def __init__(
            self, function_approximator,
            gamma=0.9,
            bootstrap_n=1,
            bootstrap_with_target_model=False,
            update_strategy='sarsa'):

        self.function_approximator = function_approximator

        self.env = self.function_approximator.env
        self.gamma = float(gamma)
        self.bootstrap_n = int(bootstrap_n)
        self.bootstrap_with_target_model = bool(bootstrap_with_target_model)
        self.update_strategy = update_strategy

        self._cache = NStepCache(self.env, self.bootstrap_n, self.gamma)
        self._init_models()
        self._check_attrs()

    def __call__(self, s, a=None, use_target_model=False):
        """
        Evaluate the Q-function.

        Parameters
        ----------
        s : state observation

            A single state observation.

        a : action, optional

            A single action.

        use_target_model : bool, optional

            Whether to use the :term:`target_model` internally. If False
            (default), the :term:`predict_model` is used.

        Returns
        -------
        Q : float or array of floats

            If action ``a`` is provided, a single float representing
            :math:`q(s,a)` is returned. If, on the other hand, ``a`` is left
            unspecified, a vector representing :math:`q(s,.)` is returned
            instead. The shape of the latter return value is ``[num_actions]``,
            which is only well-defined for discrete action spaces.

        """
        assert self.env.observation_space.contains(s)
        S = np.expand_dims(s, axis=0)
        if a is not None:
            assert self.env.action_space.contains(a)
            if self.action_space_is_discrete:
                a = self._one_hot_encode_discrete(a)
            A = np.expand_dims(a, axis=0)
            Q = self.batch_eval(S, A, use_target_model=use_target_model)
            check_numpy_array(Q, shape=(1,))
            Q = np.asscalar(Q)
        else:
            Q = self.batch_eval(S, use_target_model=use_target_model)
            check_numpy_array(Q, shape=(1, self.num_actions))
            Q = np.squeeze(Q, axis=0)
        return Q
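
    # Illustrative sketch (not part of this module): evaluating the Q-function
    # on a single observation, assuming ``q`` is an already constructed QTypeI
    # or QTypeII instance wrapped around a gym-style ``env``:
    #
    #     s = env.reset()
    #     q(s)                              # q(s, .), shape: [num_actions]
    #     q(s, env.action_space.sample())   # scalar q(s, a)
    #     q(s, use_target_model=True)       # evaluate with the target model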

    def update(self, s, a, r, done):
        """
        Update the Q-function.

        Parameters
        ----------
        s : state observation

            A single state observation.

        a : action

            A single action.

        r : float

            A single observed reward.

        done : bool

            Whether the episode has finished.

        """
        assert self.env.observation_space.contains(s)
        self._cache.add(s, a, r, done)

        # eager updates
        while self._cache:
            self.batch_update(*self._cache.pop())  # pop with batch_size=1
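
    # Illustrative sketch (not part of this module): a typical per-step
    # training loop built around ``update``; ``q``, ``policy`` and
    # ``num_steps`` are placeholder names, not objects defined here:
    #
    #     s = env.reset()
    #     for t in range(num_steps):
    #         a = policy(s)
    #         s_next, r, done, info = env.step(a)
    #         q.update(s, a, r, done)   # flushes the NStepCache when ready
    #         if done:
    #             break
    #         s = s_next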

    def batch_update(self, S, A, Rn, In, S_next, A_next=None):
        """
        Update the value function on a batch of transitions.

        Parameters
        ----------
        S : nd array, shape: [batch_size, ...]

            A batch of state observations.

        A : nd array, shape: [batch_size, ...]

            A batch of actions taken.

        Rn : 1d array, dtype: float, shape: [batch_size]

            A batch of partial returns. For example, in n-step bootstrapping
            this is given by:

            .. math::

                R^{(n)}_t\\ =\\ R_t + \\gamma\\,R_{t+1} + \\dots +
                    \\gamma^{n-1}\\,R_{t+n-1}

            In other words, it's the non-bootstrapped part of the n-step
            return.

        In : 1d array, dtype: float, shape: [batch_size]

            A batch of bootstrapping factors. For instance, in n-step
            bootstrapping this is given by :math:`I^{(n)}_t=\\gamma^n` if the
            episode is ongoing and :math:`I^{(n)}_t=0` otherwise. This allows
            us to write the bootstrapped target as:

            .. math::

                G^{(n)}_t=R^{(n)}_t+I^{(n)}_tQ(S_{t+n}, A_{t+n})


        S_next : nd array, shape: [batch_size, ...]

            A batch of next-state observations.

        A_next : 2d array, shape: [batch_size, ...], optional

            A batch of (potential) next actions :term:`A_next`. This argument
            is only used if ``update_strategy='sarsa'``.

        Returns
        -------
        losses : dict

            A dict of losses/metrics, of type ``{name <str>: value <float>}``.

        """
        G = self.bootstrap_target(Rn, In, S_next, A_next)
        losses = self._train_on_batch([S, A, G])
        return losses
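
    # Illustrative sketch (not part of this module): batch_update can also be
    # called directly with a transition batch popped from the internal
    # NStepCache, which is exactly what ``update`` does; ``q`` is a
    # placeholder instance:
    #
    #     if q._cache:
    #         losses = q.batch_update(*q._cache.pop())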

    def bootstrap_target(self, Rn, In, S_next, A_next=None):
        """
        Get the bootstrapped target
        :math:`G^{(n)}_t=R^{(n)}_t+\\gamma^nQ(S_{t+n}, A_{t+n})`.

        Parameters
        ----------
        Rn : 1d array, dtype: float, shape: [batch_size]

            A batch of partial returns. For example, in n-step bootstrapping
            this is given by:

            .. math::

                R^{(n)}_t\\ =\\ R_t + \\gamma\\,R_{t+1} + \\dots +
                    \\gamma^{n-1}\\,R_{t+n-1}

            In other words, it's the non-bootstrapped part of the n-step
            return.

        In : 1d array, dtype: float, shape: [batch_size]

            A batch of bootstrapping factors. For instance, in n-step
            bootstrapping this is given by :math:`I^{(n)}_t=\\gamma^n` if the
            episode is ongoing and :math:`I^{(n)}_t=0` otherwise. This allows
            us to write the bootstrapped target as:

            .. math::

                G^{(n)}_t=R^{(n)}_t+I^{(n)}_tQ(S_{t+n},A_{t+n})


        S_next : nd array, shape: [batch_size, ...]

            A batch of next-state observations.

        A_next : 2d array, shape: [batch_size, num_actions], optional

            A batch of (potential) next actions :term:`A_next`. This argument
            is only used if ``update_strategy='sarsa'``.

        Returns
        -------
        Gn : 1d array, dtype: float, shape: [batch_size]

            A batch of bootstrap-estimated returns
            :math:`G^{(n)}_t=R^{(n)}_t+I^{(n)}_tQ(S_{t+n},A_{t+n})` computed
            according to given ``update_strategy``.

        """
        if self.update_strategy == 'sarsa':
            assert A_next is not None
            Q_next = self.batch_eval(
                S_next, use_target_model=self.bootstrap_with_target_model)
            Q_next = np.einsum('ij,ij->i', Q_next, A_next)

        elif self.update_strategy == 'q_learning':
            Q_next = self.batch_eval(
                S_next, use_target_model=self.bootstrap_with_target_model)
            Q_next = np.max(Q_next, axis=1)  # greedy

        elif self.update_strategy == 'double_q_learning':
            if not self.bootstrap_with_target_model:
                raise ValueError(
                    "incompatible settings: "
                    "update_strategy='double_q_learning' requires that "
                    "bootstrap_with_target_model=True")
            A_next = np.argmax(
                self.batch_eval(S_next, use_target_model=False), axis=1)
            Q_next = self.batch_eval(S_next, use_target_model=True)
            Q_next = project_onto_actions_np(Q_next, A_next)

        else:
            raise ValueError("unknown update_strategy")

        Gn = Rn + In * Q_next
        return Gn
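
    # Worked example (illustrative only): the target arithmetic for a batch
    # of two transitions with gamma=0.9 and bootstrap_n=1 under 'q_learning';
    # the Q_next values below are made up:
    #
    #     Rn     = np.array([1.0, 0.5])   # non-bootstrapped partial returns
    #     In     = np.array([0.9, 0.0])   # gamma ** n, or 0 if episode ended
    #     Q_next = np.array([2.0, 3.0])   # max_a Q(S_next, a) from batch_eval
    #     Gn     = Rn + In * Q_next       # -> array([2.8, 0.5])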

    @abstractmethod
    def batch_eval(self, S, A=None, use_target_model=False):
        """
        Evaluate the Q-function on a batch of state (or state-action)
        observations.

        Parameters
        ----------
        S : nd array, shape: [batch_size, ...]

            A batch of state observations.

        A : 1d array, dtype: int, shape: [batch_size], optional

            A batch of actions that were taken.

        use_target_model : bool, optional

            Whether to use the :term:`target_model` internally. If False
            (default), the :term:`predict_model` is used.

        Returns
        -------
        Q : 1d or 2d array of floats

            If action ``A`` is provided, a 1d array representing a batch of
            :math:`q(s,a)` is returned. If, on the other hand, ``A`` is left
            unspecified, a 2d array representing a batch of :math:`q(s,.)` is
            returned instead. The shape of the latter return value is
            ``[batch_size, num_actions]``, which is only well-defined for
            discrete action spaces.

        """
        pass


class QTypeI(BaseQ):
    """
    A :term:`type-I state-action value function` :math:`(s,a)\\mapsto q(s,a)`.

    Parameters
    ----------
    function_approximator : FunctionApproximator object

        The main :term:`function approximator`.

    gamma : float, optional

        The discount factor for discounting future rewards.

    bootstrap_n : positive int, optional

        The number of steps in n-step bootstrapping. It specifies the number
        of steps over which we're willing to delay bootstrapping. Large
        :math:`n` corresponds to Monte Carlo updates and :math:`n=1`
        corresponds to TD(0).

    bootstrap_with_target_model : bool, optional

        Whether to use the :term:`target_model` when constructing a
        bootstrapped target. If False (default), the primary
        :term:`predict_model` is used.

    update_strategy : str, optional

        The update strategy that we use to select the (would-be) next-action
        :math:`A_{t+n}` in the bootstrapped target:

        .. math::

            G^{(n)}_t\\ =\\ R^{(n)}_t + \\gamma^n Q(S_{t+n}, A_{t+n})

        Options are:

            'sarsa'
                Sample the next action, i.e. use the action that was actually
                taken.

            'q_learning'
                Take the action with highest Q-value under the current
                estimate, i.e. :math:`A_{t+n} = \\arg\\max_aQ(S_{t+n}, a)`.
                This is an off-policy method.

            'double_q_learning'
                Same as 'q_learning', :math:`A_{t+n} =
                \\arg\\max_aQ(S_{t+n}, a)`, except that the value itself is
                computed using the :term:`target_model` rather than the
                primary model, i.e.

                .. math::

                    A_{t+n}\\ &=\\ \\arg\\max_aQ_\\text{primary}(S_{t+n}, a)\\\\
                    G^{(n)}_t\\ &=\\ R^{(n)}_t
                        + \\gamma^n Q_\\text{target}(S_{t+n}, A_{t+n})

            'expected_sarsa'
                Similar to SARSA in that it's on-policy, except that we take
                the expected Q-value rather than a sample of it, i.e.

                .. math::

                    G^{(n)}_t\\ =\\ R^{(n)}_t
                        + \\gamma^n\\sum_a\\pi(a|s)\\,Q(S_{t+n}, a)

    """

    def batch_eval(self, S, A=None, use_target_model=False):
        model = self.target_model if use_target_model else self.predict_model

        if A is not None:
            Q = model.predict_on_batch([S, A])
            check_numpy_array(Q, ndim=2, axis_size=1, axis=1)
            Q = np.squeeze(Q, axis=1)
            return Q  # shape: [batch_size]
        else:
            Q = []
            for a in range(self.num_actions):
                A = one_hot(a * np.ones(len(S), dtype='int'), self.num_actions)
                # propagate use_target_model so that bootstrapping with the
                # target model also works when A is left unspecified
                Q.append(self.batch_eval(S, A, use_target_model))
            Q = np.stack(Q, axis=1)
            check_numpy_array(Q, ndim=2, axis_size=self.num_actions, axis=1)
            return Q  # shape: [batch_size, num_actions]

    def _init_models(self):
        # extract input shapes
        s_shape = self.env.observation_space.shape
        s_dtype = self.env.observation_space.dtype
        if self.action_space_is_discrete:
            a_shape = [self.num_actions]
            a_dtype = 'float'
        else:
            a_shape = self.env.action_space.shape
            a_dtype = self.env.action_space.dtype

        # input
        S = keras.Input(name='value_q1/S', shape=s_shape, dtype=s_dtype)
        A = keras.Input(name='value_q1/A', shape=a_shape, dtype=a_dtype)
        G = keras.Input(name='value_q1/G', shape=(1,), dtype='float')

        # forward pass
        X = self.function_approximator.body_q1(S, A)
        Q = self.function_approximator.head_q1(X)

        # loss
        loss = self.function_approximator.VALUE_LOSS_FUNCTION(G, Q)
        check_tensor(loss, ndim=0)

        # regular models
        self.train_model = keras.Model([S, A, G], loss)
        self.train_model.add_loss(loss)
        self.train_model.compile(
            optimizer=self.function_approximator.optimizer)

        # predict and target model
        self.predict_model = keras.Model([S, A], Q)
        self.target_model = self._create_target_model(self.predict_model)
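
# Illustrative sketch (not part of this module): return shapes of
# QTypeI.batch_eval for a discrete action space; ``q`` is a placeholder
# QTypeI instance and ``S`` a placeholder batch of 8 observations:
#
#     A = one_hot(np.random.randint(q.num_actions, size=8), q.num_actions)
#     q.batch_eval(S, A).shape   # (8,): one q(s, a) per state-action pair
#     q.batch_eval(S).shape      # (8, num_actions): q(s, .) for each state

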
class QTypeII(BaseQ):
    """
    A :term:`type-II state-action value function` :math:`s\\mapsto q(s,.)`.

    Parameters
    ----------
    function_approximator : FunctionApproximator object

        The main :term:`function approximator`.

    gamma : float, optional

        The discount factor for discounting future rewards.

    bootstrap_n : positive int, optional

        The number of steps in n-step bootstrapping. It specifies the number
        of steps over which we're willing to delay bootstrapping. Large
        :math:`n` corresponds to Monte Carlo updates and :math:`n=1`
        corresponds to TD(0).

    bootstrap_with_target_model : bool, optional

        Whether to use the :term:`target_model` when constructing a
        bootstrapped target. If False (default), the primary
        :term:`predict_model` is used.

    update_strategy : str, optional

        The update strategy that we use to select the (would-be) next-action
        :math:`A_{t+n}` in the bootstrapped target:

        .. math::

            G^{(n)}_t\\ =\\ R^{(n)}_t + \\gamma^n Q(S_{t+n}, A_{t+n})

        Options are:

            'sarsa'
                Sample the next action, i.e. use the action that was actually
                taken.

            'q_learning'
                Take the action with highest Q-value under the current
                estimate, i.e. :math:`A_{t+n} = \\arg\\max_aQ(S_{t+n}, a)`.
                This is an off-policy method.

            'double_q_learning'
                Same as 'q_learning', :math:`A_{t+n} =
                \\arg\\max_aQ(S_{t+n}, a)`, except that the value itself is
                computed using the :term:`target_model` rather than the
                primary model, i.e.

                .. math::

                    A_{t+n}\\ &=\\ \\arg\\max_aQ_\\text{primary}(S_{t+n}, a)\\\\
                    G^{(n)}_t\\ &=\\ R^{(n)}_t
                        + \\gamma^n Q_\\text{target}(S_{t+n}, A_{t+n})

            'expected_sarsa'
                Similar to SARSA in that it's on-policy, except that we take
                the expected Q-value rather than a sample of it, i.e.

                .. math::

                    G^{(n)}_t\\ =\\ R^{(n)}_t
                        + \\gamma^n\\sum_a\\pi(a|s)\\,Q(S_{t+n}, a)

    """

    def batch_eval(self, S, A=None, use_target_model=False):
        model = self.target_model if use_target_model else self.predict_model

        if A is not None:
            Q = model.predict_on_batch(S)  # shape: [batch_size, num_actions]
            check_numpy_array(Q, ndim=2, axis_size=self.num_actions, axis=1)
            check_numpy_array(
                A, ndim=1, dtype='int', axis_size=Q.shape[0], axis=0)
            Q = project_onto_actions_np(Q, A)
            return Q  # shape: [batch_size]
        else:
            Q = model.predict_on_batch(S)
            check_numpy_array(Q, ndim=2, axis_size=self.num_actions, axis=1)
            return Q  # shape: [batch_size, num_actions]

    def _init_models(self):
        if not self.action_space_is_discrete:
            raise ActionSpaceError(
                "QTypeII is incompatible with non-discrete action spaces; "
                "please use QTypeI instead")

        # extract input shapes
        s_shape = self.env.observation_space.shape
        s_dtype = self.env.observation_space.dtype
        a_shape = [self.num_actions]
        a_dtype = 'float'

        # input
        S = keras.Input(name='value_q2/S', shape=s_shape, dtype=s_dtype)
        A = keras.Input(name='value_q2/A', shape=a_shape, dtype=a_dtype)
        G = keras.Input(name='value_q2/G', shape=(1,), dtype='float')

        # forward pass
        X = self.function_approximator.body(S)
        Q = self.function_approximator.head_q2(X)

        # loss
        check_tensor(Q, ndim=2, axis_size=self.num_actions, axis=1)
        Q_proj = tf.einsum('ij,ij->i', A, Q)
        loss = self.function_approximator.VALUE_LOSS_FUNCTION(G, Q_proj)
        check_tensor(loss, ndim=0)

        # regular models
        self.train_model = keras.Model([S, A, G], loss)
        self.train_model.add_loss(loss)
        self.train_model.compile(
            optimizer=self.function_approximator.optimizer)

        # predict and target model
        self.predict_model = keras.Model(S, Q)
        self.target_model = self._create_target_model(self.predict_model)
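

# Illustrative sketch (not part of this module): constructing a type-II
# Q-function that bootstraps with double Q-learning; ``function_approximator``
# is a placeholder for a FunctionApproximator object built elsewhere:
#
#     q = QTypeII(
#         function_approximator,
#         gamma=0.99,
#         bootstrap_n=1,
#         bootstrap_with_target_model=True,   # required for double_q_learning
#         update_strategy='double_q_learning')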