Source code for keras_gym.wrappers.monitors

import os
import datetime
import time

import gym
import numpy as np
import tensorflow as tf

from ..base.mixins import ActionSpaceMixin, LoggerMixin

# use the TensorFlow v1 API for writing TensorBoard summaries
tf1 = tf if tf.__version__ < '2.0' else tf.compat.v1


__all__ = (
    'TrainMonitor',
)


class TrainMonitor(gym.Wrapper, ActionSpaceMixin, LoggerMixin):
    """
    Environment wrapper for monitoring the training process.

    This wrapper logs some diagnostics at the end of each episode and it also
    gives us some handy attributes (listed below).

    Parameters
    ----------
    env : gym environment

        A gym environment.

    tensorboard_dir : str, optional

        If provided, TrainMonitor will log all diagnostics to be viewed in
        tensorboard. To view these, point tensorboard to the same dir:

        .. code::

            $ tensorboard --logdir {tensorboard_dir}

    Attributes
    ----------
    T : positive int

        Global step counter. This is not reset by ``env.reset()``; use
        ``env.reset_global()`` instead.

    ep : positive int

        Global episode counter. This is not reset by ``env.reset()``; use
        ``env.reset_global()`` instead.

    t : positive int

        Step counter within the current episode.

    G : float

        The return, i.e. the amount of reward accumulated from the start of
        the current episode.

    avg_G : float

        The return ``G``, averaged over the past 100 episodes.

    dt_ms : float

        The average wall time of a single step, in milliseconds.

    """
    def __init__(self, env, tensorboard_dir=None):
        super().__init__(env)
        self.quiet = False
        self.reset_global()
        self.tensorboard = None
        if tensorboard_dir is not None:
            # write to a unique, timestamped subdirectory for each run
            tensorboard_dir = os.path.join(
                tensorboard_dir,
                datetime.datetime.now().strftime('%Y%m%d_%H%M%S'))
            self.tensorboard = tf1.summary.FileWriter(
                tensorboard_dir, flush_secs=10)
    def reset_global(self):
        """ Reset the global counters, not just the episodic ones. """
        self.T = 0
        self.ep = 0
        self.t = 0
        self.G = 0.0
        self.avg_G = 0.0
        self._n_avg_G = 0.0
        self._ep_starttime = time.time()
        self._ep_losses = None
        self._ep_actions = []
        self._losses = None
    def reset(self):
        # increment global counters:
        self.T += 1
        self.ep += 1
        # reset episodic counters:
        self.t = 0
        self.G = 0.0
        self._ep_starttime = time.time()
        self._ep_losses = None
        self._ep_actions = []
        return self.env.reset()
    @property
    def dt_ms(self):
        if self.t <= 0:
            return np.nan
        return 1000 * (time.time() - self._ep_starttime) / self.t

    @property
    def avg_r(self):
        if self.t <= 0:
            return np.nan
        return self.G / self.t
    def step(self, a):
        self._ep_actions.append(a)
        s_next, r, done, info = self.env.step(a)
        if info is None:
            info = {}
        info['monitor'] = {'T': self.T, 'ep': self.ep}
        self.t += 1
        self.T += 1
        self.G += r
        if done:
            # update the running average of the episodic return G
            if self._n_avg_G < 100:
                self._n_avg_G += 1.
            self.avg_G += (self.G - self.avg_G) / self._n_avg_G

            if not self.quiet:
                self.logger.info(
                    "ep: {:d}, T: {:,d}, G: {:.3g}, avg_G: {:.3g}, t: {:d}, "
                    "dt: {:.3f}ms{:s}"
                    .format(
                        self.ep, self.T, self.G, self.avg_G, self.t,
                        self.dt_ms, self._losses_str()))

            if self.tensorboard is not None:
                diagnostics = {
                    'ep_return': self.G,
                    'ep_avg_reward': self.avg_r,
                    'ep_steps': self.t,
                    'avg_step_duration_ms': self.dt_ms}
                if self._ep_losses is not None:
                    diagnostics.update(self._ep_losses)
                self._write_scalars_to_tensorboard(diagnostics)
                self._write_histogram_to_tensorboard(
                    values=self._ep_actions, name='actions',
                    is_discrete=self.action_space_is_discrete)
                self.tensorboard.flush()

        return s_next, r, done, info
    def _write_scalars_to_tensorboard(self, diagnostics):
        for k, v in diagnostics.items():
            summary = tf1.summary.Summary(
                value=[tf1.summary.Summary.Value(
                    tag=f'TrainMonitor/{k}', simple_value=v)])
            self.tensorboard.add_summary(summary, global_step=self.T)

    def _write_histogram_to_tensorboard(self, values, name, is_discrete, bins=50):  # noqa: E501
        """
        This custom histogram logger was taken from:

            https://stackoverflow.com/a/48876774/2123555

        """
        if is_discrete:
            values = np.array(values, dtype='int')
            distinct_values, counts = np.unique(values, return_counts=True)
            bin_edges = distinct_values + 1
        else:
            values = np.array(values, dtype='float')
            counts, bin_edges = np.histogram(values, bins=bins)
            bin_edges = bin_edges[1:]

        # Fill fields of histogram proto
        hist = tf1.HistogramProto()
        hist.min = float(np.min(values))
        hist.max = float(np.max(values))
        hist.num = int(np.prod(values.shape))
        hist.sum = float(np.sum(values))
        hist.sum_squares = float(np.sum(values ** 2))

        # Add bin edges and counts
        for edge in bin_edges:
            hist.bucket_limit.append(edge)
        for c in counts:
            hist.bucket.append(c)

        # Create and write Summary
        summary = tf1.summary.Summary(value=[
            tf1.summary.Summary.Value(tag=f'TrainMonitor/{name}', histo=hist)])
        self.tensorboard.add_summary(summary, global_step=self.T)
    def record_losses(self, losses):
        """
        Record losses during the training process.

        These are used to print more diagnostics.

        Parameters
        ----------
        losses : dict

            A dict of losses/metrics, of type ``{name <str>: value <float>}``.

        """
        # running average over (at most) the past 100 recorded updates
        if self._losses is None or set(self._losses) != set(losses):
            self._losses = dict(losses)
            self._n_losses = 1.0
        else:
            if self._n_losses < 100:
                self._n_losses += 1.0
            self._losses = {
                k: v + (losses[k] - v) / self._n_losses
                for k, v in self._losses.items()}

        # running average over the current episode
        if self._ep_losses is None or set(self._ep_losses) != set(losses):
            self._ep_losses = dict(losses)
            self._n_ep_losses = 1.0
        else:
            self._n_ep_losses += 1.0
            self._ep_losses = {
                k: v + (losses[k] - v) / self._n_ep_losses
                for k, v in self._ep_losses.items()}
    def _losses_str(self):
        if self._losses is not None:
            return ", " + ", ".join(
                '{:s}: {:.3g}'.format(k, v)
                for k, v in self._losses.items())
        return ""
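

# ---------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the published module).
# It shows the intended way to wrap a gym environment with TrainMonitor:
# diagnostics are logged at the end of every episode, and ``record_losses``
# feeds extra metrics into those logs. The environment id 'CartPole-v0', the
# random policy and the loss value 0.1 are assumptions made purely for this
# sketch; pass a real directory as ``tensorboard_dir`` to enable TensorBoard.
if __name__ == '__main__':
    import logging
    logging.basicConfig(level=logging.INFO)  # make INFO diagnostics visible

    env = TrainMonitor(gym.make('CartPole-v0'), tensorboard_dir=None)

    for _ in range(5):
        s = env.reset()
        done = False
        while not done:
            a = env.action_space.sample()     # stand-in for a learned policy
            s, r, done, info = env.step(a)
            env.record_losses({'loss': 0.1})  # illustrative value only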