-
-
Save wayne1029jihad/abd4b0692d64a2c9ae5086499895f4ff to your computer and use it in GitHub Desktop.
cartpole.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import random | |
import gym | |
import numpy as np | |
import pylab | |
from collections import deque | |
import pyvirtualdisplay | |
import cv2 | |
import datetime | |
import tensorflow as tf | |
from tensorflow.keras.utils import plot_model | |
from tensorflow.keras.models import Model, load_model | |
from tensorflow.keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten | |
from tensorflow.keras.optimizers import Adam, RMSprop | |
from tensorflow.keras import backend as K | |
from tensorflow.compat.v1 import ConfigProto | |
from tensorflow.compat.v1 import InteractiveSession | |
# Create a TF1-compat interactive session at import time with
# gpu_options.allow_growth enabled (per the TensorFlow docs this makes the
# process grow its GPU memory allocation on demand instead of reserving it
# all up front).
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
def Model_(input_shape, action_space):
    """Build and compile the convolutional Q-network.

    The network is three channels-first Conv2D layers followed by dense
    layers of 512, 256 and 64 units and a linear output layer with one
    unit per action.

    Args:
        input_shape: Shape of a single observation without the batch axis.
            The caller in this file passes (FRAME_STEP, ROWS, COLS).
        action_space: Number of discrete actions, i.e. the width of the
            output layer.

    Returns:
        A compiled ``tf.keras`` ``Model`` (MSE loss, RMSprop optimizer).
    """
    input_x = Input(input_shape)

    # Convolutional feature extractor over the stacked frames.
    features = Conv2D(64, 5, strides=(3, 3), padding="valid", activation="relu", data_format="channels_first")(input_x)
    features = Conv2D(64, 4, strides=(2, 2), padding="valid", activation="relu", data_format="channels_first")(features)
    features = Conv2D(64, 3, strides=(1, 1), padding="valid", activation="relu", data_format="channels_first")(features)
    features = Flatten()(features)

    # Fully connected head: 512 -> 256 -> 64 hidden units.
    hidden = Dense(512, activation="relu", kernel_initializer='he_uniform')(features)
    hidden = Dense(256, activation="relu", kernel_initializer='he_uniform')(hidden)
    hidden = Dense(64, activation="relu", kernel_initializer='he_uniform')(hidden)

    # Linear output layer: one unit per action.
    q_values = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(hidden)

    model = Model(inputs=input_x, outputs=q_values)
    # `learning_rate` replaces the deprecated `lr` alias.
    # NOTE(review): "accuracy" is not a meaningful metric for a regression
    # output; it is kept only so the logged metrics do not change.
    model.compile(loss="mean_squared_error", optimizer=RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    # export an image of model
    #dot_img_file = '/home/luca/Scrivania/model_1.png'
    #plot_model(model, to_file=dot_img_file, show_shapes=True)
    model.summary()
    return model
class DQNAgent:
    """DQN agent that learns from stacks of rendered, binarised frames.

    The agent keeps an online network (``model``) and a softly updated
    target network (``target_model``), a replay buffer (``memory``) and a
    rolling stack of the last ``FRAME_STEP`` frames (``image_memory``)
    which is used as the observation.
    """

    def __init__(self, env_name):
        """Create the environment, the replay buffer and both networks.

        Args:
            env_name: Gym environment id passed to ``gym.make``.
        """
        self.env_name = env_name
        self.env = gym.make(env_name)
        #self.env.seed(0)
        log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        self.tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
        self.env._max_episode_steps = 200
        self.action_size = self.env.action_space.n
        #self.EPISODES = 6000
        self.EPISODES = 3000

        # Instantiate memory
        memory_size = 3000
        self.memory = deque(maxlen=memory_size)

        self.gamma = 0.95  # discount rate

        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0
        self.epsilon_min = 0.0001
        self.epsilon_decay = 0.0005
        self.batch_size = 64
        self.TAU = 0.05  # target network soft update hyperparameter

        self.Save_Path = 'Models' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        os.makedirs(self.Save_Path, exist_ok=True)
        self.scores, self.episodes, self.average = [], [], []
        self.Model_name = os.path.join(self.Save_Path, self.env_name + "_DQN_CNN.h5")

        # Observations are stacks of FRAME_STEP frames of ROWS x COLS pixels.
        self.ROWS = 160
        self.COLS = 240
        self.FRAME_STEP = 4
        self.image_memory = np.zeros((self.FRAME_STEP, self.ROWS, self.COLS))
        self.state_size = (self.FRAME_STEP, self.ROWS, self.COLS)

        # create main model and target model
        self.model = Model_(input_shape=self.state_size, action_space=self.action_size)
        self.target_model = Model_(input_shape=self.state_size, action_space=self.action_size)

    def update_target_model(self):
        """Soft-update the target network towards the online network.

        Every target weight becomes ``(1 - TAU) * target + TAU * online``.
        """
        blended = [
            target_weight * (1 - self.TAU) + q_weight * self.TAU
            for q_weight, target_weight in zip(
                self.model.get_weights(), self.target_model.get_weights()
            )
        ]
        self.target_model.set_weights(blended)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, decay_step):
        """Pick an action epsilon-greedily and decay epsilon.

        Args:
            state: Batched observation accepted by ``self.model.predict``.
            decay_step: Unused; kept so existing callers keep working.

        Returns:
            Index of the chosen action.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon *= (1 - self.epsilon_decay)
        if self.epsilon > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size)
        # Greedy action from the online Q-network (exploitation)
        return np.argmax(self.model.predict(state))

    def replay(self):
        """Train the online network on one minibatch from the replay buffer.

        Targets follow the DQN rule: ``r`` for terminal transitions and
        ``r + gamma * max_a' Q_target(s', a')`` otherwise, where
        ``Q_target`` is the target network.
        """
        if not self.memory:
            return

        # Randomly sample minibatch from the deque memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
        batch_len = len(minibatch)

        # Size the arrays to the sampled batch so that a buffer smaller than
        # batch_size does not add all-zero padding rows to the training data.
        state = np.zeros((batch_len,) + self.state_size)
        next_state = np.zeros((batch_len,) + self.state_size)
        action, reward, done = [], [], []
        for idx, (s, a, r, s_next, d) in enumerate(minibatch):
            state[idx] = s
            action.append(a)
            reward.append(r)
            next_state[idx] = s_next
            done.append(d)

        # Q-values of the starting states from the online network; only the
        # entry of the taken action is overwritten below.
        target = self.model.predict(state)
        # Q-values of the ending states from the target network.
        target_val = self.target_model.predict(next_state)

        rewards = np.asarray(reward, dtype=np.float64)
        terminal = np.asarray(done, dtype=bool)
        bootstrapped = rewards + self.gamma * np.amax(target_val, axis=1)
        target[np.arange(batch_len), action] = np.where(terminal, rewards, bootstrapped)

        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0, callbacks=[self.tensorboard_callback])

    def load(self, name):
        """Replace the online network with the model stored at ``name``."""
        self.model = load_model(name)

    def save(self, name):
        """Save the online network to ``name``."""
        self.model.save(name)

    def GetImage(self):
        """Render the env, push the frame onto the stack, return the stack.

        The rendered frame is converted to grayscale, resized to
        (ROWS, COLS), binarised (every pixel below 255 becomes 0) and
        scaled by 1/255. The newest frame is stored at index 0.

        Returns:
            The frame stack with a leading batch axis of size 1.
        """
        frame = self.env.render(mode='rgb_array')
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        resized = cv2.resize(gray, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)
        resized[resized < 255] = 0
        resized = resized / 255

        # np.roll returns a new array, so stacks returned earlier are not
        # modified by the write below.
        self.image_memory = np.roll(self.image_memory, 1, axis=0)
        self.image_memory[0, :, :] = resized
        return np.expand_dims(self.image_memory, axis=0)

    def reset(self):
        """Reset the env and refill the whole frame stack.

        Returns:
            The batched frame stack after FRAME_STEP renders.
        """
        self.env.reset()
        for _ in range(self.FRAME_STEP):
            state = self.GetImage()
        return state

    def step(self, action):
        """Step the env and return the image observation instead of its own.

        Returns:
            Tuple ``(next_state, reward, done, info)`` where ``next_state``
            is the batched frame stack.
        """
        _, reward, done, info = self.env.step(action)
        next_state = self.GetImage()
        return next_state, reward, done, info

    def run(self):
        """Fill the buffer with random play, then train for EPISODES episodes."""
        decay_step = 0

        # Warm-up: 500 random transitions. The state is advanced after every
        # step and the env is reset on termination so that stored transitions
        # are consistent and the env is never stepped after `done`.
        state = self.reset()
        for _ in range(500):
            action = random.randrange(self.action_size)
            next_state, reward, done, _ = self.step(action)
            self.remember(state, action, reward, next_state, done)
            state = self.reset() if done else next_state

        step_num = 0
        eval_due = False
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            i = 0
            while not done:
                decay_step += 1
                step_num += 1
                action = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1

                if step_num % 200 == 0:
                    # test() resets the env and the frame stack, so it must
                    # not run in the middle of a training episode; run it
                    # once this episode has finished.
                    eval_due = True

                if done:
                    if e % self.FRAME_STEP == 0:
                        self.update_target_model()
                    self.scores.append(i)
                    self.episodes.append(e)
                    self.average.append(sum(self.scores[-100:]) / len(self.scores[-100:]))
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    if i == self.env._max_episode_steps:
                        print("Saving trained model to", self.Model_name, e)
                        self.save(self.Model_name + str(e))
                elif step_num % 4 == 0:
                    self.replay()

            if eval_due:
                self.test(10, "eval")
                eval_due = False
        self.env.close()

    def test(self, num_eval_episodes, model_name):
        """Run greedy evaluation episodes and print the average return.

        Args:
            num_eval_episodes: Number of episodes to play.
            model_name: ``"eval"`` to evaluate the current target network;
                any other value is treated as a path, loaded into
                ``self.model`` and evaluated.

        Saves ``self.model`` to ``self.Model_name`` when the average return
        exceeds 180.
        """
        if model_name == "eval":
            m = self.target_model
        else:
            self.load(model_name)
            m = self.model

        total_return = 0
        for e in range(num_eval_episodes):
            state = self.reset()
            done = False
            i = 0
            while not done:
                action = np.argmax(m.predict(state))
                state, reward, done, _ = self.step(action)
                i += reward
            print("episode: {}/{}, score: {}".format(e, num_eval_episodes, i))
            total_return += i

        # Guard against a zero-episode evaluation request.
        average = total_return / num_eval_episodes if num_eval_episodes > 0 else 0.0
        print("Model:", model_name)
        print("Average:{}".format(average))
        if average > 180:
            self.save(self.Model_name)
if __name__ == "__main__":
    env_name = 'CartPole-v0'
    # Start an invisible virtual display; presumably needed so that
    # env.render(mode='rgb_array') works on a machine without a screen.
    display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()
    try:
        # `agent` stays a module-level name because it is referenced as a
        # global elsewhere in this file.
        agent = DQNAgent(env_name)
        print("Training Start!")
        agent.run()
        print("Training Finish")
        #agent.test(100,"Models_2W_0902_0655/CartPole-v0_DQN_CNN.h52026")
    finally:
        # Always shut the virtual display down, even if training raises.
        display.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment