ml-agent 활용 예제 - 02. python DQN

Seulgi Kim·2023년 5월 16일
0

reinforce learning

목록 보기
10/14
import numpy as np
import random
import copy
import datetime
import platform
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from collections import deque
from mlagents_envs.environment import UnityEnvironment, ActionTuple
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
# UnityEnvironment : 유니티로 만든 환경을 불러올 때 사용
# ActionTuple : 액션을 환경에 전달하기 위한 행동 객체
# EngineConfigurationChannel : 유니티 환경 타임 스케일 조절

state_size = [7, 1, 1] # DQN 에이전트의 입력으로 사용할 상태의 크기
# goal_plus와 goal_ex 두 가지가 있어서 크기는 [RGB * 2, 세로 64 grid, 가로 84 grid]
# visual 관측 정보 (agent의 위치)는 goal 관측 정보와 합쳐 전처리할 것이다.
action_size = 2 # DQN 에이전트의 출력으로 사용할 행동의 크기

load_model = True 
train_mode = False

batch_size = 32 # 한번 모델을 학습할 때 리플레이 메모리에서 꺼내는 경험 데이터 수
mem_maxlen = 10000 # 만약 10000개 이상 쌓이면 가장 오래된 데이터를 제거
discount_factor = 0.9 # gamma
learning_rate = 0.00025 # 네트워크의 learning rate

run_step = 50000 if train_mode else 0 # 학습 모드에서 진행할 스텝 수 
test_step = 5000 # 평가모드에서 진행할 스텝 수
train_start_step = 5000 # 학습 시작 전에 리플레이 메모리에 충분한 데이터를 모으기 위해 몇 스텝 동안 임의의 행동으로 게임 진행할 것인지 
target_update_step = 500 # 타겟 네트워크를 몇 스텝 주기로 갱신할 것인지 

print_interval = 10 # 텐서보드에 기록할 주기 설정
save_interval = 100 # 학습 모델을 저장할 에피소드 주기 설정

epsilon_eval = 0.05 # 평가모드의 epsilon 값. 평가모드에서는 5%의 확률로 랜덤하게 이동한다. (탐색한다)
epsilon_init = 1.0 if train_mode else epsilon_eval # 초기 epsioon 값. 학습모드 일때 처음에 탐색하는 비율.
epsilon_min = 0.1
explore_step = run_step * 0.8 # epsilon이 감소되는 구간
epsilon_delta = (epsilon_init - epsilon_min) / explore_step if train_mode else 0 # 한 스텝당 감소하는 epsilon 변화량


VISUAL_OBS = 0 # 시각적 관측 인덱스
GOAL_OBS = 1 # 목적지 관측 인덱스
VECTOR_OBS = 2 # 수치적 관측 인덱스
OBS = VISUAL_OBS # DQN에서는 시각적 관측 인덱스를 사용함으로써 VISUAL_OBS로 설정

# 유니티 환경 경로
game = 'FlappyJump'
os_name = platform.system()
if os_name == 'Windows':
    env_name = f'../envs/{game}_{os_name}/{game}'
elif os_name == 'Darwin': # MacOS
#   env_name = f'../envs/{game}_{os_name}'
    env_name = f'../../Unity/{game}/{game}_RL' # 불러올 유니티 환경 경로

# 모델 저장 및 불러오기 경로
date_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
save_path = f'./saved_models/{game}/DQN/{date_time}' # 모델 파일이 저장될 경로
load_path = f'./saved_models/{game}/DQN/20230509001048' # 모델 파일을 불러올 경로

# 연산 장치
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print (device)



class DQN(torch.nn.Module):
    def __init__(self, **kwargs):
        super(DQN, self).__init__(**kwargs)
#       self.conv1 = torch.nn.Conv2d(in_channels=state_size[0], out_channels=32, kernel_size=8, stride=4) # convolution layer 를 만들어 주기 위함. input = state_size[0], output node = 32, kernel_size = 8x8
#       # stride = 필터를 이동시기는 간격, stride만큼 이미지 크기가 줄어들게 됨.
#       dim1 = ((state_size[1] - 8)//4 + 1, (state_size[2] - 8)//4 + 1) # (20, 15)
#       self.conv2 = torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
#       dim2 = ((dim1[0] - 4)//2 + 1, (dim1[1] - 4)//2 + 1) # (9, 6)
#       self.conv3 = torch.nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
#       dim3 = ((dim2[0] - 3)//1 + 1, (dim2[1] - 3)//1 + 1) # (7, 4)

