Source code for elegantrl.agents.AgentDDPG

import numpy as np
import numpy.random as rd
import torch
from copy import deepcopy
from typing import Tuple
from torch import Tensor

from elegantrl.train.config import Config
from elegantrl.train.replay_buffer import ReplayBuffer
from elegantrl.agents.AgentBase import AgentBase
from import Actor, Critic

class AgentDDPG(AgentBase):
    """Deep Deterministic Policy Gradient (DDPG) agent.

    Reference: "Continuous control with deep reinforcement learning",
    T. Lillicrap et al., 2015.

    Args:
        net_dims: the middle layer dimensions of the MLP (MultiLayer Perceptron).
        state_dim: the dimension of state (the number of state vector).
        action_dim: the dimension of action (or the number of discrete action).
        gpu_id: the gpu_id of the training device. Use CPU when cuda is not available.
        args: the arguments for agent training. `args = Config()`
    """

    def __init__(self, net_dims: [int], state_dim: int, action_dim: int, gpu_id: int = 0, args: Config = Config()):
        # Keep any network class a subclass has already set on `self`;
        # otherwise fall back to the plain Actor / Critic.
        self.act_class = getattr(self, 'act_class', Actor)
        self.cri_class = getattr(self, 'cri_class', Critic)
        super().__init__(
            net_dims=net_dims,
            state_dim=state_dim,
            action_dim=action_dim,
            gpu_id=gpu_id,
            args=args,
        )

        # Target networks start out as exact copies of the online networks.
        self.act_target = deepcopy(self.act)
        self.cri_target = deepcopy(self.cri)

        # Standard deviation of exploration noise (0.05 unless `args` overrides it).
        noise_std = getattr(args, 'explore_noise_std', 0.05)
        self.explore_noise_std = noise_std
        # The actor needs the same value for agent.act.get_action(state).
        self.act.explore_noise_std = noise_std

    def update_net(self, buffer: ReplayBuffer) -> tuple:
        """Run the critic/actor updates for the data most recently added to `buffer`.

        Returns a tuple `(mean critic objective, mean actor objective)`
        averaged over all update steps of this call.
        """
        with torch.no_grad():
            # Refresh the running normalization statistics from the newest items.
            states, actions, rewards, undones = buffer.add_item
            flat_states = states.reshape((-1, self.state_dim))
            flat_returns = self.get_cumulative_rewards(rewards=rewards, undones=undones).reshape((-1,))
            self.update_avg_std_for_normalization(states=flat_states, returns=flat_returns)

        # update network
        update_times = int(buffer.add_size * self.repeat_times)
        assert update_times >= 1

        critic_sum = 0.0
        actor_sum = 0.0
        for _ in range(update_times):
            # Critic step. `get_obj_critic` is not defined in this class;
            # presumably the base class binds it to the raw or PER variant below.
            critic_loss, batch_states = self.get_obj_critic(buffer, self.batch_size)
            critic_sum += critic_loss.item()
            self.optimizer_update(self.cri_optimizer, critic_loss)
            self.soft_update(self.cri_target, self.cri, self.soft_update_tau)

            # Actor step (policy gradient): maximize Q, hence the negated objective.
            policy_actions = self.act(batch_states)
            # use cri_target is more stable than cri
            actor_obj = self.cri_target(batch_states, policy_actions).mean()
            actor_sum += actor_obj.item()
            self.optimizer_update(self.act_optimizer, -actor_obj)
            self.soft_update(self.act_target, self.act, self.soft_update_tau)

        return critic_sum / update_times, actor_sum / update_times

    def get_obj_critic_raw(self, buffer: ReplayBuffer, batch_size: int) -> Tuple[Tensor, Tensor]:
        """Compute the critic objective on a uniformly sampled batch.

        Returns `(obj_critic, states)`, where `states` is the sampled state batch.
        """
        with torch.no_grad():
            # Targets are built from the target networks without gradient tracking.
            states, actions, rewards, undones, next_states = buffer.sample(batch_size)
            next_actions = self.act_target(next_states)
            next_q = self.cri_target(next_states, next_actions)
            discounted_next_q = undones * self.gamma * next_q
            q_labels = rewards + discounted_next_q

        q_pred = self.cri(states, actions)
        return self.criterion(q_pred, q_labels), states

    def get_obj_critic_per(self, buffer: ReplayBuffer, batch_size: int) -> Tuple[Tensor, Tensor]:
        """Compute the critic objective with Prioritized Experience Replay (PER).

        Same target computation as `get_obj_critic_raw`, but the criterion output
        is weighted by the importance-sampling weights before averaging, and the
        detached errors are handed back to the buffer for the sampled indices.

        Returns `(obj_critic, states)`, where `states` is the sampled state batch.
        """
        with torch.no_grad():
            # is_weights, is_indices: importance sampling `weights, indices` by PER
            (states, actions, rewards, undones, next_states,
             is_weights, is_indices) = buffer.sample_for_per(batch_size)
            next_actions = self.act_target(next_states)
            next_q = self.cri_target(next_states, next_actions)
            discounted_next_q = undones * self.gamma * next_q
            q_labels = rewards + discounted_next_q

        q_pred = self.cri(states, actions)
        td_errors = self.criterion(q_pred, q_labels)
        weighted_loss = (td_errors * is_weights).mean()

        buffer.td_error_update_for_per(is_indices.detach(), td_errors.detach())
        return weighted_loss, states
class OrnsteinUhlenbeckNoise:
    def __init__(self, size: int, theta=0.15, sigma=0.3, ou_noise=0.0, dt=1e-2):
        """
        The noise of Ornstein-Uhlenbeck Process

        It makes Zero-mean Gaussian Noise more stable.
        It helps agent explore better in an inertial system.
        Don't abuse OU Process. OU process has too many hyper-parameters,
        and over fine-tuning them makes no sense.

        int size: the size of noise, passed as `size` to the Gaussian sampler
        float theta: scales the pull of the noise back toward zero at each step
        float sigma: scales the Gaussian term added at each step
        float ou_noise: the initial value of the OU-noise
        float dt: the time step of the discretized process
        """
        self.theta = theta
        self.sigma = sigma
        self.ou_noise = ou_noise
        self.dt = dt
        self.size = size

    def __call__(self) -> np.ndarray:
        """
        Advance the process by one step and output the OU-noise.

        return array ou_noise: a noise generated by Ornstein-Uhlenbeck Process.
        A new array is returned on every call, so arrays returned earlier
        (and an array passed as the initial `ou_noise`) are never modified.
        """
        noise = self.sigma * np.sqrt(self.dt) * rd.normal(size=self.size)
        # Rebind instead of using in-place `-=`: the in-place form mutated the
        # array handed out by the previous call, silently changing old samples.
        self.ou_noise = self.ou_noise - (self.theta * self.ou_noise * self.dt + noise)
        return self.ou_noise