# -*- coding: utf-8 -*-
"""
Created on 2020/8/17
@project: SPAIC
@filename: IO
@author: Hong Chaofei
@contact: hongchf@gmail.com
@description:
"""
from abc import abstractmethod
from ..Neuron.Node import Action, Encoder, Reward
import numpy as np
from tqdm import tqdm
import torch
from collections import namedtuple, deque
import random
class Pipline:
    """
    Base class for pipelines that drive a network step by step.

    Subclasses must override :meth:`update_step`. This class does not use
    ``abc.ABCMeta``, so the ``@abstractmethod`` marker is not enforced when
    instantiating; calling the base implementation raises
    ``NotImplementedError`` instead.
    """

    def __init__(self, **kwargs):
        # Arbitrary keyword arguments are accepted and ignored.
        pass

    @abstractmethod
    def update_step(self, batch, **kwargs):
        """
        Perform a pass of the network given the input batch.

        :param batch: The current batch. This could be anything as long as the
            subclass agrees upon the format in some way.
        :return: Any output that is needed for recording purposes.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("You need to provide an update_step method.")
class RLPipeline(Pipline):
    """
    Reinforcement-learning pipeline coupling a network with a gym-style environment.

    On every environment step the pipeline picks an action (normally the one
    decoded by the network's ``Action`` group) and applies it to the
    environment. It then feeds the resulting observation, and the reward if the
    network has a ``Reward`` group, back into the network, which is simulated
    for ``time``.

    Args:
        network: The network to drive. It must contain an ``Action`` group and
            an ``Encoder`` group; a ``Reward`` group is optional.
        environment: Environment wrapper exposing ``shape``, ``action_num``,
            ``step(action)``, ``reset()`` and ``render()``.
        time: Simulation duration run by the network per environment step,
            in the same units as the backend's ``dt``.

    Keyword Args:
        num_episodes (int): Number of episodes; stored only (default 100).
        action_repeat (int): Once the consecutive-repeat counter exceeds this
            value the network's action is overridden (default 2).
        probability_random_action (float): Probability of taking a uniformly
            random action instead of the network's (default 0.0).
        render_interval (int | None): Render every ``render_interval`` steps;
            ``None`` disables rendering.
        reward_delay (int | None): Length of the reward delay buffer. The
            reward used at each step is the one received ``reward_delay - 1``
            steps earlier (0 until the buffer fills). ``None`` disables
            delaying.
        replay_memory (bool): Store transitions in a ``ReplayMemory``
            (default False).
        memory_capacity (int): Replay memory capacity (default 10000).

    Raises:
        ValueError: If ``time`` is None, the network lacks an ``Action`` or
            ``Encoder`` group, or ``reward_delay`` is not positive.
    """

    def __init__(self, network, environment, time=None, **kwargs):
        self.network = network
        self._backend = self.network._backend
        self.sim_name = self._backend.backend_name
        self.device = self._backend.device
        self.time = time
        if self.time is None:
            raise ValueError('time must be given (simulation duration per step)')
        self.time_step = int(self.time / self._backend.dt)

        # Locate the action decoder, input encoder and optional reward group.
        self.actuator = None
        self.encoder = None
        self.rewarder = None
        for group in self.network.get_groups():
            if isinstance(group, Action):
                self.actuator = group
            if isinstance(group, Encoder):
                self.encoder = group
            if isinstance(group, Reward):
                self.rewarder = group
        if self.actuator is None:
            raise ValueError('Lack of Action object')
        if self.encoder is None:
            raise ValueError('Lack of Encoder object')
        self.network.build(self._backend)

        self.environment = environment
        # Observations with two or more dimensions get an extra leading axis.
        self.conv_state = (
            self.environment.shape is not None and len(self.environment.shape) >= 2
        )
        self.state = self._initial_state()

        self.step_count = 0
        self.episode = 0
        self.num_episodes = kwargs.get('num_episodes', 100)
        self.accumulated_reward = 0.0
        self.reward_list = []
        self.action = -1
        self.last_action = -1
        self.action_repeat_count = 0
        self.action_repeat = kwargs.get('action_repeat', 2)
        self.probability_random_action = kwargs.get('probability_random_action', 0.0)
        self.render_interval = kwargs.get('render_interval', None)
        self.reward_delay = kwargs.get('reward_delay', None)
        self.replay_memory = kwargs.get('replay_memory', False)
        if self.replay_memory:
            self.memory_capacity = kwargs.get('memory_capacity', 10000)
            self.memory_pool = ReplayMemory(self.memory_capacity)
        if self.reward_delay is not None:
            if self.reward_delay <= 0:
                raise ValueError('reward_delay must be a positive integer')
            self.rewards = np.zeros(self.reward_delay)

    def _initial_state(self):
        """Return the all-zero placeholder state used at the start of an episode."""
        if self.conv_state:
            return np.zeros(self.environment.shape)[np.newaxis, :]
        return np.zeros(self.encoder.num)

    def _select_action(self):
        """Choose ``self.action`` for this step and update the repeat counter."""
        self.last_action = self.action
        if np.random.rand(1) < self.probability_random_action:
            # Exploration: action drawn uniformly from the action space.
            self.action = np.random.randint(
                low=0, high=self.environment.action_num, size=(1,)
            )[0]
        elif self.action_repeat_count > self.action_repeat:
            # The same action was repeated too often: override it.
            if self.last_action == 0:
                self.action = 1
            else:
                self.action = np.random.randint(
                    low=0, high=self.environment.action_num, size=(1,)
                )[0]
            tqdm.write(f"Act -> too many times {self.last_action} ")
        elif self.sim_name == 'pytorch':
            # Use the action predicted by the network's Action group.
            self.action = int(self.actuator.action)
        # NOTE(review): for a non-'pytorch' backend the previous action
        # (initially -1) is kept here -- confirm this is intended.

        if self.last_action == self.action:
            self.action_repeat_count += 1
        else:
            self.action_repeat_count = 0

    def _delay_reward(self, reward):
        """Push ``reward`` into the delay buffer and return the oldest buffered reward."""
        # Shift right by one with the newest reward at the front, so the last
        # element is the reward received ``reward_delay - 1`` steps ago.
        self.rewards = torch.tensor([reward, *self.rewards[:-1]], device=self.device)
        return self.rewards[-1]

    def env_step(self):
        """
        Single step of the environment which includes rendering, getting and performing
        the action, and accumulating/delaying rewards.

        Returns:
            An OpenAI ``gym`` compatible tuple (next_state, reward, done);
            ``next_state`` is None when the episode is done.
        """
        if self.render_interval is not None and self.step_count % self.render_interval == 0:
            self.environment.render()

        self._select_action()

        next_state, reward, done, _ = self.environment.step(self.action)
        if self.conv_state:
            # Leading axis, matching the shape of the initial conv state.
            next_state = next_state[np.newaxis, :]
        if done:
            # Terminal step: there is no next state.
            next_state = None

        if self.reward_delay is not None and self.sim_name == 'pytorch':
            reward = self._delay_reward(reward)

        self.accumulated_reward += reward

        if self.replay_memory:
            self.memory_pool.push(self.state, self.action, next_state, reward)
        self.state = next_state
        return next_state, reward, done

    def update_step(self, gym_batch, **kwargs):
        """
        Run a single iteration of the network, or record the episode reward when done.

        Args:
            gym_batch (tuple): An OpenAI ``gym`` compatible tuple (next_state, reward, done).
        """
        self.step_count += 1
        next_state, reward, done = gym_batch
        if done:
            self.reward_list.append(self.accumulated_reward)
            print("Episode finished after {} steps".format(self.step_count))
            return

        # Add a leading batch axis before encoding the observation.
        next_state = next_state[np.newaxis, :]
        self.encoder(next_state)
        if self.rewarder is not None:
            self.rewarder(reward)
        self.network.run(self.time)

    def reset_pipeline(self):
        """
        Reset the environment and all per-episode pipeline state.
        """
        # NOTE(review): the observation returned by reset() is discarded and the
        # state is set to zeros -- confirm this is intended.
        self.environment.reset()
        self.accumulated_reward = 0.0
        self.step_count = 0
        self.action = -1
        self.last_action = -1
        self.action_repeat_count = 0
        self.state = self._initial_state()
        if self.reward_delay is not None:
            # Do not carry delayed rewards over from the previous episode.
            self.rewards = np.zeros(self.reward_delay)
# One step of experience stored in ReplayMemory: the state the action was taken
# in, the action, the resulting state (RLPipeline stores None on terminal
# steps), and the reward received.
Transition = namedtuple('Transition', 'state action next_state reward')
class ReplayMemory:
    """
    Fixed-capacity FIFO store of ``Transition`` records.

    When ``capacity`` transitions are held, pushing another one evicts the
    oldest.

    Args:
        capacity: Maximum number of transitions retained.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition built from ``args`` (state, action, next_state, reward)."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn at random without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.memory)
        return stored