#       self.flat = torch.nn.Flatten() # Fully-Connected layer로 만들어주기 위함
#       self.fc1 = torch.nn.Linear(64*dim3[0]*dim3[1], 512) # FC, input node = out_channel * 7 * 4
#       self.q = torch.nn.Linear(512, action_size)

        self.flat = torch.nn.Flatten() # Fully-Connected layer로 만들어주기 위함
        self.fc1 = torch.nn.Linear(state_size[0], 5) # FC, input node = out_channel * 7 * 4
        self.q = torch.nn.Linear(5, action_size)

    def forward(self, x):
        x = x.permute(0, 3, 1, 2) # 데이터 차원 변환. Unity는 (height, width, channel) 순인 반면, pytorch는 (channel, height, width) 순이기 때문.
#       x = F.relu(self.conv1(x))
#       x = F.relu(self.conv2(x))
#       x = F.relu(self.conv3(x))
        x = self.flat(x)
        x = F.relu(self.fc1(x))
        return self.q(x)

class DQNAgent:
    def __init__(self):
        self.network = DQN().to(device)
        self.target_network = copy.deepcopy(self.network)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)
        self.memory = deque(maxlen=mem_maxlen) # 리플레이 메모리
        self.epsilon = epsilon_init
        self.writer = SummaryWriter(save_path)

        if load_model == True:
            print (f'... Load Model from {load_path}/ckpt')
            checkpoint = torch.load(load_path+'/ckpt', map_location=device) # map_location으로 해당 device로 모델을 불러온다.
            self.network.load_state_dict(checkpoint['network']) # state_dict는 각 계층을 매개변수 텐서로 매핑되는 Python dictionary 객체이다.
            self.target_network.load_state_dict(checkpoint['network'])
            self.optimizer.load_state_dict(checkpoint['optimizer'])

    # epsilon-greedy 기법에 따라 행동 결정
    def get_action(self, state, training=True):
        # 네트워크 모드 설정
        self.network.train(training) # batch normalization layer나 dropout layer가 있을 경우, train-mode와 eval-mode일때 다르게 작용함.
        epsilon = self.epsilon if training else epsilon_eval

        # 랜덤하게 행동 결정
        if epsilon > random.random():
            action = np.random.randint(0, action_size, size=(1, 1))
        # 네트워크 연산에 따라 행동 결정
        else:
            q = self.network(torch.FloatTensor(state).to(device)) # float tensor 형태로 넣어줌
            action = torch.argmax(q, axis=-1, keepdim=True).data.cpu().numpy() # keepdim = output tensor가 input tensor의 형태를 가질 것인지
        return action

    # 리플레이 메모리에 데이터 추가 (상태, 행동, 보상, 다음 상태, 게임 종료 여부)
    def append_sample(self, state, action, rewared, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    # 학습 수행
    def train_model(self):
        batch = random.sample(self.memory, batch_size)
        state = np.stack([b[0] for b in batch], axis=0)
        action = np.stack([b[1] for b in batch], axis=0)
        reward = np.stack([b[2] for b in batch], axis=0)
        next_state = np.stack([b[3] for b in batch], axis=0)
        done = np.stack([b[4] for b in batch], axis=0)

        state, action, reward, next_state, done = map(lambda x: torch.FloatTensor(x).to(device), [state, action, reward, next_state, done])

        eye = torch.eye(action_size).to(device) # I(action_size) 단위행렬 생성
        one_hot_action = eye[action.view(-1).long()]
        q = (self.network(state) * one_hot_action).sum(1, keepdims=True) # 확인 필요

        with torch.no_grad(): # 네트워크 업데이트를 하지 않는다.
            next_q = self.target_network(next_state)
            target_q = reward + next_q.max(1, keepdims=True).values * ((1 - done) * discount_factor)

        loss = F.smooth_l1_loss(q, target_q) # Huber loss 사용

        self.optimizer.zero_grad() # optimizer의 gradiant 초기화
        loss.backward() # 역전파를 통해 gradient 계산
        self.optimizer.step() # model 의 파라미터 값들을 업데이트

        # epsilon 감소
        self.epsilon = max(epsilon_min, self.epsilon - epsilon_delta)

        return loss.item()

    # 타겟 네트워크 업데이트
    def update_target(self):
        self.target_network.load_state_dict(self.network.state_dict())

    # 네트워크 모델 저장
    def save_model(self):
        print(f'... Save Model to {save_path}/ckpt...')
        torch.save({'network':self.network.state_dict(), 'optimizer':self.optimizer.state_dict()}, save_path+'/ckpt')

    # 학습 기록
    def write_summary(self, score, loss, epsilon, step):
        self.writer.add_scalar('run/score', score, step)
        self.writer.add_scalar('model/loss', loss, step)
        self.writer.add_scalar('model/epsilon', epsilon, step)

# main 함수
if __name__ == '__main__':
    # 유니티 환경 경로 설정 (file_name)
    engine_configuration_channel = EngineConfigurationChannel() # 유니티 엔진 설정 채널
    env = UnityEnvironment(file_name = env_name, side_channels=[engine_configuration_channel]) # 유니티 환경, side_channel은 해상도, timesclae, graphic quality 등을 설정할 때 사용
    env.reset()

    # 유니티 브레인 설정
    behavior_name = list(env.behavior_specs.keys())[0] # env.behavior_specs.keys()는 모든 behavior의 정보를 가지고있음
    spec = env.behavior_specs[behavior_name] # behavior의 관찰, 행동 정보 저장
    engine_configuration_channel.set_configuration_parameters(time_scale=12.0) # time_scale로 유니티 시간 배율 설정.
    dec, term = env.get_steps(behavior_name) # decision step = decision request (요청한 스텝 정보), terminal step = 에피소드가 종료된 스텝 정보

    # DQNAgent 클래스를 agent로 정의
    agent = DQNAgent()

    preprocess = lambda obs: obs.reshape([1,7,1,1])
    losses, scores, episode, score = [], [], 0, 0
    for step in range(run_step + test_step): # 학습모드 스텝 + 테스트 모드 스텝
        if step == run_step:
            if train_mode:
                agent.save_model()
            print('TEST START')
            train_mode = False
            engine_configuration_channel.set_configuration_parameters(time_scale=0.001)

        behavior_name = list(env.behavior_specs.keys())[0] # env.behavior_specs.keys()는 모든 behavior의 정보를 가지고있음
        spec = env.behavior_specs[behavior_name] # behavior의 관찰, 행동 정보 저장
        dec, term = env.get_steps(behavior_name) # decision step = decision request (요청한 스텝 정보), terminal step = 에피소드가 종료된 스텝 정보

        state = preprocess(dec.obs[OBS])
        action = agent.get_action(state, train_mode) # 행동 결정
        real_action = action
        action_tuple = ActionTuple()
        action_tuple.add_discrete(real_action)
        env.set_actions(behavior_name, action_tuple) # 환경에 behavior_name이 action_tuple만큼의 행동을 했음을 전달
        env.step() # 시뮬레이션 한스텝 진행 

        dec, term = env.get_steps(behavior_name)
        done = len(term.agent_id) > 0 # 에이전트가 한개만 존재하기 때문에 agent_id로 종료 여부 판단 가능
        reward = term.reward if done else dec.reward
        next_state = preprocess(term.obs[OBS]) if done else preprocess(dec.obs[OBS])
        score += reward[0]

        if train_mode:
            agent.append_sample(state[0], action[0], reward, next_state[0], [done]) # replay memory 에 저장

        if train_mode and step > max(batch_size, train_start_step):
            # 학습 수행
            loss = agent.train_model()
            losses.append(loss)

            # 타겟 네트워크 업데이트
            if step % target_update_step == 0:
                agent.update_target()

        if done:
            episode += 1
            scores.append(score)
            score = 0

            # 게임 진행 상황 출력 및 텐서 보드에 보상과 손실함수 값 기록
            if episode % print_interval == 0:
                mean_score = np.mean(scores)
                mean_loss = np.mean(losses)
                agent.write_summary(mean_score, mean_loss, agent.epsilon, step)
                losses, scores = [], []

                print (f'{episode} Episode / Step: {step} / Score: {mean_score:.2f} / Loss: {mean_loss:.4f} / Epsilon: {agent.epsilon:.4f}')

            # 네트워크 모델 저장
            if train_mode and episode % save_interval == 0:
                agent.save_model()

            env.reset()

    env.close()

완성 히히히

0개의 댓글