import numpy as np
import random
import copy
import datetime
import platform
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from collections import deque
from mlagents_envs.environment import UnityEnvironment, ActionTuple
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
# UnityEnvironment : 유니티로 만든 환경을 불러올 때 사용
# ActionTuple : 액션을 환경에 전달하기 위한 행동 객체
# EngineConfigurationChannel : 유니티 환경 타임 스케일 조절
state_size = [7, 1, 1] # DQN 에이전트의 입력으로 사용할 상태의 크기
# goal_plus와 goal_ex 두 가지가 있어서 크기는 [RGB * 2, 세로 64 grid, 가로 84 grid]
# visual 관측 정보 (agent의 위치)는 goal 관측 정보와 합쳐 전처리할 것이다.
action_size = 2 # DQN 에이전트의 출력으로 사용할 행동의 크기
load_model = True
train_mode = False
batch_size = 32 # 한번 모델을 학습할 때 리플레이 메모리에서 꺼내는 경험 데이터 수
mem_maxlen = 10000 # 만약 10000개 이상 쌓이면 가장 오래된 데이터를 제거
discount_factor = 0.9 # gamma
learning_rate = 0.00025 # 네트워크의 learning rate
run_step = 50000 if train_mode else 0 # 학습 모드에서 진행할 스텝 수
test_step = 5000 # 평가모드에서 진행할 스텝 수
train_start_step = 5000 # 학습 시작 전에 리플레이 메모리에 충분한 데이터를 모으기 위해 몇 스텝 동안 임의의 행동으로 게임 진행할 것인지
target_update_step = 500 # 타겟 네트워크를 몇 스텝 주기로 갱신할 것인지
print_interval = 10 # 텐서보드에 기록할 주기 설정
save_interval = 100 # 학습 모델을 저장할 에피소드 주기 설정
epsilon_eval = 0.05 # 평가모드의 epsilon 값. 평가모드에서는 5%의 확률로 랜덤하게 이동한다. (탐색한다)
epsilon_init = 1.0 if train_mode else epsilon_eval # 초기 epsioon 값. 학습모드 일때 처음에 탐색하는 비율.
epsilon_min = 0.1
explore_step = run_step * 0.8 # epsilon이 감소되는 구간
epsilon_delta = (epsilon_init - epsilon_min) / explore_step if train_mode else 0 # 한 스텝당 감소하는 epsilon 변화량
VISUAL_OBS = 0 # 시각적 관측 인덱스
GOAL_OBS = 1 # 목적지 관측 인덱스
VECTOR_OBS = 2 # 수치적 관측 인덱스
OBS = VISUAL_OBS # DQN에서는 시각적 관측 인덱스를 사용함으로써 VISUAL_OBS로 설정
# 유니티 환경 경로
game = 'FlappyJump'
os_name = platform.system()
if os_name == 'Windows':
env_name = f'../envs/{game}_{os_name}/{game}'
elif os_name == 'Darwin': # MacOS
# env_name = f'../envs/{game}_{os_name}'
env_name = f'../../Unity/{game}/{game}_RL' # 불러올 유니티 환경 경로
# 모델 저장 및 불러오기 경로
date_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
save_path = f'./saved_models/{game}/DQN/{date_time}' # 모델 파일이 저장될 경로
load_path = f'./saved_models/{game}/DQN/20230509001048' # 모델 파일을 불러올 경로
# 연산 장치
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print (device)
class DQN(torch.nn.Module):
def __init__(self, **kwargs):
super(DQN, self).__init__(**kwargs)
# self.conv1 = torch.nn.Conv2d(in_channels=state_size[0], out_channels=32, kernel_size=8, stride=4) # convolution layer 를 만들어 주기 위함. input = state_size[0], output node = 32, kernel_size = 8x8
# # stride = 필터를 이동시기는 간격, stride만큼 이미지 크기가 줄어들게 됨.
# dim1 = ((state_size[1] - 8)//4 + 1, (state_size[2] - 8)//4 + 1) # (20, 15)
# self.conv2 = torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
# dim2 = ((dim1[0] - 4)//2 + 1, (dim1[1] - 4)//2 + 1) # (9, 6)
# self.conv3 = torch.nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
# dim3 = ((dim2[0] - 3)//1 + 1, (dim2[1] - 3)//1 + 1) # (7, 4)
# self.flat = torch.nn.Flatten() # Fully-Connected layer로 만들어주기 위함
# self.fc1 = torch.nn.Linear(64*dim3[0]*dim3[1], 512) # FC, input node = out_channel * 7 * 4
# self.q = torch.nn.Linear(512, action_size)
self.flat = torch.nn.Flatten() # Fully-Connected layer로 만들어주기 위함
self.fc1 = torch.nn.Linear(state_size[0], 5) # FC, input node = out_channel * 7 * 4
self.q = torch.nn.Linear(5, action_size)
def forward(self, x):
x = x.permute(0, 3, 1, 2) # 데이터 차원 변환. Unity는 (height, width, channel) 순인 반면, pytorch는 (channel, height, width) 순이기 때문.
# x = F.relu(self.conv1(x))
# x = F.relu(self.conv2(x))
# x = F.relu(self.conv3(x))
x = self.flat(x)
x = F.relu(self.fc1(x))
return self.q(x)
class DQNAgent:
def __init__(self):
self.network = DQN().to(device)
self.target_network = copy.deepcopy(self.network)
self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)
self.memory = deque(maxlen=mem_maxlen) # 리플레이 메모리
self.epsilon = epsilon_init
self.writer = SummaryWriter(save_path)
if load_model == True:
print (f'... Load Model from {load_path}/ckpt')
checkpoint = torch.load(load_path+'/ckpt', map_location=device) # map_location으로 해당 device로 모델을 불러온다.
self.network.load_state_dict(checkpoint['network']) # state_dict는 각 계층을 매개변수 텐서로 매핑되는 Python dictionary 객체이다.
self.target_network.load_state_dict(checkpoint['network'])
self.optimizer.load_state_dict(checkpoint['optimizer'])
# epsilon-greedy 기법에 따라 행동 결정
def get_action(self, state, training=True):
# 네트워크 모드 설정
self.network.train(training) # batch normalization layer나 dropout layer가 있을 경우, train-mode와 eval-mode일때 다르게 작용함.
epsilon = self.epsilon if training else epsilon_eval
# 랜덤하게 행동 결정
if epsilon > random.random():
action = np.random.randint(0, action_size, size=(1, 1))
# 네트워크 연산에 따라 행동 결정
else:
q = self.network(torch.FloatTensor(state).to(device)) # float tensor 형태로 넣어줌
action = torch.argmax(q, axis=-1, keepdim=True).data.cpu().numpy() # keepdim = output tensor가 input tensor의 형태를 가질 것인지
return action
# 리플레이 메모리에 데이터 추가 (상태, 행동, 보상, 다음 상태, 게임 종료 여부)
def append_sample(self, state, action, rewared, next_state, done):
self.memory.append((state, action, reward, next_state, done))
# 학습 수행
def train_model(self):
batch = random.sample(self.memory, batch_size)
state = np.stack([b[0] for b in batch], axis=0)
action = np.stack([b[1] for b in batch], axis=0)
reward = np.stack([b[2] for b in batch], axis=0)
next_state = np.stack([b[3] for b in batch], axis=0)
done = np.stack([b[4] for b in batch], axis=0)
state, action, reward, next_state, done = map(lambda x: torch.FloatTensor(x).to(device), [state, action, reward, next_state, done])
eye = torch.eye(action_size).to(device) # I(action_size) 단위행렬 생성
one_hot_action = eye[action.view(-1).long()]
q = (self.network(state) * one_hot_action).sum(1, keepdims=True) # 확인 필요
with torch.no_grad(): # 네트워크 업데이트를 하지 않는다.
next_q = self.target_network(next_state)
target_q = reward + next_q.max(1, keepdims=True).values * ((1 - done) * discount_factor)
loss = F.smooth_l1_loss(q, target_q) # Huber loss 사용
self.optimizer.zero_grad() # optimizer의 gradiant 초기화
loss.backward() # 역전파를 통해 gradient 계산
self.optimizer.step() # model 의 파라미터 값들을 업데이트
# epsilon 감소
self.epsilon = max(epsilon_min, self.epsilon - epsilon_delta)
return loss.item()
# 타겟 네트워크 업데이트
def update_target(self):
self.target_network.load_state_dict(self.network.state_dict())
# 네트워크 모델 저장
def save_model(self):
print(f'... Save Model to {save_path}/ckpt...')
torch.save({'network':self.network.state_dict(), 'optimizer':self.optimizer.state_dict()}, save_path+'/ckpt')
# 학습 기록
def write_summary(self, score, loss, epsilon, step):
self.writer.add_scalar('run/score', score, step)
self.writer.add_scalar('model/loss', loss, step)
self.writer.add_scalar('model/epsilon', epsilon, step)
# main 함수
if __name__ == '__main__':
# 유니티 환경 경로 설정 (file_name)
engine_configuration_channel = EngineConfigurationChannel() # 유니티 엔진 설정 채널
env = UnityEnvironment(file_name = env_name, side_channels=[engine_configuration_channel]) # 유니티 환경, side_channel은 해상도, timesclae, graphic quality 등을 설정할 때 사용
env.reset()
# 유니티 브레인 설정
behavior_name = list(env.behavior_specs.keys())[0] # env.behavior_specs.keys()는 모든 behavior의 정보를 가지고있음
spec = env.behavior_specs[behavior_name] # behavior의 관찰, 행동 정보 저장
engine_configuration_channel.set_configuration_parameters(time_scale=12.0) # time_scale로 유니티 시간 배율 설정.
dec, term = env.get_steps(behavior_name) # decision step = decision request (요청한 스텝 정보), terminal step = 에피소드가 종료된 스텝 정보
# DQNAgent 클래스를 agent로 정의
agent = DQNAgent()
preprocess = lambda obs: obs.reshape([1,7,1,1])
losses, scores, episode, score = [], [], 0, 0
for step in range(run_step + test_step): # 학습모드 스텝 + 테스트 모드 스텝
if step == run_step:
if train_mode:
agent.save_model()
print('TEST START')
train_mode = False
engine_configuration_channel.set_configuration_parameters(time_scale=0.001)
behavior_name = list(env.behavior_specs.keys())[0] # env.behavior_specs.keys()는 모든 behavior의 정보를 가지고있음
spec = env.behavior_specs[behavior_name] # behavior의 관찰, 행동 정보 저장
dec, term = env.get_steps(behavior_name) # decision step = decision request (요청한 스텝 정보), terminal step = 에피소드가 종료된 스텝 정보
state = preprocess(dec.obs[OBS])
action = agent.get_action(state, train_mode) # 행동 결정
real_action = action
action_tuple = ActionTuple()
action_tuple.add_discrete(real_action)
env.set_actions(behavior_name, action_tuple) # 환경에 behavior_name이 action_tuple만큼의 행동을 했음을 전달
env.step() # 시뮬레이션 한스텝 진행
dec, term = env.get_steps(behavior_name)
done = len(term.agent_id) > 0 # 에이전트가 한개만 존재하기 때문에 agent_id로 종료 여부 판단 가능
reward = term.reward if done else dec.reward
next_state = preprocess(term.obs[OBS]) if done else preprocess(dec.obs[OBS])
score += reward[0]
if train_mode:
agent.append_sample(state[0], action[0], reward, next_state[0], [done]) # replay memory 에 저장
if train_mode and step > max(batch_size, train_start_step):
# 학습 수행
loss = agent.train_model()
losses.append(loss)
# 타겟 네트워크 업데이트
if step % target_update_step == 0:
agent.update_target()
if done:
episode += 1
scores.append(score)
score = 0
# 게임 진행 상황 출력 및 텐서 보드에 보상과 손실함수 값 기록
if episode % print_interval == 0:
mean_score = np.mean(scores)
mean_loss = np.mean(losses)
agent.write_summary(mean_score, mean_loss, agent.epsilon, step)
losses, scores = [], []
print (f'{episode} Episode / Step: {step} / Score: {mean_score:.2f} / Loss: {mean_loss:.4f} / Epsilon: {agent.epsilon:.4f}')
# 네트워크 모델 저장
if train_mode and episode % save_interval == 0:
agent.save_model()
env.reset()
env.close()
완성 히히히