Source code for keras_gym.core.policy_normal

from tensorflow import keras

from ..utils import check_tensor
from ..proba_dists import NormalDist
from .base import BaseUpdateablePolicy

[docs]class GaussianPolicy(BaseUpdateablePolicy): """ An :term:`updateable policy` for environments with a continuous action space, i.e. a :class:`Box <gym.spaces.Box>`. It models the policy :math:`\\pi_\\theta(a|s)` as a normal distribution with conditional parameters :math:`(\\mu_\\theta(s), \\sigma_\\theta(s))`. .. important:: This environment requires that the ``env`` is with: .. code:: env = km.wrappers.BoxToReals(env) This wrapper decompactifies the Box action space. Parameters ---------- function_approximator : FunctionApproximator object The main :term:`function approximator`. update_strategy : str, optional The strategy for updating our policy. This typically determines the loss function that we use for our policy function approximator. Options are: 'vanilla' Plain vanilla policy gradient. The corresponding (surrogate) loss function that we use is: .. math:: J(\\theta)\\ =\\ \\hat{\\mathbb{E}}_t \\left\\{ -\\mathcal{A}_t\\,\\log\\pi_\\theta(A_t|S_t) \\right\\} where :math:`\\mathcal{A}_t=\\mathcal{A}(S_t,A_t)` is the advantage at time step :math:`t`. 'ppo' `Proximal policy optimization <>`_ uses a clipped proximal loss: .. math:: J(\\theta)\\ =\\ \\hat{\\mathbb{E}}_t \\left\\{ \\min\\Big( \\rho_t(\\theta)\\,\\mathcal{A}_t\\,,\\ \\tilde{\\rho}_t(\\theta)\\,\\mathcal{A}_t \\Big) \\right\\} where :math:`\\rho_t(\\theta)` is the probability ratio: .. math:: \\rho_t(\\theta)\\ =\\ \\frac {\\pi_\\theta(A_t|S_t)} {\\pi_{\\theta_\\text{old}}(A_t|S_t)} and :math:`\\tilde{\\rho}_t(\\theta)` is its clipped version: .. math:: \\tilde{\\rho}_t(\\theta)\\ =\\ \\text{clip}\\big( \\rho_t(\\theta), 1-\\epsilon, 1+\\epsilon\\big) 'cross_entropy' Straightforward categorical cross-entropy (from logits). This loss function does *not* make use of the advantages :term:`Adv`. Instead, it minimizes the cross entropy between the behavior policy :math:`\\pi_b(a|s)` and the learned policy :math:`\\pi_\\theta(a|s)`: .. math:: J(\\theta)\\ =\\ \\hat{\\mathbb{E}}_t\\left\\{ -\\sum_a \\pi_b(a|S_t)\\, \\log \\pi_\\theta(a|S_t) \\right\\} ppo_clip_eps : float, optional The clipping parameter :math:`\\epsilon` in the PPO clipped surrogate loss. This option is only applicable if ``update_strategy='ppo'``. entropy_beta : float, optional The coefficient of the entropy bonus term in the policy objective. """ def _init_models(self): S = keras.Input( shape=self.env.observation_space.shape, dtype=self.env.observation_space.dtype, name='policy/S') A = keras.Input( shape=self.env.action_space.shape, dtype=self.env.action_space.dtype, name='policy/A') Adv = keras.Input(shape=(), dtype='float', name='policy/Adv') # forward pass X = self.function_approximator.body(S) mu, logvar = self.function_approximator.head_pi(X) check_tensor(mu, ndim=2, axis_size=self.actions_ndim, axis=1) check_tensor(logvar, same_as=mu) # special layers A_sample = keras.layers.Lambda( lambda args: NormalDist(*args).sample())([mu, logvar]) A_greedy = mu # output models self.predict_model = keras.Model(S, A_sample) self.target_model = self._create_target_model(self.predict_model) self.predict_greedy_model = keras.Model(S, A_greedy) self.target_greedy_model = self._create_target_model( self.predict_greedy_model) self.predict_param_model = keras.Model(S, [mu, logvar]) self.target_param_model = self._create_target_model( self.predict_param_model) # loss and target tensor (depends on self.update_strategy) self.dist = NormalDist(mu=mu, logvar=logvar) self.target_dist = NormalDist(*self.target_param_model(S)) loss, metrics = self.policy_loss_with_metrics(Adv, A) check_tensor(loss, ndim=0) # models self.train_model = keras.Model([S, A, Adv], loss) self.train_model.add_loss(loss) for name, metric in metrics.items(): self.train_model.add_metric(metric, name=name, aggregation='mean') self.train_model.compile( optimizer=self.function_approximator.optimizer)