-
-
Save Ending2015a/35541009a21c1c83cf1d903b9690632e to your computer and use it in GitHub Desktop.
cartpole.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import random | |
import gym | |
import numpy as np | |
import pylab | |
from collections import deque | |
import pyvirtualdisplay | |
import cv2 | |
import datetime | |
import tensorflow as tf | |
from tensorflow.keras.utils import plot_model | |
from tensorflow.keras.models import Model, load_model | |
from tensorflow.keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten | |
from tensorflow.keras.optimizers import Adam, RMSprop | |
from tensorflow.keras import backend as K | |
from tensorflow.compat.v1 import ConfigProto | |
from tensorflow.compat.v1 import InteractiveSession | |
config = ConfigProto() | |
config.gpu_options.allow_growth = True | |
session = InteractiveSession(config=config) | |
def Model_(input_shape, action_space): | |
input_x = Input(input_shape) | |
input_ = input_x | |
input_ = tf.keras.layers.Sequential([ | |
tf.keras.layers.Conv2D(64, 5, 3, input_shape=input_shape, data_format="channels_first"), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Conv2D(64, 4, 2, data_format="channels_first"), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Conv2D(64, 3, 1, data_format="channels_first"), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Flatten(), | |
tf.keras.layers.Dense(512, kernel_initializer='he_uniform'), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Dense(256, kernel_initializer='he_uniform'), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Dense(64, kernel_initializer='he_uniform'), | |
tf.keras.layers.ReLU(), | |
tf.keras.layers.Dense(action_space, kernel_initializer='he_uniform'), | |
])(input_) | |
model = Model(inputs=input_x, outputs=input_) | |
model.compile(loss="huber", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"]) | |
model.summary() | |
return model | |
class DQNAgent: | |
def __init__(self, env_name): | |
self.env_name = env_name | |
self.env = gym.make(env_name) | |
#self.env.seed(0) | |
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") | |
self.tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1) | |
self.env._max_episode_steps = 200 | |
self.state_size = self.env.observation_space.shape[0] | |
self.action_size = self.env.action_space.n | |
#self.EPISODES = 6000 | |
self.EPISODES = 3000 | |
# Instantiate memory | |
memory_size = 3000 | |
self.memory = deque(maxlen=memory_size) | |
self.gamma = 0.95 # discount rate | |
# EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy | |
self.epsilon = 1.0 | |
self.epsilon_min = 0.0001 | |
self.epsilon_decay = 0.0005 | |
self.batch_size = 64 | |
self.TAU = 0.05# target network soft update hyperparameter | |
self.Save_Path = 'Models' +datetime.datetime.now().strftime("%Y%m%d-%H%M%S") | |
if not os.path.exists(self.Save_Path): | |
os.makedirs(self.Save_Path) | |
self.scores, self.episodes, self.average = [], [], [] | |
self.Model_name = os.path.join(self.Save_Path, self.env_name+"_DQN_CNN.h5") | |
self.ROWS = 160 | |
self.COLS = 240 | |
self.FRAME_STEP = 4 | |
self.image_memory = np.zeros((self.FRAME_STEP, self.ROWS, self.COLS)) | |
self.state_size = (self.FRAME_STEP, self.ROWS, self.COLS) | |
# create main model and target model | |
self.model = Model_(input_shape=self.state_size, action_space = self.action_size) | |
self.target_model = Model_(input_shape=self.state_size, action_space = self.action_size) | |
# initialize target model | |
self.update_target_model(rate=1.0) | |
# setup optimizer | |
self.optimizer = tf.keras.optimizers.Adam(lr=1e-3) | |
# after some time interval update the target model to be same with model | |
def update_target_model(self, rate=1.0): | |
vars = self.model.trainable_variables | |
target_vars = self.target_model.trainable_variables | |
for tar, var in zip(target_vars, vars): | |
tar.assign(tar * (1.-rate) + var * rate) | |
def remember(self, state, action, reward, next_state, done): | |
experience = state, action, reward, next_state, done | |
self.memory.append((experience)) | |
def act(self, state, decay_step): | |
if self.epsilon > self.epsilon_min: | |
self.epsilon *= (1-self.epsilon_decay) | |
explore_probability = self.epsilon | |
if explore_probability > np.random.rand(): | |
# Make a random action (exploration) | |
return random.randrange(self.action_size) | |
else: | |
# Get action from Q-network (exploitation) | |
# Estimate the Qs values state | |
# Take the biggest Q value (= the best action) | |
return np.argmax(self.model.predict(state)) | |
@tf.function | |
def train(self, state, action, reward, next_state, done): | |
vars = self.model.trainable_variables | |
with tf.GradientTape() as tape: | |
tape.watch(vars) | |
action = tf.cast(action, dtype=tf.int64) | |
reward = tf.cast(reward, dtype=tf.float32) | |
done = tf.cast(done, dtype=tf.float32) | |
# calculate target q value | |
next_qs = self.target_model(next_state, training=True) | |
next_q = tf.math.reduce_max(next_qs, axis=-1) | |
y = reward + tf.stop_gradient((1.-done) * self.GAMMA * next_q) | |
# calculate current q | |
qs = self.model(state) | |
q = tf.gather(qs, indices=action, batch_dims=1) | |
# calculate td loss | |
loss = tf.keras.losses.huber(y, q) | |
# perform gradient updates | |
grads = tape.gradient(loss, vars) | |
self.optimizer.apply_gradients(zip(grads, vars)) | |
def replay(self): | |
# Randomly sample minibatch from the deque memory | |
minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size)) | |
state = np.zeros((self.batch_size,) + self.state_size) | |
next_state = np.zeros((self.batch_size,) + self.state_size) | |
action, reward, done = [], [], [] | |
for i in range(len(minibatch)): | |
state[i] = minibatch[i][0] | |
action.append(minibatch[i][1]) | |
reward.append(minibatch[i][2]) | |
next_state[i] = minibatch[i][3] | |
done.append(minibatch[i][4]) | |
# convert to numpy arrays | |
state = np.asarray(state) | |
action = np.asarray(action) | |
reward = np.asarray(reward) | |
next_state = np.asarray(next_state) | |
done = np.asarray(done) | |
self.train(state, action, reward, next_state, done) | |
def load(self, name): | |
self.model = load_model(name) | |
def save(self, name): | |
self.model.save(name) | |
""" | |
def imshow(self, image, frame_step=0): | |
#cv2.imshow("cartpole"+str(frame_step), image[frame_step,...]) | |
if cv2.waitKey(25) & 0xFF == ord("q"): | |
cv2.destroyAllWindows() | |
return | |
""" | |
def GetImage(self): | |
img = self.env.render(mode='rgb_array') | |
img_rgb = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) | |
img_rgb_resized = cv2.resize(img_rgb, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC) | |
img_rgb_resized[img_rgb_resized < 255] = 0 | |
img_rgb_resized = img_rgb_resized / 255 | |
self.image_memory = np.roll(self.image_memory, 1, axis = 0) | |
self.image_memory[0,:,:] = img_rgb_resized | |
return np.expand_dims(self.image_memory, axis=0) | |
def reset(self): | |
self.env.reset() | |
for i in range(self.FRAME_STEP): | |
state = self.GetImage() | |
return state | |
def step(self,action): | |
next_state, reward, done, info = self.env.step(action) | |
next_state = self.GetImage() | |
return next_state, reward, done, info | |
def run(self): | |
decay_step = 0 | |
state = self.reset() | |
for i in range(500): | |
action = random.randrange(self.action_size) | |
next_state, reward, done, _ = self.step(action) | |
self.remember(state, action, reward, next_state, done) | |
step_num = 0 | |
for e in range(self.EPISODES): | |
state = self.reset() | |
done = False | |
i = 0 | |
while not done: | |
decay_step += 1 | |
step_num += 1 | |
action = self.act(state, decay_step) | |
next_state, reward, done, _ = self.step(action) | |
self.remember(state, action, reward, next_state, done) | |
state = next_state | |
i += 1 | |
if done: | |
if e % self.FRAME_STEP == 0: | |
self.update_target_model(rate=self.TAU) | |
self.scores.append(i) | |
self.episodes.append(e) | |
self.average.append(sum(self.scores[-100:]) / len(self.scores[-100:])) | |
print("episode: {}/{}, score: {}".format(e, self.EPISODES, i)) | |
if i == self.env._max_episode_steps: | |
print("Saving trained model to", self.Model_name,e) | |
self.save(self.Model_name+str(e)) | |
break | |
if step_num % 200 == 0: | |
agent.test(10,"eval") | |
if step_num % 4 == 0: | |
self.replay() | |
self.env.close() | |
def test(self,num_eval_episodes,model_name): | |
total_return = 0 | |
if(model_name =="eval"): | |
m = self.target_model | |
else: | |
agent.load(model_name) | |
m = self.model | |
for e in range(num_eval_episodes): | |
state = self.reset() | |
done = False | |
i = 0 | |
while not done: | |
action = np.argmax(m.predict(state)) | |
next_state, reward, done, _ = self.step(action) | |
state = next_state | |
i += reward | |
if done: | |
print("episode: {}/{}, score: {}".format(e, num_eval_episodes, i)) | |
break | |
total_return += i | |
print("Model:",model_name) | |
print("Average:{}".format(total_return/num_eval_episodes)) | |
if total_return/num_eval_episodes > 180: | |
self.save(self.Model_name) | |
if __name__ == "__main__": | |
env_name = 'CartPole-v0' | |
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start() | |
agent = DQNAgent(env_name) | |
print("Training Start!") | |
agent.run() | |
print("Training Finish") | |
#agent.test(100,"Models_2W_0902_0655/CartPole-v0_DQN_CNN.h52026") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment