Frame Skipping and Stacking
A single frame is not enough to estimate velocity, so several frames are stacked together and fed to the network as one input. When there is no particular need for color, the frames are converted to grayscale instead of RGB.
However, if strictly consecutive frames are stacked, the differences between them are too small to be meaningful. Skipping a few frames between the ones that get stacked makes the differences visible again; this is frame skipping.
Without it, the same action would have to be repeated for four frames in a row, and there are games where playing that way simply does not work. Combining skipping with stacking lets the agent choose a different action at every step, as the short sketch below illustrates.
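Below is a minimal sketch of the idea, assuming a 1-D observation vector of length state_size; the values skip_frame = 2 and stack_frame = 4 are chosen only for illustration (the full CartPole code below uses 1 and 1), and the indexing mirrors the skip_stack_frame method defined there.

import numpy as np
from collections import deque

state_size, skip_frame, stack_frame = 4, 2, 4      # illustrative values only
obs_set = deque(maxlen=skip_frame * stack_frame)   # holds the most recent raw observations

# at episode start the deque is pre-filled with copies of the first observation
first_obs = np.zeros(state_size)
for _ in range(skip_frame * stack_frame):
    obs_set.append(first_obs)

def skip_stack_frame(obs):
    obs_set.append(obs)
    state = np.zeros(state_size * stack_frame)
    # take every skip_frame-th observation, counting back from the newest
    for i in range(stack_frame):
        state[state_size * i : state_size * (i + 1)] = obs_set[-1 - skip_frame * i]
    return state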
Experience Replay
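Transitions (state, action, reward, next_state, done) are stored in a replay buffer, and training draws random mini-batches from it; this breaks the correlation between consecutive samples and lets each transition be reused many times. A minimal sketch of the buffer, mirroring the deque and random.sample usage in the full code below (the sizes 10000 and 32 match mem_maxlen and batch_size there):

import random
from collections import deque

memory = deque(maxlen=10000)   # oldest transitions are discarded automatically

def append_sample(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))

# once enough transitions have been collected, train on a random mini-batch
if len(memory) >= 32:
    batch = random.sample(memory, 32)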
Target Network
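The TD target is computed with a separate target network whose weights are copied from the online network only every target_update_step steps, so the regression target does not shift with every gradient update. A sketch of the periodic copy, mirroring the update_target method in the code below:

def update_target(model, target_model):
    # overwrite the target network's weights with the online network's
    target_model.load_state_dict(model.state_dict())

# inside the training loop:
#     if step % target_update_step == 0:
#         update_target(model, target_model)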
#!/usr/bin/env python
import numpy as np
import random
import datetime
import os
from collections import deque
import gym
import torch
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn
env = gym.make("CartPole-v0")
algorithm = 'DQN'
state_size = 4
action_size = env.action_space.n
load_model = False
train_mode = True
batch_size = 32
mem_maxlen = 10000
discount_factor = 0.99
learning_rate = 0.00025
skip_frame = 1
stack_frame = 1
start_train_step = 10000
run_step = 50000
test_step = 10000
target_update_step = 1000
print_episode = 10
save_step = 100000
epsilon_init = 1.0
epsilon_min = 0.1
date_time = datetime.datetime.now().strftime("%Y%m%d-%H-%M-%S")
save_path = "./saved_models/" + date_time
load_path = "./saved_models/20210205-18-52-50_DQN"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class DQN(nn.Module):
    def __init__(self, network_name):
        super(DQN, self).__init__()
        # the input is the stacked state vector, so the first layer takes state_size * stack_frame features
        input_size = state_size * stack_frame
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
class DQNAgent():
    def __init__(self, model, target_model, optimizer):
        self.model = model
        self.target_model = target_model
        self.optimizer = optimizer
        self.memory = deque(maxlen=mem_maxlen)                  # experience replay buffer
        self.obs_set = deque(maxlen=skip_frame * stack_frame)   # recent observations for skip/stack
        self.epsilon = epsilon_init
        self.update_target()

        if load_model == True:
            self.model.load_state_dict(torch.load(load_path + '/model.pth', map_location=device))
            print("Model is loaded from {}".format(load_path + '/model.pth'))
    # stack every skip_frame-th observation, newest first, into a single state vector
    def skip_stack_frame(self, obs):
        self.obs_set.append(obs)
        state = np.zeros([state_size * stack_frame])
        for i in range(stack_frame):
            state[state_size * i : state_size * (i + 1)] = self.obs_set[-1 - (skip_frame * i)]
        return state
    def append_sample(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    def get_action(self, state):
        if train_mode:
            # epsilon-greedy: explore with probability epsilon, otherwise act greedily
            if self.epsilon > np.random.rand():
                return np.random.randint(0, action_size)
            else:
                with torch.no_grad():
                    Q = self.model(torch.FloatTensor(state).unsqueeze(0).to(device))
                return np.argmax(Q.cpu().detach().numpy())
        else:
            with torch.no_grad():
                Q = self.model(torch.FloatTensor(state).unsqueeze(0).to(device))
            return np.argmax(Q.cpu().detach().numpy())
    def save_model(self, load_model, train_mode):
        if not load_model and train_mode:
            os.makedirs(save_path + '_' + algorithm, exist_ok=True)
            torch.save(self.model.state_dict(), save_path + '_' + algorithm + '/model.pth')
            print("Save Model: {}".format(save_path + '_' + algorithm))
        elif load_model and train_mode:
            torch.save(self.model.state_dict(), load_path + '/model.pth')
            print("Save Model: {}".format(load_path))
    def train_model(self, state, action, reward, next_state, done):
        # the mini-batch comes from the replay memory, not from the arguments
        batch = random.sample(self.memory, batch_size)

        state_batch = torch.FloatTensor(np.stack([b[0] for b in batch], axis=0)).to(device)
        action_batch = torch.FloatTensor(np.stack([b[1] for b in batch], axis=0)).to(device)
        reward_batch = torch.FloatTensor(np.stack([b[2] for b in batch], axis=0)).to(device)
        next_state_batch = torch.FloatTensor(np.stack([b[3] for b in batch], axis=0)).to(device)
        done_batch = torch.FloatTensor(np.stack([b[4] for b in batch], axis=0)).to(device)

        # Q(s, a) for the actions that were actually taken
        eye = torch.eye(action_size).to(device)
        one_hot_action = eye[action_batch.view(-1).long()]
        q = (self.model(state_batch) * one_hot_action).sum(1)

        with torch.no_grad():
            max_Q = torch.max(q).item()
            # TD target: r + discount_factor * max_a' Q_target(s', a'), zeroed on terminal states
            next_q = self.target_model(next_state_batch)
            target_q = reward_batch + next_q.max(1).values * (discount_factor * (1 - done_batch))

        loss = F.smooth_l1_loss(q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item(), max_Q
    def update_target(self):
        self.target_model.load_state_dict(self.model.state_dict())
if __name__ == "__main__":
    model = DQN("main").to(device)
    target_model = DQN("target").to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    agent = DQNAgent(model, target_model, optimizer)

    model.train()

    step = 0
    episode = 0
    reward_list = []
    loss_list = []
    max_Q_list = []

    while step < run_step + test_step:
        obs = env.reset()
        episode_rewards = 0
        done = False

        # pre-fill the observation deque so skip_stack_frame can index back safely
        for i in range(skip_frame * stack_frame):
            agent.obs_set.append(obs)
        state = agent.skip_stack_frame(obs)

        while not done:
            if step == run_step:
                train_mode = False
                model.eval()

            action = agent.get_action(state)
            next_obs, reward, done, _ = env.step(action)
            episode_rewards += reward

            next_state = agent.skip_stack_frame(next_obs)
            # reward shaping: penalize the cart for drifting away from the center
            reward -= abs(next_obs[0])

            if train_mode:
                agent.append_sample(state, action, reward, next_state, done)
            else:
                agent.epsilon = 0.0
                env.render()

            step += 1

            if step > start_train_step and train_mode:
                # linearly anneal epsilon from epsilon_init down to epsilon_min
                if agent.epsilon > epsilon_min:
                    agent.epsilon -= 1.0 / (run_step - start_train_step)

                loss, maxQ = agent.train_model(state, action, reward, next_state, done)
                loss_list.append(loss)
                max_Q_list.append(maxQ)

                # periodically copy the online network into the target network
                if step % target_update_step == 0:
                    agent.update_target()

            state = next_state

            if step % save_step == 0 and step != 0 and train_mode:
                agent.save_model(load_model, train_mode)

        reward_list.append(episode_rewards)
        episode += 1

        if episode % print_episode == 0 and episode != 0:
            print("step: {} / episode: {} / reward: {:.2f} / loss: {:.4f} / maxQ: {:.2f} / epsilon: {:.4f}".format(
                step, episode, np.mean(reward_list), np.mean(loss_list), np.mean(max_Q_list), agent.epsilon))
            reward_list = []
            loss_list = []
            max_Q_list = []

    agent.save_model(load_model, train_mode)
    env.close()