import os
import torch
import numpy as np
from typing import List
from torch import Tensor
from multiprocessing import Pipe, Process

class Config:
    def __init__(self, agent_class=None, env_class=None, env_args=None):
        self.num_envs = None
        self.agent_class = agent_class  # agent = agent_class(...)
        self.if_off_policy = self.get_if_off_policy()  # whether off-policy or on-policy of DRL algorithm

        '''Argument of environment'''
        self.env_class = env_class  # env = env_class(**env_args)
        self.env_args = env_args  # env = env_class(**env_args)
        if env_args is None:  # dummy env_args
            env_args = {'env_name': None,
                        'num_envs': 1,
                        'max_step': 12345,
                        'state_dim': None,
                        'action_dim': None,
                        'if_discrete': None, }
        env_args.setdefault('num_envs', 1)  # `num_envs=1` in default in single env.
        env_args.setdefault('max_step', 12345)  # `max_step=12345` in default, which is a large enough value.
        self.env_name = env_args['env_name']  # the name of environment. Be used to set 'cwd'.
        self.num_envs = env_args['num_envs']  # the number of sub envs in vectorized env. `num_envs=1` in single env.
        self.max_step = env_args['max_step']  # the max step number of an episode. 'set as 12345 in default.
        self.state_dim = env_args['state_dim']  # vector dimension (feature number) of state
        self.action_dim = env_args['action_dim']  # vector dimension (feature number) of action
        self.if_discrete = env_args['if_discrete']  # discrete or continuous action space

        '''Arguments for reward shaping'''
        self.gamma = 0.99  # discount factor of future rewards
        self.reward_scale = 2 ** 0  # an approximate target reward usually be closed to 256

        '''Arguments for training'''
        self.net_dims = (64, 32)  # the middle layer dimension of MLP (MultiLayer Perceptron)
        self.learning_rate = 6e-5  # the learning rate for network updating
        self.clip_grad_norm = 3.0  # 0.1 ~ 4.0, clip the gradient after normalization
        self.state_value_tau = 0  # the tau of normalize for value and state `std = (1-std)*std + tau*std`
        self.soft_update_tau = 5e-3  # 2 ** -8 ~= 5e-3. the tau of soft target update `net = (1-tau)*net + tau*net1`
        if self.if_off_policy:  # off-policy
            self.batch_size = int(64)  # num of transitions sampled from replay buffer.
            self.horizon_len = int(512)  # collect horizon_len step while exploring, then update networks
            self.buffer_size = int(1e6)  # ReplayBuffer size. First in first out for off-policy.
            self.repeat_times = 1.0  # repeatedly update network using ReplayBuffer to keep critic's loss small
            self.if_use_per = False  # use PER (Prioritized Experience Replay) for sparse reward
        else:  # on-policy
            self.batch_size = int(128)  # num of transitions sampled from replay buffer.
            self.horizon_len = int(2048)  # collect horizon_len step while exploring, then update network
            self.buffer_size = None  # ReplayBuffer size. Empty the ReplayBuffer for on-policy.
            self.repeat_times = 8.0  # repeatedly update network using ReplayBuffer to keep critic's loss small
            self.if_use_vtrace = False  # use V-trace + GAE (Generalized Advantage Estimation) for sparse reward

        '''Arguments for device'''
        self.gpu_id = int(0)  # `int` means the ID of single GPU, -1 means CPU
        self.num_workers = 2  # rollout workers number pre GPU (adjust it to get high GPU usage)
        self.num_threads = 8  # cpu_num for pytorch, `torch.set_num_threads(self.num_threads)`
        self.random_seed = 0  # initialize random seed in self.init_before_training()
        self.learner_gpus = 0  # `int` means the ID of single GPU, -1 means CPU

        '''Arguments for evaluate'''
        self.cwd = None  # current working directory to save model. None means set automatically
        self.if_remove = True  # remove the cwd folder? (True, False, None:ask me)
        self.break_step = np.inf  # break training if 'total_step > break_step'
        self.break_score = np.inf  # break training if `cumulative_rewards > break_score`
        self.if_keep_save = True  # keeping save the checkpoint. False means save until stop training.
        self.if_over_write = False  # overwrite the best policy network. `self.cwd/actor.pth`
        self.if_save_buffer = False  # if save the replay buffer for continuous training after stop training

        self.save_gap = int(8)  # save actor f"{cwd}/actor_*.pth" for learning curve.
        self.eval_times = int(3)  # number of times that get the average episodic cumulative return
        self.eval_per_step = int(2e4)  # evaluate the agent per training steps
        self.eval_env_class = None  # eval_env = eval_env_class(*eval_env_args)
        self.eval_env_args = None  # eval_env = eval_env_class(*eval_env_args)

    def init_before_training(self):

        '''set cwd (current working directory) for saving model'''
        if self.cwd is None:  # set cwd (current working directory) for saving model
            self.cwd = f'./{self.env_name}_{self.agent_class.__name__[5:]}_{self.random_seed}'

        '''remove history'''
        if self.if_remove is None:
            self.if_remove = bool(input(f"| Arguments PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')
        if self.if_remove:
            import shutil
            shutil.rmtree(self.cwd, ignore_errors=True)
            print(f"| Arguments Remove cwd: {self.cwd}")
            print(f"| Arguments Keep cwd: {self.cwd}")
        os.makedirs(self.cwd, exist_ok=True)

    def get_if_off_policy(self) -> bool:
        agent_name = self.agent_class.__name__ if self.agent_class else ''
        on_policy_names = ('SARSA', 'VPG', 'A2C', 'A3C', 'TRPO', 'PPO', 'MPO')
        return all([agent_name.find(s) == -1 for s in on_policy_names])

    def print(self):
        from pprint import pprint
        pprint(vars(self))  # prints out args in a neat, readable format

[docs]def build_env(env_class=None, env_args: dict = None, gpu_id: int = -1): env_args['gpu_id'] = gpu_id # set gpu_id for vectorized env before build it if env_args.get('if_build_vec_env'): num_envs = env_args['num_envs'] env = VecEnv(env_class=env_class, env_args=env_args, num_envs=num_envs, gpu_id=gpu_id) elif env_class.__module__ == 'gym.envs.registration': import gym assert '0.18.0' <= gym.__version__ <= '0.25.2' # pip3 install gym==0.24.0 gym.logger.set_level(40) # Block warning env = env_class(id=env_args['env_name']) else: env = env_class(**kwargs_filter(env_class.__init__, env_args.copy())) env_args.setdefault('num_envs', 1) env_args.setdefault('max_step', 12345) for attr_str in ('env_name', 'num_envs', 'max_step', 'state_dim', 'action_dim', 'if_discrete'): setattr(env, attr_str, env_args[attr_str]) return env
[docs]def kwargs_filter(function, kwargs: dict) -> dict: import inspect sign = inspect.signature(function).parameters.values() sign = { for val in sign} common_args = sign.intersection(kwargs.keys()) return {key: kwargs[key] for key in common_args} # filtered kwargs
def get_gym_env_args(env, if_print: bool) -> dict: """get a dict about a standard OpenAI gym env information. assert 0.18.0 <= gym.__version__ <= 0.25.3 env: a standard OpenAI gym env if_print: [bool] print the dict about env information. return: env_args [dict] env_args = { 'env_name': env_name, # [str] the environment name, such as XxxXxx-v0 'num_envs': num_envs. # [int] the number of sub envs in vectorized env. `num_envs=1` in single env. 'max_step': max_step, # [int] the max step number of an episode. 'state_dim': state_dim, # [int] the dimension of state 'action_dim': action_dim, # [int] the dimension of action or the number of discrete action 'if_discrete': if_discrete, # [bool] action space is discrete or continuous } """ import gym if_gym_standard_env = {'unwrapped', 'observation_space', 'action_space', 'spec'}.issubset(dir(env)) if if_gym_standard_env and (not hasattr(env, 'num_envs')): # isinstance(env, gym.Env): assert '0.18.0' <= gym.__version__ <= '0.25.2' # pip3 install gym==0.24.0 env_name = num_envs = getattr(env, 'num_envs', 1) max_step = getattr(env, '_max_episode_steps', 12345) state_shape = env.observation_space.shape state_dim = state_shape[0] if len(state_shape) == 1 else state_shape # sometimes state_dim is a list if_discrete = isinstance(env.action_space, gym.spaces.Discrete) if if_discrete: # make sure it is discrete action space action_dim = getattr(env.action_space, 'n') elif isinstance(env.action_space, gym.spaces.Box): # make sure it is continuous action space action_dim = env.action_space.shape[0] if any(env.action_space.high - 1): print('WARNING: env.action_space.high', env.action_space.high) if any(env.action_space.low + 1): print('WARNING: env.action_space.low', env.action_space.low) else: raise RuntimeError('\n| Error in get_gym_env_info(). Please set these value manually:' '\n `state_dim=int; action_dim=int; if_discrete=bool;`' '\n And keep action_space in range (-1, 1).') else: env_name = getattr(env, 'env_name', 'env') num_envs = getattr(env, 'num_envs', 1) max_step = getattr(env, 'max_step', 12345) state_dim = env.state_dim action_dim = env.action_dim if_discrete = env.if_discrete env_args = {'env_name': env_name, 'num_envs': num_envs, 'max_step': max_step, 'state_dim': state_dim, 'action_dim': action_dim, 'if_discrete': if_discrete, } if if_print: env_args_str = repr(env_args).replace(',', f",\n{'':11}") print(f"env_args = {env_args_str}") return env_args """vectorized env""" class SubEnv(Process): def __init__(self, sub_pipe0: Pipe, vec_pipe1: Pipe, env_class, env_args: dict, env_id: int = 0): super().__init__() self.sub_pipe0 = sub_pipe0 self.vec_pipe1 = vec_pipe1 self.env_class = env_class self.env_args = env_args self.env_id = env_id def run(self): torch.set_grad_enabled(False) '''build env''' if self.env_class.__module__ == 'gym.envs.registration': # is standard OpenAI Gym env env = self.env_class(id=self.env_args['env_name']) else: env = self.env_class(**kwargs_filter(self.env_class.__init__, self.env_args.copy())) '''set env random seed''' random_seed = self.env_id np.random.seed(random_seed) torch.manual_seed(random_seed) while True: action = self.sub_pipe0.recv() if action is None: state = env.reset() self.vec_pipe1.send((self.env_id, state)) else: state, reward, done, info_dict = env.step(action) state = env.reset() if done else state self.vec_pipe1.send((self.env_id, state, reward, done, info_dict)) class VecEnv: def __init__(self, env_class: object, env_args: dict, num_envs: int, gpu_id: int = -1): self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu") self.num_envs = num_envs # the number of sub env in vectorized env. '''the necessary env information when you design a custom env''' self.env_name = env_args['env_name'] # the name of this env. self.max_step = env_args['max_step'] # the max step number in an episode for evaluation self.state_dim = env_args['state_dim'] # feature number of state self.action_dim = env_args['action_dim'] # feature number of action self.if_discrete = env_args['if_discrete'] # discrete action or continuous action '''speed up with multiprocessing: Process, Pipe''' assert self.num_envs <= 64 self.res_list = [[] for _ in range(self.num_envs)] sub_pipe0s, sub_pipe1s = list(zip(*[Pipe(duplex=False) for _ in range(self.num_envs)])) self.sub_pipe1s = sub_pipe1s vec_pipe0, vec_pipe1 = Pipe(duplex=False) # recv, send self.vec_pipe0 = vec_pipe0 self.sub_envs = [ SubEnv(sub_pipe0=sub_pipe0, vec_pipe1=vec_pipe1, env_class=env_class, env_args=env_args, env_id=env_id) for env_id, sub_pipe0 in enumerate(sub_pipe0s) ] [setattr(p, 'daemon', True) for p in self.sub_envs] # set before process start to exit safely [p.start() for p in self.sub_envs] def reset(self) -> Tensor: # reset the agent in env torch.set_grad_enabled(False) for pipe in self.sub_pipe1s: pipe.send(None) states, = self.get_orderly_zip_list_return() states = torch.tensor(np.stack(states), dtype=torch.float32, device=self.device) return states def step(self, action: Tensor) -> (Tensor, Tensor, Tensor, List[dict]): # agent interacts in env action = action.detach().cpu().numpy() if self.if_discrete: action = action.squeeze(1) for pipe, a in zip(self.sub_pipe1s, action): pipe.send(a) states, rewards, dones, info_dicts = self.get_orderly_zip_list_return() states = torch.tensor(np.stack(states), dtype=torch.float32, device=self.device) rewards = torch.tensor(rewards, dtype=torch.float32, device=self.device) dones = torch.tensor(dones, dtype=torch.bool, device=self.device) return states, rewards, dones, info_dicts def close(self): [process.terminate() for process in self.sub_envs] def get_orderly_zip_list_return(self): for _ in range(self.num_envs): res = self.vec_pipe0.recv() self.res_list[res[0]] = res[1:] return list(zip(*self.res_list))