Source code for keras_gym.core.policy_categorical

from tensorflow import keras
from tensorflow.keras import backend as K

from ..utils import check_tensor
from ..proba_dists import CategoricalDist
from .base import BaseUpdateablePolicy


class SoftmaxPolicy(BaseUpdateablePolicy):
    """
    :term:`Updateable policy <updateable policy>` for discrete action spaces.

    Parameters
    ----------
    function_approximator : FunctionApproximator object

        The main :term:`function approximator`.

    update_strategy : str, callable, optional

        The strategy for updating our policy. This determines the loss
        function that we use for our policy function approximator. If you
        wish to use a custom policy loss, you can override the
        :func:`policy_loss_with_metrics` method.

        Provided options are:

            'vanilla'
                Plain vanilla policy gradient. The corresponding (surrogate)
                loss function that we use is:

                .. math::

                    J(\\theta)\\ =\\ -\\mathcal{A}(s,a)\\,\\ln\\pi(a|s,\\theta)

            'ppo'
                `Proximal policy optimization
                <https://arxiv.org/abs/1707.06347>`_ uses a clipped proximal
                loss:

                .. math::

                    J(\\theta)\\ =\\ \\min\\Big(
                        r(\\theta)\\,\\mathcal{A}(s,a)\\,,\\
                        \\text{clip}\\big(
                            r(\\theta), 1-\\epsilon, 1+\\epsilon\\big)
                                \\,\\mathcal{A}(s,a)\\Big)

                where :math:`r(\\theta)` is the probability ratio:

                .. math::

                    r(\\theta)\\ =\\ \\frac
                        {\\pi(a|s,\\theta)}
                        {\\pi(a|s,\\theta_\\text{old})}

            'cross_entropy'
                Straightforward categorical cross-entropy (from logits). This
                loss function does *not* make use of the advantages
                :term:`Adv`. Instead, it minimizes the cross entropy between
                the behavior policy :math:`\\pi_b(a|s)` and the learned
                policy :math:`\\pi_\\theta(a|s)`:

                .. math::

                    J(\\theta)\\ =\\ \\hat{\\mathbb{E}}_t\\left\\{
                        -\\sum_a \\pi_b(a|S_t)\\, \\log \\pi_\\theta(a|S_t)
                    \\right\\}

    ppo_clip_eps : float, optional

        The clipping parameter :math:`\\epsilon` in the PPO clipped surrogate
        loss. This option is only applicable if ``update_strategy='ppo'``.

    entropy_beta : float, optional

        The coefficient of the entropy bonus term in the policy objective.

    random_seed : int, optional

        Sets the random state to get reproducible results.

    """
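    # ------------------------------------------------------------------
    # Illustrative sketch (an assumption for clarity, not part of the
    # original source): the 'vanilla' and 'ppo' surrogate losses
    # documented above map onto Keras backend ops roughly as follows,
    # assuming `log_pi` and `log_pi_old` hold the log-probability of the
    # taken action under the current and behavior policies, and `Adv` is
    # the advantage estimate:
    #
    #     vanilla = K.mean(-Adv * log_pi)
    #     ratio   = K.exp(log_pi - log_pi_old)
    #     clipped = K.clip(ratio, 1 - ppo_clip_eps, 1 + ppo_clip_eps)
    #     ppo     = -K.mean(K.minimum(ratio * Adv, clipped * Adv))
    #
    # The actual loss is constructed by `policy_loss_with_metrics` on the
    # base class, which presumably also adds the entropy bonus weighted
    # by `entropy_beta` to the objective.
    # ------------------------------------------------------------------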
    def __call__(self, s, use_target_model=False):
        # Because we want our categorical samples to be differentiable,
        # self.predict_model cannot return fully deterministic samples.
        # We therefore perform the final sampling in the numpy layer.
        p = super().__call__(s, use_target_model)  # p is almost deterministic
        a = self.random.choice(self.num_actions, p=p)
        return a
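    # ------------------------------------------------------------------
    # Illustrative note (an assumption for clarity, not part of the
    # original source): the differentiable sampling layer leaves `p`
    # nearly one-hot, so for a 3-action space it might look like
    #
    #     p = np.array([0.002, 0.997, 0.001])
    #     a = self.random.choice(3, p=p)   # returns 1 with prob ~0.997
    #
    # i.e. the numpy-level draw merely resolves the small residual
    # stochasticity left over from the differentiable sample.
    # ------------------------------------------------------------------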
    def _init_models(self):
        S = keras.Input(
            shape=self.env.observation_space.shape,
            dtype=self.env.observation_space.dtype,
            name='policy/S')
        A = keras.Input(
            shape=[self.num_actions], dtype='float', name='policy/A')
        Adv = keras.Input(shape=(), dtype='float', name='policy/Adv')

        # forward pass
        X = self.function_approximator.body(S)
        logits = self.function_approximator.head_pi(X)
        check_tensor(logits, ndim=2, axis_size=self.num_actions, axis=1)

        # apply available-action mask (optional)
        if hasattr(self, 'available_actions_mask'):
            check_tensor(self.available_actions_mask, ndim=2, dtype='bool')
            # set logits to large negative values for unavailable actions
            logits = keras.layers.Lambda(
                lambda x: K.switch(
                    self._available_actions, x, -1e3 * K.ones_like(x)),
                name='policy/logits/masked')(logits)

        # special layers
        A_sample = keras.layers.Lambda(
            lambda x: CategoricalDist(logits=x).sample())(logits)
        A_greedy = keras.layers.Lambda(K.argmax)(logits)

        # output models
        self.predict_model = keras.Model(S, A_sample)
        self.target_model = self._create_target_model(self.predict_model)
        self.predict_greedy_model = keras.Model(S, A_greedy)
        self.target_greedy_model = self._create_target_model(
            self.predict_greedy_model)
        self.predict_param_model = keras.Model(S, logits)
        self.target_param_model = self._create_target_model(
            self.predict_param_model)

        # loss and target tensor
        self.dist = CategoricalDist(logits=logits)
        self.target_dist = CategoricalDist(logits=self.target_param_model(S))
        loss, metrics = self.policy_loss_with_metrics(Adv, A)
        check_tensor(loss, ndim=0)

        # models
        self.train_model = keras.Model([S, A, Adv], loss)
        self.train_model.add_loss(loss)
        for name, metric in metrics.items():
            self.train_model.add_metric(metric, name=name, aggregation='mean')
        self.train_model.compile(
            optimizer=self.function_approximator.optimizer)
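
# ---------------------------------------------------------------------------
# Illustrative usage sketch in the spirit of the keras-gym docs, not part of
# the module above. The `MLP` body, the CartPole environment, and the
# hyperparameters are assumptions for illustration; check the exact
# constructor signatures against the installed keras-gym version.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import gym
    import keras_gym as km

    class MLP(km.FunctionApproximator):
        """ A small multi-layer perceptron body (hypothetical example). """
        def body(self, S):
            X = keras.layers.Flatten()(S)
            X = keras.layers.Dense(units=16, activation='relu')(X)
            return X

    env = gym.make('CartPole-v0')
    func = MLP(env, lr=0.01)  # assumed constructor arguments
    pi = SoftmaxPolicy(func, update_strategy='ppo')

    s = env.reset()
    a = pi(s)                       # sample an action via __call__ above
    s, r, done, info = env.step(a)  # updates go through the base class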