Source code for elegantrl.agents.AgentPPO

import torch
from typing import List, Tuple
from torch import Tensor

from elegantrl.train.config import Config
from elegantrl.agents.AgentBase import AgentBase
from elegantrl.agents.net import ActorPPO, CriticPPO
from elegantrl.agents.net import ActorDiscretePPO


class AgentPPO(AgentBase):
    """PPO algorithm.

    "Proximal Policy Optimization Algorithms". John Schulman et al., 2017.

    net_dims: the middle layer dimensions of the MLP (MultiLayer Perceptron)
    state_dim: the dimension of the state vector
    action_dim: the dimension of the action (or the number of discrete actions)
    gpu_id: the gpu_id of the training device. The CPU is used when CUDA is not available.
    args: the arguments for agent training. `args = Config()`
    """

    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.act_class = getattr(self, "act_class", ActorPPO)
        self.cri_class = getattr(self, "cri_class", CriticPPO)
        super().__init__(net_dims=net_dims, state_dim=state_dim, action_dim=action_dim, gpu_id=gpu_id, args=args)
        self.if_off_policy = False

        self.ratio_clip = getattr(args, "ratio_clip", 0.25)  # `ratio.clamp(1 - clip, 1 + clip)`
        self.lambda_gae_adv = getattr(args, "lambda_gae_adv", 0.95)  # could be 0.50~0.99, GAE for sparse rewards
        self.lambda_entropy = getattr(args, "lambda_entropy", 0.01)  # could be 0.00~0.20
        self.lambda_entropy = torch.tensor(self.lambda_entropy, dtype=torch.float32, device=self.device)

        if getattr(args, 'if_use_v_trace', False):
            self.get_advantages = self.get_advantages_vtrace  # get advantage values in reverse time order (V-trace)
        else:
            self.get_advantages = self.get_advantages_origin  # get advantage values using the critic network

        self.value_avg = torch.zeros(1, dtype=torch.float32, device=self.device)
        self.value_std = torch.ones(1, dtype=torch.float32, device=self.device)
    def explore_one_env(self, env, horizon_len: int, if_random: bool = False) -> Tuple[Tensor, ...]:
        """
        Collect trajectories through the actor-environment interaction for a **single** environment instance.

        env: RL training environment. env.reset() env.step(). It should be a single env.
        horizon_len: collect horizon_len steps while exploring to update the networks
        return: `(states, actions, logprobs, rewards, undones)` for on-policy
            env_num == 1
            states.shape == (horizon_len, env_num, state_dim)
            actions.shape == (horizon_len, env_num, action_dim)
            logprobs.shape == (horizon_len, env_num)
            rewards.shape == (horizon_len, env_num)
            undones.shape == (horizon_len, env_num)
        """
        states = torch.zeros((horizon_len, self.num_envs, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.num_envs, self.action_dim), dtype=torch.float32).to(self.device)
        logprobs = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        rewards = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        dones = torch.zeros((horizon_len, self.num_envs), dtype=torch.bool).to(self.device)

        state = self.last_state  # shape == (1, state_dim) for a single env.
        get_action = self.act.get_action
        convert = self.act.convert_action_for_env
        for t in range(horizon_len):
            action, logprob = get_action(state)
            states[t] = state

            ary_action = convert(action[0]).detach().cpu().numpy()
            ary_state, reward, done, _ = env.step(ary_action)  # next_state
            ary_state = env.reset() if done else ary_state  # ary_state.shape == (state_dim, )
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device).unsqueeze(0)

            actions[t] = action
            logprobs[t] = logprob
            rewards[t] = reward
            dones[t] = done

        self.last_state = state  # state.shape == (1, state_dim) for a single env.

        rewards *= self.reward_scale
        undones = 1.0 - dones.type(torch.float32)
        return states, actions, logprobs, rewards, undones
    def explore_vec_env(self, env, horizon_len: int, if_random: bool = False) -> Tuple[Tensor, ...]:
        """
        Collect trajectories through the actor-environment interaction for a **vectorized** environment instance.

        env: RL training environment. env.reset() env.step(). It should be a vector env.
        horizon_len: collect horizon_len steps while exploring to update the networks
        return: `(states, actions, logprobs, rewards, undones)` for on-policy
            states.shape == (horizon_len, env_num, state_dim)
            actions.shape == (horizon_len, env_num, action_dim)
            logprobs.shape == (horizon_len, env_num)
            rewards.shape == (horizon_len, env_num)
            undones.shape == (horizon_len, env_num)
        """
        states = torch.zeros((horizon_len, self.num_envs, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.num_envs, self.action_dim), dtype=torch.float32).to(self.device)
        logprobs = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        rewards = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        dones = torch.zeros((horizon_len, self.num_envs), dtype=torch.bool).to(self.device)

        state = self.last_state  # shape == (env_num, state_dim) for a vectorized env.
        get_action = self.act.get_action
        convert = self.act.convert_action_for_env
        for t in range(horizon_len):
            action, logprob = get_action(state)
            states[t] = state

            state, reward, done, _ = env.step(convert(action))  # next_state

            actions[t] = action
            logprobs[t] = logprob
            rewards[t] = reward
            dones[t] = done

        self.last_state = state

        rewards *= self.reward_scale
        undones = 1.0 - dones.type(torch.float32)
        return states, actions, logprobs, rewards, undones
    def update_net(self, buffer) -> Tuple[float, ...]:
        with torch.no_grad():
            states, actions, logprobs, rewards, undones = buffer
            buffer_size = states.shape[0]
            buffer_num = states.shape[1]

            '''get advantages and reward_sums'''
            bs = 2 ** 10  # use a smaller batch size to avoid running out of GPU memory.
            values = torch.empty_like(rewards)  # values.shape == (buffer_size, buffer_num)
            for i in range(0, buffer_size, bs):
                for j in range(buffer_num):
                    values[i:i + bs, j] = self.cri(states[i:i + bs, j])

            advantages = self.get_advantages(rewards, undones, values)  # shape == (buffer_size, buffer_num)
            reward_sums = advantages + values  # shape == (buffer_size, buffer_num)
            del rewards, undones, values

            advantages = (advantages - advantages.mean()) / (advantages.std(dim=0) + 1e-4)

            self.update_avg_std_for_normalization(
                states=states.reshape((-1, self.state_dim)),
                returns=reward_sums.reshape((-1,))
            )
        # assert logprobs.shape == advantages.shape == reward_sums.shape == (buffer_size, buffer_num)

        '''update network'''
        obj_critics = 0.0
        obj_actors = 0.0

        sample_len = buffer_size - 1
        update_times = int(buffer_size * self.repeat_times / self.batch_size)
        assert update_times >= 1
        for _ in range(update_times):
            ids = torch.randint(sample_len * buffer_num, size=(self.batch_size,), requires_grad=False)
            ids0 = torch.fmod(ids, sample_len)  # ids % sample_len
            ids1 = torch.div(ids, sample_len, rounding_mode='floor')  # ids // sample_len

            state = states[ids0, ids1]
            action = actions[ids0, ids1]
            logprob = logprobs[ids0, ids1]
            advantage = advantages[ids0, ids1]
            reward_sum = reward_sums[ids0, ids1]

            value = self.cri(state)  # the critic network predicts the reward_sum (the discounted return) of the state
            obj_critic = self.criterion(value, reward_sum)
            self.optimizer_update(self.cri_optimizer, obj_critic)

            new_logprob, obj_entropy = self.act.get_logprob_entropy(state, action)
            ratio = (new_logprob - logprob.detach()).exp()
            surrogate1 = advantage * ratio
            surrogate2 = advantage * ratio.clamp(1 - self.ratio_clip, 1 + self.ratio_clip)
            obj_surrogate = torch.min(surrogate1, surrogate2).mean()

            obj_actor = obj_surrogate + obj_entropy.mean() * self.lambda_entropy
            self.optimizer_update(self.act_optimizer, -obj_actor)

            obj_critics += obj_critic.item()
            obj_actors += obj_actor.item()
        a_std_log = self.act.action_std_log.mean() if hasattr(self.act, 'action_std_log') else torch.zeros(1)
        return obj_critics / update_times, obj_actors / update_times, a_std_log.item()

    def get_advantages_origin(self, rewards: Tensor, undones: Tensor, values: Tensor) -> Tensor:
        advantages = torch.empty_like(values)  # advantage values

        masks = undones * self.gamma
        horizon_len = rewards.shape[0]

        next_value = self.cri(self.last_state).detach()

        advantage = torch.zeros_like(next_value)  # last advantage value by GAE (Generalized Advantage Estimate)
        for t in range(horizon_len - 1, -1, -1):
            next_value = rewards[t] + masks[t] * next_value
            advantages[t] = advantage = next_value - values[t] + masks[t] * self.lambda_gae_adv * advantage
            next_value = values[t]
        return advantages

    def get_advantages_vtrace(self, rewards: Tensor, undones: Tensor, values: Tensor) -> Tensor:
        advantages = torch.empty_like(values)  # advantage values

        masks = undones * self.gamma
        horizon_len = rewards.shape[0]

        advantage = torch.zeros_like(values[0])  # last advantage value by GAE (Generalized Advantage Estimate)
        for t in range(horizon_len - 1, -1, -1):
            advantages[t] = rewards[t] - values[t] + masks[t] * advantage
            advantage = values[t] + self.lambda_gae_adv * advantages[t]
        return advantages
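
# The following is a minimal, self-contained sketch (an illustration, not part of the
# library) of the GAE recursion that `get_advantages_origin` above implements:
#     delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
#     A_t = delta_t + gamma * lambda * A_{t+1}
# where `undones` zeroes the bootstrap term at episode boundaries. All tensor values
# below are toy numbers for a rollout of one environment.
def _gae_sketch() -> Tensor:
    gamma, lambda_gae_adv = 0.99, 0.95
    rewards = torch.tensor([[1.0], [0.0], [1.0]])  # shape == (horizon_len, env_num)
    undones = torch.tensor([[1.0], [1.0], [0.0]])  # the last step ends the episode
    values = torch.tensor([[0.5], [0.4], [0.3]])   # critic estimates V(s_t)
    next_value = torch.tensor([0.2])               # critic estimate for the state after the rollout

    advantages = torch.empty_like(values)
    masks = undones * gamma
    advantage = torch.zeros_like(next_value)
    for t in range(rewards.shape[0] - 1, -1, -1):
        delta = rewards[t] + masks[t] * next_value - values[t]  # TD error at step t
        advantages[t] = advantage = delta + masks[t] * lambda_gae_adv * advantage
        next_value = values[t]
    return advantages  # the critic targets would then be advantages + values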
class AgentDiscretePPO(AgentPPO):
    def __init__(self, net_dims: List[int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        self.act_class = getattr(self, "act_class", ActorDiscretePPO)
        super().__init__(net_dims=net_dims, state_dim=state_dim, action_dim=action_dim, gpu_id=gpu_id, args=args)
    def explore_one_env(self, env, horizon_len: int, if_random: bool = False) -> Tuple[Tensor, ...]:
        """
        Collect trajectories through the actor-environment interaction for a **single** environment instance.

        env: RL training environment. env.reset() env.step(). It should be a single env.
        horizon_len: collect horizon_len steps while exploring to update the networks
        return: `(states, actions, logprobs, rewards, undones)` for on-policy
            env_num == 1
            states.shape == (horizon_len, env_num, state_dim)
            actions.shape == (horizon_len, env_num, 1), each action is a discrete action index
            logprobs.shape == (horizon_len, env_num)
            rewards.shape == (horizon_len, env_num)
            undones.shape == (horizon_len, env_num)
        """
        states = torch.zeros((horizon_len, self.num_envs, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.num_envs, 1), dtype=torch.int32).to(self.device)  # the only difference
        logprobs = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        rewards = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        dones = torch.zeros((horizon_len, self.num_envs), dtype=torch.bool).to(self.device)

        state = self.last_state  # shape == (1, state_dim) for a single env.
        get_action = self.act.get_action
        convert = self.act.convert_action_for_env
        for t in range(horizon_len):
            action, logprob = get_action(state)
            states[t] = state

            int_action = convert(action).item()
            ary_state, reward, done, _ = env.step(int_action)  # next_state
            ary_state = env.reset() if done else ary_state
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device).unsqueeze(0)

            actions[t] = action
            logprobs[t] = logprob
            rewards[t] = reward
            dones[t] = done

        self.last_state = state

        rewards *= self.reward_scale
        undones = 1.0 - dones.type(torch.float32)
        return states, actions, logprobs, rewards, undones
    def explore_vec_env(self, env, horizon_len: int, if_random: bool = False) -> Tuple[Tensor, ...]:
        """
        Collect trajectories through the actor-environment interaction for a **vectorized** environment instance.

        env: RL training environment. env.reset() env.step(). It should be a vector env.
        horizon_len: collect horizon_len steps while exploring to update the networks
        return: `(states, actions, logprobs, rewards, undones)` for on-policy
            states.shape == (horizon_len, env_num, state_dim)
            actions.shape == (horizon_len, env_num, 1), each action is a discrete action index
            logprobs.shape == (horizon_len, env_num)
            rewards.shape == (horizon_len, env_num)
            undones.shape == (horizon_len, env_num)
        """
        states = torch.zeros((horizon_len, self.num_envs, self.state_dim), dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len, self.num_envs), dtype=torch.int32).to(self.device)  # the only difference
        logprobs = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        rewards = torch.zeros((horizon_len, self.num_envs), dtype=torch.float32).to(self.device)
        dones = torch.zeros((horizon_len, self.num_envs), dtype=torch.bool).to(self.device)

        state = self.last_state  # shape == (env_num, state_dim) for a vectorized env.
        get_action = self.act.get_action
        convert = self.act.convert_action_for_env
        for t in range(horizon_len):
            action, logprob = get_action(state)
            states[t] = state

            state, reward, done, _ = env.step(convert(action))  # next_state

            actions[t] = action
            logprobs[t] = logprob
            rewards[t] = reward
            dones[t] = done

        self.last_state = state

        actions = actions.unsqueeze(2)  # actions.shape == (horizon_len, env_num, 1)
        rewards *= self.reward_scale
        undones = 1.0 - dones.type(torch.float32)
        return states, actions, logprobs, rewards, undones
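
# A self-contained sketch (an illustration, not library code) of the clipped surrogate
# objective that `update_net` maximizes: the probability ratio between the new and the
# old policy is clamped to [1 - ratio_clip, 1 + ratio_clip], and the elementwise minimum
# keeps the policy update conservative. All tensor values below are toy numbers.
def _clipped_surrogate_sketch() -> Tensor:
    ratio_clip = 0.25
    advantage = torch.tensor([1.0, -0.5, 2.0])      # normalized advantage estimates
    old_logprob = torch.tensor([-1.0, -0.7, -1.2])  # log-probs recorded while exploring
    new_logprob = torch.tensor([-0.6, -0.9, -0.4])  # log-probs under the current policy

    ratio = (new_logprob - old_logprob).exp()
    surrogate1 = advantage * ratio
    surrogate2 = advantage * ratio.clamp(1 - ratio_clip, 1 + ratio_clip)
    return torch.min(surrogate1, surrogate2).mean()  # maximized; the actor loss is its negation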