Source code for elegantrl.train.evaluator

import os
import time
import torch.nn
import numpy as np
from torch import Tensor
from typing import Tuple, List

from elegantrl.train.config import Config


class Evaluator:
    def __init__(self, cwd: str, env, args: Config, if_tensorboard: bool = False):
        self.cwd = cwd  # current working directory to save model
        self.env = env  # the env for Evaluator, `eval_env = env` in default
        self.agent_id = args.gpu_id
        self.total_step = 0  # the total training step
        self.start_time = time.time()  # `used_time = time.time() - self.start_time`
        self.eval_times = args.eval_times  # number of times that get episodic cumulative return
        self.eval_per_step = args.eval_per_step  # evaluate the agent per training steps
        self.eval_step_counter = -self.eval_per_step  # `self.total_step > self.eval_step_counter + self.eval_per_step`

        self.save_gap = args.save_gap
        self.save_counter = 0
        self.if_keep_save = args.if_keep_save
        self.if_over_write = args.if_over_write

        self.recorder_path = f'{cwd}/recorder.npy'
        self.recorder = []  # total_step, r_avg, r_std, obj_c, ...
        self.max_r = -np.inf
        print("| Evaluator:"
              "\n| `step`: Number of samples, or total training steps, or running times of `env.step()`."
              "\n| `time`: Time spent from the start of training to this moment."
              "\n| `avgR`: Average value of cumulative rewards, which is the sum of rewards in an episode."
              "\n| `stdR`: Standard dev of cumulative rewards, which is the sum of rewards in an episode."
              "\n| `avgS`: Average of steps in an episode."
              "\n| `objC`: Objective of Critic network. Or call it loss function of critic network."
              "\n| `objA`: Objective of Actor network. It is the average Q value of the critic network."
              f"\n{'#' * 80}\n"
              f"{'ID':<3}{'Step':>8}{'Time':>8} |"
              f"{'avgR':>8}{'stdR':>7}{'avgS':>7}{'stdS':>6} |"
              f"{'expR':>8}{'objC':>7}{'objA':>7}{'etc.':>7}")

        if getattr(env, 'num_envs', 1) == 1:  # single environment
            self.get_cumulative_rewards_and_step = self.get_cumulative_rewards_and_step_single_env
        else:  # vectorized environment
            self.get_cumulative_rewards_and_step = self.get_cumulative_rewards_and_step_vectorized_env

        if if_tensorboard:
            from torch.utils.tensorboard import SummaryWriter
            self.tensorboard = SummaryWriter(f"{cwd}/tensorboard")
        else:
            self.tensorboard = None

    def evaluate_and_save(self, actor: torch.nn.Module, steps: int, exp_r: float, logging_tuple: tuple):
        self.total_step += steps  # update total training steps
        if self.total_step < self.eval_step_counter + self.eval_per_step:
            return

        self.eval_step_counter = self.total_step

        rewards_step_ten = self.get_cumulative_rewards_and_step(actor)
        returns = rewards_step_ten[:, 0]  # episodic cumulative returns
        steps = rewards_step_ten[:, 1]  # episodic step numbers
        avg_r = returns.mean().item()
        std_r = returns.std().item()
        avg_s = steps.mean().item()
        std_s = steps.std().item()

        train_time = int(time.time() - self.start_time)

        '''record the training information'''
        self.recorder.append((self.total_step, avg_r, std_r, exp_r, *logging_tuple))  # update recorder
        if self.tensorboard:
            self.tensorboard.add_scalar("info/critic_loss_sample", logging_tuple[0], self.total_step)
            self.tensorboard.add_scalar("info/actor_obj_sample", -1 * logging_tuple[1], self.total_step)
            self.tensorboard.add_scalar("reward/avg_reward_sample", avg_r, self.total_step)
            self.tensorboard.add_scalar("reward/std_reward_sample", std_r, self.total_step)
            self.tensorboard.add_scalar("reward/exp_reward_sample", exp_r, self.total_step)

            self.tensorboard.add_scalar("info/critic_loss_time", logging_tuple[0], train_time)
            self.tensorboard.add_scalar("info/actor_obj_time", -1 * logging_tuple[1], train_time)
            self.tensorboard.add_scalar("reward/avg_reward_time", avg_r, train_time)
            self.tensorboard.add_scalar("reward/std_reward_time", std_r, train_time)
            self.tensorboard.add_scalar("reward/exp_reward_time", exp_r, train_time)

        '''print some information to Terminal'''
        prev_max_r = self.max_r
        self.max_r = max(self.max_r, avg_r)  # update max average cumulative rewards
        print(f"{self.agent_id:<3}{self.total_step:8.2e}{train_time:8.0f} |"
              f"{avg_r:8.2f}{std_r:7.1f}{avg_s:7.0f}{std_s:6.0f} |"
              f"{exp_r:8.2f}{''.join(f'{n:7.2f}' for n in logging_tuple)}")

        if_save = avg_r > prev_max_r
        if if_save:
            self.save_training_curve_jpg()
        if not self.if_keep_save:
            return

        self.save_counter += 1
        actor_path = None
        if if_save:  # save checkpoint with the highest episode return
            if self.if_over_write:
                actor_path = f"{self.cwd}/actor.pt"
            else:
                actor_path = f"{self.cwd}/actor__{self.total_step:012}_{self.max_r:09.3f}.pt"
        elif self.save_counter == self.save_gap:
            self.save_counter = 0
            if self.if_over_write:
                actor_path = f"{self.cwd}/actor.pt"
            else:
                actor_path = f"{self.cwd}/actor__{self.total_step:012}.pt"
        if actor_path:
            torch.save(actor, actor_path)  # save policy network as *.pt

    def save_or_load_recoder(self, if_save: bool):
        if if_save:
            np.save(self.recorder_path, self.recorder)
        elif os.path.exists(self.recorder_path):
            recorder = np.load(self.recorder_path)
            self.recorder = [tuple(i) for i in recorder]  # convert numpy array to list of tuples
            self.total_step = self.recorder[-1][0]

    def get_cumulative_rewards_and_step_single_env(self, actor) -> Tensor:
        rewards_steps_list = [get_cumulative_rewards_and_steps(self.env, actor) for _ in range(self.eval_times)]
        rewards_steps_ten = torch.tensor(rewards_steps_list, dtype=torch.float32)
        return rewards_steps_ten  # rewards_steps_ten.shape[1] == 2

    def get_cumulative_rewards_and_step_vectorized_env(self, actor) -> Tensor:
        rewards_step_list = [get_cumulative_rewards_and_step_from_vec_env(self.env, actor)
                             for _ in range(max(1, self.eval_times // self.env.num_envs))]
        rewards_step_list = sum(rewards_step_list, [])
        rewards_step_ten = torch.tensor(rewards_step_list)
        return rewards_step_ten  # rewards_step_ten.shape[1] == 2

    def save_training_curve_jpg(self):
        recorder = np.array(self.recorder)

        train_time = int(time.time() - self.start_time)
        total_step = int(self.recorder[-1][0])
        fig_title = f"step_time_maxR_{int(total_step)}_{int(train_time)}_{self.max_r:.3f}"

        draw_learning_curve(recorder=recorder, fig_title=fig_title, save_path=f"{self.cwd}/LearningCurve.jpg")
        np.save(self.recorder_path, recorder)  # save self.recorder for `draw_learning_curve()`
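# --- usage sketch (illustrative, not part of the module) ---
# A minimal sketch of how `Evaluator.evaluate_and_save` is typically driven from a training loop.
# The names `agent`, `train_env`, `eval_env`, `buffer`, `horizon_len`, and `training` below are
# assumptions for illustration only; the actual loop lives in `elegantrl.train.run`.
#
#   evaluator = Evaluator(cwd=args.cwd, env=eval_env, args=args, if_tensorboard=False)
#   while training:
#       agent.explore_env(train_env, horizon_len)        # collect `horizon_len` transitions
#       logging_tuple = agent.update_net(buffer)         # e.g. (obj_critic, obj_actor, ...)
#       exp_r = ...                                      # average reward of the explored samples
#       evaluator.evaluate_and_save(agent.act, horizon_len, exp_r, logging_tuple)
#   evaluator.save_or_load_recoder(if_save=True)         # persist recorder.npy at the end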
"""util""" def get_cumulative_rewards_and_steps(env, actor, if_render: bool = False) -> Tuple[float, int]: """Usage eval_times = 4 net_dim = 2 ** 7 actor_path = './LunarLanderContinuous-v2_PPO_1/actor.pt' env = build_env(env_class=env_class, env_args=env_args) act = agent(net_dim, env.state_dim, env.action_dim, gpu_id=gpu_id).act act.load_state_dict(torch.load(actor_path, map_location=lambda storage, loc: storage)) r_s_ary = [get_episode_return_and_step(env, act) for _ in range(eval_times)] r_s_ary = np.array(r_s_ary, dtype=np.float32) r_avg, s_avg = r_s_ary.mean(axis=0) # average of episode return and episode step """ max_step = env.max_step if_discrete = env.if_discrete device = next(actor.parameters()).device # net.parameters() is a Python generator. state = env.reset() steps = None returns = 0.0 # sum of rewards in an episode for steps in range(max_step): tensor_state = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0) tensor_action = actor(tensor_state) if if_discrete: tensor_action = tensor_action.argmax(dim=1) action = tensor_action.detach().cpu().numpy()[0] # not need detach(), because using torch.no_grad() outside state, reward, done, _ = env.step(action) returns += reward if if_render: env.render() time.sleep(0.02) if done: break else: print("| get_rewards_and_step: WARNING. max_step > 12345") returns = getattr(env, 'cumulative_returns', returns) steps += 1 return returns, steps def get_cumulative_rewards_and_step_from_vec_env(env, actor) -> List[Tuple[float, int]]: device = env.device env_num = env.num_envs max_step = env.max_step if_discrete = env.if_discrete '''get returns and dones (GPU)''' returns = torch.empty((max_step, env_num), dtype=torch.float32, device=device) dones = torch.empty((max_step, env_num), dtype=torch.bool, device=device) state = env.reset() # must reset in vectorized env for t in range(max_step): action = actor(state.to(device)) # assert action.shape == (env.env_num, env.action_dim) if if_discrete: action = action.argmax(dim=1, keepdim=True) state, reward, done, info_dict = env.step(action) returns[t] = reward dones[t] = done '''get cumulative returns and step''' if hasattr(env, 'cumulative_returns'): # GPU returns_step_list = [(ret, env.max_step) for ret in env.cumulative_returns] else: # CPU returns = returns.cpu() dones = dones.cpu() returns_step_list = [] for i in range(env_num): dones_where = torch.where(dones[:, i] == 1)[0] + 1 episode_num = len(dones_where) if episode_num == 0: continue j0 = 0 for j1 in dones_where.tolist(): reward_sum = returns[j0:j1, i].sum().item() # cumulative returns of an episode steps_num = j1 - j0 # step number of an episode returns_step_list.append((reward_sum, steps_num)) j0 = j1 return returns_step_list def draw_learning_curve(recorder: np.ndarray = None, fig_title: str = 'learning_curve', save_path: str = 'learning_curve.jpg'): steps = recorder[:, 0] # x-axis is training steps r_avg = recorder[:, 1] r_std = recorder[:, 2] r_exp = recorder[:, 3] obj_c = recorder[:, 4] obj_a = recorder[:, 5] '''plot subplots''' import matplotlib as mpl mpl.use('Agg') """Generating matplotlib graphs without a running X server [duplicate] write `mpl.use('Agg')` before `import matplotlib.pyplot as plt` https://stackoverflow.com/a/4935945/9293137 """ import matplotlib.pyplot as plt fig, axs = plt.subplots(2) '''axs[0]''' ax00 = axs[0] ax00.cla() ax01 = axs[0].twinx() color01 = 'darkcyan' ax01.set_ylabel('Explore AvgReward', color=color01) ax01.plot(steps, r_exp, color=color01, alpha=0.5, ) ax01.tick_params(axis='y', 
labelcolor=color01) color0 = 'lightcoral' ax00.set_ylabel('Episode Return', color=color0) ax00.plot(steps, r_avg, label='Episode Return', color=color0) ax00.fill_between(steps, r_avg - r_std, r_avg + r_std, facecolor=color0, alpha=0.3) ax00.grid() '''axs[1]''' ax10 = axs[1] ax10.cla() ax11 = axs[1].twinx() color11 = 'darkcyan' ax11.set_ylabel('objC', color=color11) ax11.fill_between(steps, obj_c, facecolor=color11, alpha=0.2, ) ax11.tick_params(axis='y', labelcolor=color11) color10 = 'royalblue' ax10.set_xlabel('Total Steps') ax10.set_ylabel('objA', color=color10) ax10.plot(steps, obj_a, label='objA', color=color10) ax10.tick_params(axis='y', labelcolor=color10) for plot_i in range(6, recorder.shape[1]): other = recorder[:, plot_i] ax10.plot(steps, other, label=f'{plot_i}', color='grey', alpha=0.5) ax10.legend() ax10.grid() '''plot save''' plt.title(fig_title, y=2.3) plt.savefig(save_path) plt.close('all') # avoiding warning about too many open figures, rcParam `figure.max_open_warning` # plt.show() # if use `mpl.use('Agg')` to draw figures without GUI, then plt can't plt.show() """learning curve""" def demo_evaluator_actor_pth(): import gym from elegantrl.agents.AgentPPO import AgentPPO from elegantrl.train.config import Config, build_env gpu_id = 0 # >=0 means GPU ID, -1 means CPU agent_class = AgentPPO env_class = gym.make env_args = {'env_num': 1, 'env_name': 'LunarLanderContinuous-v2', 'max_step': 1000, 'state_dim': 8, 'action_dim': 2, 'if_discrete': False, 'target_return': 200, 'id': 'LunarLanderContinuous-v2'} # actor_path = './LunarLanderContinuous-v2_PPO_1/actor.pt' eval_times = 4 net_dim = 2 ** 7 '''init''' args = Config(agent_class=agent_class, env_class=env_class, env_args=env_args) env = build_env(env_class=args.env_class, env_args=args.env_args) act = agent_class(net_dim, env.state_dim, env.action_dim, gpu_id=gpu_id, args=args).act # act.load_state_dict(torch.load(actor_path, map_location=lambda storage, loc: storage)) '''evaluate''' r_s_ary = [get_cumulative_rewards_and_steps(env, act) for _ in range(eval_times)] r_s_ary = np.array(r_s_ary, dtype=np.float32) r_avg, s_avg = r_s_ary.mean(axis=0) # average of episode return and episode step print('r_avg, s_avg', r_avg, s_avg) return r_avg, s_avg def demo_evaluate_actors(dir_path: str, gpu_id: int, agent, env_args: dict, eval_times=2, net_dim=128): import gym from elegantrl.train.config import build_env # dir_path = './LunarLanderContinuous-v2_PPO_1' # gpu_id = 0 # agent_class = AgentPPO # net_dim = 2 ** 7 env_class = gym.make # env_args = {'env_num': 1, # 'env_name': 'LunarLanderContinuous-v2', # 'max_step': 1000, # 'state_dim': 8, # 'action_dim': 2, # 'if_discrete': False, # 'target_return': 200, # 'eval_times': 2 ** 4, # # 'id': 'LunarLanderContinuous-v2'} # eval_times = 2 ** 1 '''init''' env = build_env(env_class=env_class, env_args=env_args) act = agent(net_dim, env.state_dim, env.action_dim, gpu_id=gpu_id).act '''evaluate''' step_epi_r_s_ary = [] act_names = [name for name in os.listdir(dir_path) if len(name) == 19] for act_name in act_names: act_path = f"{dir_path}/{act_name}" act.load_state_dict(torch.load(act_path, map_location=lambda storage, loc: storage)) r_s_ary = [get_cumulative_rewards_and_steps(env, act) for _ in range(eval_times)] r_s_ary = np.array(r_s_ary, dtype=np.float32) r_avg, s_avg = r_s_ary.mean(axis=0) # average of episode return and episode step step = int(act_name[6:15]) step_epi_r_s_ary.append((step, r_avg, s_avg)) step_epi_r_s_ary = np.array(step_epi_r_s_ary, dtype=np.float32) '''sort by step''' 
    step_epi_r_s_ary = step_epi_r_s_ary[step_epi_r_s_ary[:, 0].argsort()]
    return step_epi_r_s_ary


def demo_load_pendulum_and_render():
    import torch
    from elegantrl.agents.AgentPPO import AgentPPO
    from elegantrl.train.config import Config, build_env

    gpu_id = 0  # >=0 means GPU ID, -1 means CPU
    agent_class = AgentPPO

    from elegantrl.envs.CustomGymEnv import PendulumEnv
    env_class = PendulumEnv
    env_args = {'env_num': 1,
                'env_name': 'Pendulum-v1',
                'state_dim': 3,
                'action_dim': 1,
                'if_discrete': False, }

    actor_path = './Pendulum-v1_PPO_0/actor.pt'
    net_dim = 2 ** 7

    '''init'''
    env = build_env(env_class=env_class, env_args=env_args)
    args = Config(agent_class=agent_class, env_class=env_class, env_args=env_args)
    act = agent_class(net_dim, env.state_dim, env.action_dim, gpu_id=gpu_id, args=args).act
    act.load_state_dict(torch.load(actor_path, map_location=lambda storage, loc: storage))

    '''evaluate'''
    # eval_times = 2 ** 7
    # from elegantrl.envs.CustomGymEnv import PendulumEnv
    # eval_env = PendulumEnv()
    # from elegantrl.train.evaluator import get_cumulative_rewards_and_steps
    # r_s_ary = [get_cumulative_rewards_and_steps(eval_env, act) for _ in range(eval_times)]
    # r_s_ary = np.array(r_s_ary, dtype=np.float32)
    # r_avg, s_avg = r_s_ary.mean(axis=0)  # average of episode return and episode step
    #
    # print('r_avg, s_avg', r_avg, s_avg)

    '''render'''
    max_step = env.max_step
    if_discrete = env.if_discrete
    device = next(act.parameters()).device  # net.parameters() is a Python generator.

    state = env.reset()
    steps = None
    returns = 0.0  # sum of rewards in an episode
    for steps in range(max_step):
        s_tensor = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        a_tensor = act(s_tensor).argmax(dim=1) if if_discrete else act(s_tensor)
        action = a_tensor.detach().cpu().numpy()[0]  # detach() is not strictly needed if the caller wraps this in torch.no_grad()
        state, reward, done, _ = env.step(action * 2)  # the action is scaled by 2 specially for Pendulum
        returns += reward
        env.render()

        if done:
            break
    returns = getattr(env, 'cumulative_returns', returns)
    steps += 1

    print(f"\n| cumulative_returns {returns}"
          f"\n| episode steps {steps}")


def run():
    from elegantrl.agents.AgentPPO import AgentPPO

    flag_id = 1  # int(sys.argv[1])
    gpu_id = [2, 3][flag_id]
    agent = AgentPPO

    env_args = [
        {'env_num': 1,
         'env_name': 'LunarLanderContinuous-v2',
         'max_step': 1000,
         'state_dim': 8,
         'action_dim': 2,
         'if_discrete': False,
         'target_return': 200,
         'eval_times': 2 ** 4,
         'id': 'LunarLanderContinuous-v2'},

        {'env_num': 1,
         'env_name': 'BipedalWalker-v3',
         'max_step': 1600,
         'state_dim': 24,
         'action_dim': 4,
         'if_discrete': False,
         'target_return': 300,
         'eval_times': 2 ** 3,
         'id': 'BipedalWalker-v3', },
    ][flag_id]
    env_name = env_args['env_name']

    print('gpu_id', gpu_id)
    print('env_name', env_name)

    '''save step_epi_r_s_ary'''
    # cwd_path = '.'
    # dir_names = [name for name in os.listdir(cwd_path)
    #              if name.find(env_name) >= 0 and os.path.isdir(name)]
    # for dir_name in dir_names:
    #     dir_path = f"{cwd_path}/{dir_name}"
    #     step_epi_r_s_ary = demo_evaluate_actors(dir_path, gpu_id, agent, env_args)
    #     np.savetxt(f"{dir_path}-step_epi_r_s_ary.txt", step_epi_r_s_ary)

    '''load step_epi_r_s_ary'''
    step_epi_r_s_ary = []
    cwd_path = '.'
    ary_names = [name for name in os.listdir('.') if name.find(env_name) >= 0 and name[-4:] == '.txt']
    for ary_name in ary_names:
        ary_path = f"{cwd_path}/{ary_name}"
        ary = np.loadtxt(ary_path)
        step_epi_r_s_ary.append(ary)
    step_epi_r_s_ary = np.vstack(step_epi_r_s_ary)
    step_epi_r_s_ary = step_epi_r_s_ary[step_epi_r_s_ary[:, 0].argsort()]
    print('step_epi_r_s_ary.shape', step_epi_r_s_ary.shape)

    '''plot'''
    import matplotlib.pyplot as plt
    # plt.plot(step_epi_r_s_ary[:, 0], step_epi_r_s_ary[:, 1])

    plot_x_y_up_dw_step = []
    n = 8
    for i in range(0, len(step_epi_r_s_ary), n):
        y_ary = step_epi_r_s_ary[i:i + n, 1]
        if y_ary.shape[0] <= 1:
            continue
        y_avg = y_ary.mean()
        y_up = y_ary[y_ary > y_avg].mean()
        y_dw = y_ary[y_ary <= y_avg].mean()
        y_step = step_epi_r_s_ary[i:i + n, 2].mean()

        x_avg = step_epi_r_s_ary[i:i + n, 0].mean()
        plot_x_y_up_dw_step.append((x_avg, y_avg, y_up, y_dw, y_step))

    if_show_episode_step = True
    color0 = 'royalblue'
    color1 = 'lightcoral'
    # color2 = 'darkcyan'
    # colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
    #           '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
    title = f"{env_name}_{agent.__name__}_ElegantRL"

    fig, ax = plt.subplots(1)
    plot_x = [item[0] for item in plot_x_y_up_dw_step]
    plot_y = [item[1] for item in plot_x_y_up_dw_step]
    plot_y_up = [item[2] for item in plot_x_y_up_dw_step]
    plot_y_dw = [item[3] for item in plot_x_y_up_dw_step]
    ax.plot(plot_x, plot_y, label='Episode Return', color=color0)
    ax.fill_between(plot_x, plot_y_up, plot_y_dw, facecolor=color0, alpha=0.3)
    ax.set_ylabel('Episode Return', color=color0)
    ax.tick_params(axis='y', labelcolor=color0)
    ax.grid(True)

    if if_show_episode_step:
        ax_twin = ax.twinx()
        plot_y_step = [item[4] for item in plot_x_y_up_dw_step]
        ax_twin.fill_between(plot_x, 0, plot_y_step, facecolor=color1, alpha=0.3)
        ax_twin.set_ylabel('Episode Step', color=color1)
        ax_twin.tick_params(axis='y', labelcolor=color1)
        ax_twin.set_ylim(0, np.max(plot_y_step) * 2)

    print('title', title)
    plt.title(title)
    plt.show()


if __name__ == '__main__':
    # demo_evaluate_actors()
    run()
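# --- offline replot sketch (illustrative, not part of the module) ---
# A minimal sketch, assuming a finished run left `recorder.npy` in its working directory,
# of how `draw_learning_curve` can be reused outside training. The directory name below is
# an assumption for illustration only.
#
#   recorder = np.load('./LunarLanderContinuous-v2_PPO_1/recorder.npy')
#   draw_learning_curve(recorder=recorder,
#                       fig_title='LunarLanderContinuous-v2_PPO_1',
#                       save_path='./LearningCurve.jpg')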