강화학습 - 정책 기반 에이전트(2)

BSH·2023년 5월 29일
0

강화학습_basic

목록 보기
12/12

Reinforce 알고리즘

이제 policy gradient를 이용해 어떻게 학습할 수 있는지 알아보겠습니다. reinforce알고리즘은 policy gradient에 속하는 간단한 알고리즘입니다.

이론적 배경

학습에 사용되는 수식입니다. policy gradient theorem에서 약간의 변경만 있습니다.

θJ(θ)=Eπθ[θlogπθ(s,a)Gt]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)G_{t}]

Qπθ(s,a)Q_{\pi_{\theta}}(s, a)자리에 리턴인 GtG_{t}가 들어갔습니다. 리턴인 GtG_{t}Qπθ(s,a)Q_{\pi_{\theta}}(s, a)의 정의 때문에 편향되지 않은 샘플입니다. Qπθ(s,a)=E[Gtst=s,at=a]Q_{\pi_{\theta}}(s, a)=\mathbb{E}[G_{t}|s_{t}=s, a_{t}=a]입니다. GtG_{t} 샘플을 여러개 얻어 평균내면 그 값이 실제 state-action value인 Qπθ(s,a)Q_{\pi_{\theta}}(s, a)에 근사해지기 때문에 GtG_{t}를 사용해도 무방합니다.

pseudo code

  1. πθ(s,a)\pi_{\theta}(s, a)의 파라미터 θ\theta초기화
  2. 에이전트 상태 초기화
  3. πθ\pi_{\theta}이용하여 에피소드 끝까지 진행해서 데이터 얻음
  4. t=0~T까지 반복
    Gt:=Σi=tTriγitθ:=θ+αθlogπθ(st,at)GtG_{t}:=\Sigma^{T}_{i=t}r_{i}\gamma^{i-t}\\ \theta:=\theta+\alpha \nabla_{\theta}log\pi_{\theta}(s_{t}, a_{t})G_{t}
  5. 2,3,4반복

직관적으로 식을 보면 리턴이 증가하는 액션의 정책의 확률을 높이고 리턴이 감소하는 방향으로 정책을 업데이트 합니다. 보상이 모두 양수인 경우에는 확률을 곱하기 때문에 리턴이 더 큰 쪽으로 향하게 됩니다. 여기서 loglog확률을 증가시키는거나 그냥 확률을 증가시키는거나 같지 않을까?라고 생각할 수 있지만 policy gradient theorem이 성립하지 않아서 학습이 되지 않는다고 합니다.

Reinforce 코드 구현

데이터를 이용해 계산해야하는 gradient policy식을 살펴보겠습니다.

θJ(θ)Gtθlogπθ(st,at)\nabla_{\theta}J(\theta)\simeq G_{t}\nabla_{\theta}log\pi_{\theta}(s_{t}, a_{t})

이 식은 상수에 gradient가 곱해져 있습니다. 그러나 파이토치나 텐서플로를 사용할 때는 위와같이 미분된 형태의 수식을 사용하지 않습니다(체인룰을 통해 자동미분). 이번에도 마찬가지로 원형태인 Gtlogπθ(st,at)G_{t}log\pi_{\theta}(s_{t}, a_{t})를 사용합니다. 그리고 gradient ascent를 위해 식에 마이너스를 곱해 다음과 같이 변형해줍니다.

Gtlogπθ(st,at)-G_{t}log\pi_{\theta}(s_{t}, a_{t})
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

hyper_params = {
    'lr': 0.0002,
    'gamma': 0.098
}

def main():
    # env = gym.make('CartPole-v1', mode='human')
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 100
    
    for n_epi in range(1, 10001):
        s = env.reset()[0]
        done = False
        
        while not done:
            prob = pi(torch.from_numpy(s).float())
            m = Categorical(prob)
            a = m.sample()
            s_prime, r, done, _, info = env.step(a.item())
            pi.put_data((r, prob[a]))
            s = s_prime
            score += r
        
        pi.train_net()
        if n_epi % print_interval == 0:
            print(f"Episode: {n_epi} | avg score: {(score/print_interval):3.f}")
            score = 0.0
    env.close()

정책 네트워크를 이용해 액션별 확률을 구하고 그 확률을 사용해 하나의 액션을 샘플링하는 방식을 사용합니다. reinforce알고리즘에는 πθ(st,at)\pi_{\theta}(s_{t}, a_{t})GtG_{t}만 있으면 loss를 계산할 수 있기 때문에 확률값 prob[a]와 보상r을 데이터에 저장해 둡니다.

class Policy(nn.Module):
    def __init__(self):
        super().__init__()
        self.data = []
        
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=hyper_params['lr'])

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        return x

    def put_data(self, item):
        self.data.append(item)
    
    def train_net(self):
        R = 0
        self.optimizer.zero_grad()
        for r, prob in self.data[::-1]:
            R = r + hyper_params['gamma'] * R
            loss = - R * torch.log(prob)
            loss.backward()
        self.optimizer.step() # 누적된 값을 평균내어 파라미터 업데이트
        self.data = []

Actor-Critic

정책 네트워크와 밸류 네트워크를 같이 학습하는 액터 크리틱 방법론에 대해서 알아보겠습니다. Q, Adventage, TD 방법이 있습니다.

Q actor-critic

Q 액터-크리틱은 간단합니다. reinforce알고리즘에서 에서 리턴 GtG_{t}가 아닌 Q를 그대로 이용하면 됩니다. 원래 policy gradient식이죠.

θJ(θ)=Eπθ[θlogπθ(s,a)Qπθ(s,a)]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)]

Qπθ(s,a)Q_{\pi_{\theta}}(s, a)는 미지의 함수이기 때문에 파라미터화 된 모델을 이용해야 합니다. Qw(s,a)Qπθ(s,a)Q_{w}(s, a)\simeq Q_{\pi_{\theta}}(s, a) 결국 θ\theta로 파라미터화된 정책 네트워크 πθ\pi_{\theta}ww로 파라미터화 된 밸류 네트워크 QwQ_{w}를 같이 학습해야합니다. πθ\pi_{\theta}는 실행할 액션 a를 선택하는 액터, QwQ_{w}는 선택된 액션 a의 밸류를 평가하는 크리틱 역할을 담당합니다. 이렇게 에이전트의 학습 과정에서 정책 π\pi와 밸류 QQ를 학습하는 방식을 액터-크리틱이라고 합니다.

pseudo code

  1. 정책, 액션-밸류 네트워크의 파라미터 θ\thetaww를 초기화
  2. 상태 s를 초기화
  3. 액션 aπθ(as)a\sim\pi_{\theta}(a|s) 샘플링
  4. a 실행하여 보상 r과 다음 상태 s'을 얻음
  5. θ\theta업데이트
  6. 다음 액션 a' 샘플링
  7. ww업데이트
  8. a:=a', s:=s'
  9. 4~9반복

Advantage actor-critic

다시 policy gradient식을 보겠습니다.

θJ(θ)=Eπθ[θlogπθ(s,a)Qπθ(s,a)]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)]

여기서 한가지 생각해볼 것이 있습니다. 상태 s에서 액션 a를 하고 운이 좋게 밸류가 아주 높은 상태 s'에 도달했다고 가정합시다. 애초에 s'의 밸류가 좋아서 어떤 액션을 택하든 이후에 얻는 리턴이 높은 상황입니다. 예를 들어 a1a_{1}에는 1000, a2a_{2}에는 1050의 리턴을 얻는다고 하면 둘다 비슷하게 강화되는데 a1a_{1}이 확률적으로 더 강화되겠지만 그러기 위해서는 수많은 샘플들이 필요합니다. 그러면 이 방법론이 효율적인가에 대해 다시 생각해볼 필요가 있습니다.

두 액션이 모두 강화되는 이유는 s'의 밸류가 애초에 너무 높았기 때문입니다. s'에 도착한 것은 이미 과거의 일이므로 아래와 같이 제거하는 방법을 생각해볼 수 있습니다.

θJ(θ)=Eπθ[θlogπθ(s,a)(Qπθ(s,a)Vπθ(s))]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)(Q_{\pi_{\theta}}(s, a)-V_{\pi_{\theta}}(s))]

모든 상태에서 업데이트 할 때 각 상태의 밸류값인 Vπθ(s)V_{\pi_{\theta}}(s)를 빼줍니다.그러면 이제 상태보다 액션을 통해 얼마의 가치를 더 얻는지를 보게 됩니다. 그리고 이 값을 advantage라고 부릅니다.

Aπθ(s,a)Qπθ(s,a)Vπθ(s)A_{\pi_{\theta}}(s, a)\equiv Q_{\pi_{\theta}}(s, a)-V_{\pi_{\theta}}(s)

여기서 Vπθ(s)V_{\pi_{\theta}}(s)를 기저라고 부릅니다. 상태 s는 이미 벌어진 일이고 거기서 어떤 액션을 하느냐에 주목하는 것이 직관적으로도 좋아보입니다. 수학적으로 보면 어드밴티지 Aπθ(s)A_{\pi_{\theta}}(s)를 곱함으로 그라디언트 추정치의 변동성이 작아집니다. 상태의 가치가 야기하는 변동성만큼을 빼주고 고려하기 때문입니다. 즉 액션으로 인해 생기는 추가 이득만 고려할 수 있습니다. 그래서 학습 성능 향상에도 도움이 됩니다.

증명

그럼 왜 VπθV_{\pi_{\theta}}를 빼도 되는지 보겠습니다.

기존 수식에서 VπθV_{\pi_{\theta}}를 빼줘도 원 수식의 기댓값이 변하지 않아야합니다.

Eπθ[θlogπθ(s,a)Qπθ(s,a)]=Eπθ[θlogπθ(s,a)Qπθ(s,a)Vπθ(s)]=Eπθ[θlogπθ(s,a)Qπθ(s,a)]Eπθ[θlogπθ(s,a)Qπθ(s,a)Vπθ]Eπθ[θlogπθ(s,a)Vπθ(s)]=0\begin{matrix} \mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)]&=&\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)-V_{\pi_{\theta}}(s)]\\ &=&\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)]-\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)Q_{\pi_{\theta}}(s, a)V_{\pi_{\theta}}] \end{matrix}\\ \therefore \mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)V_{\pi_{\theta}}(s)]=0

다시말하면 위 식이 성립해야합니다.

Vπθ(s)V_{\pi_{\theta}}(s)뿐 아니라 상태 s에 대한 임의의 함수를 빼도 된다고 합니다. 그 함수가 액션 a에 대한 함수가 아니기만 하면 됩니다.
상태 ss에 대한 임의의 함수를 B(s)B(s)라고 하겠습니다. B(s)B(s)는 간단하게 상태 s를 넣어주면 숫자 값 하나를 리턴해주는 아무 함수를 가져다가 사용해도 됩니다. VπθV_{\pi_{\theta}}B(s)B(s)의 특별한 경우입니다. 이제 앞으로 아래의 식을 증명하고자 합니다.

Eπθ[θlogπθ(s,a)B(s)]=0\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)B(s)]=0

이 증명 이전에 상태 분포(state distribution)dπ(s)d_{\pi}(s)를 정의하겠습니다. 상태 분포는 정책 π\pi를 따라서 움직이는 에이전트가 각 상태에 평균적으로 머무는 비율을 나타내는 분포입니다.
기댓값을 풀어서 식을 써보겠습니다.

Eπθ[θlogπθ(s,a)B(s)]=ΣsSdπθ(s)ΣaAπθ(s,a)θlogπθ(s,a)B(s)\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)B(s)]=\Sigma_{s\in S}d_{\pi_{\theta}}(s)\Sigma_{a\in A}\pi_{\theta}(s, a)\nabla_{\theta}log\pi_{\theta}(s, a)B(s)

실제 우변은 계산할 수 없지만 단순 증명을 위해 풀어썼습니다.

ΣsSdπθ(s)ΣaAπθ(s,a)θlogπθ(s,a)B(s)=ΣsSdπθ(s)ΣaAπθ(s,a)θπθ(s,a)πθ(s,a)B(s)=ΣsSdπθ(s)ΣaAθπθ(s,a)B(s)=ΣsSdπθ(s)B(s)ΣaAθπθ(s,a)=ΣsSdπθ(s)B(s)θΣaAπθ(s,a)=ΣsSdπθ(s)B(s)θ1=0\begin{matrix} \Sigma_{s\in S}d_{\pi_{\theta}}(s)\Sigma_{a\in A}\pi_{\theta}(s, a)\nabla_{\theta}log\pi_{\theta}(s, a)B(s)&=&\Sigma_{s\in S}d_{\pi_{\theta}}(s)\Sigma_{a\in A}\pi_{\theta}(s, a){\nabla_{\theta}\pi_{\theta}(s, a)\over \pi_{\theta}(s, a)}B(s)\\ &=& \Sigma_{s\in S}d_{\pi_{\theta}}(s)\Sigma_{a\in A}\nabla_{\theta}\pi_{\theta}(s, a)B(s)\\ &=& \Sigma_{s\in S}d_{\pi_{\theta}}(s)B(s)\Sigma_{a\in A}\nabla_{\theta}\pi_{\theta}(s, a)\\ &=& \Sigma_{s\in S}d_{\pi_{\theta}}(s)B(s)\nabla_{\theta}\Sigma_{a\in A}\pi_{\theta}(s, a)\\ &=& \Sigma_{s\in S}d_{\pi_{\theta}}(s)B(s)\nabla_{\theta}1\\ &=& 0\\ \end{matrix}
Eπθ[θlogπθ(s,a)B(s)]=0\therefore \mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)B(s)]=0

증명이 끝났습니다. 실제 기댓값 연산자 안에서 Vπθ(s)V_{\pi_{\theta}}(s)를 빼도 기댓값이 변하지 않는 것을 확인하였습니다. 따라서 policy gradient 식에서 Vπθ(s)V_{\pi_{\theta}}(s)를 빼줄 수 있습니다.

θJ(θ)=Eπθ[θlogπθ(s,a)Aπθ(s,a)]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)A_{\pi_{\theta}}(s, a)]

실제 알고리즘으로 사용하려면 아래와 같이 근사가 필요합니다.

QπθQw,VπθVϕQ_{\pi_{\theta}}\simeq Q_{w}, V_{\pi_{\theta}}\simeq V_{\phi}

모두 뉴럴네트워크를 이용해 근사를 합니다. 정리하면 아래와 같은 3개의 모델이 필요합니다.

  • 정책 함수 θ\theta
  • 액션-가치 함수 ww
  • 가치 함수 ϕ\phi

pseudo code

  1. 3쌍의 네트워크 파라미터 초기화
  2. 상태 s 초기화
  3. 액션 aπθ(as)a\sim \pi_{\theta}(a|s)샘플링
  4. a를 실행해 보상 r과 다음상태 s'을 얻음
  5. θ\theta업데이트
  6. 액션 aπθ(as)a'\sim \pi_{\theta}(a'|s')샘플링
  7. ww업데이트
  8. ϕ\phi업데이트
  9. a:=a,s:=sa:=a', s:=s'
  10. 4~9반복

정책네트워크, 밸류 네트워크, 액션-밸류 네트워크가 같이 학습을 하며 밸류네트워크는 모두 TD방식으로 학습을 합니다.

TD actor-critic

어드밴티지 액터 크리틱을 통해 추정치의 변동성을 줄여 학습 효율성을 높였지만 치명적인 단점이 하나 있습니다. 바로 3개의 뉴럴 네트워크 모델을 필요로 한다는 점입니다. 앞으로 나올 아이디어를 통해 QwQ_{w}를 사용할 필요가 없게 됩니다. 먼저 가치함수 V(s)V(s)의 TD에러 δ\delta를 생각해봅시다.

δ=r+γV(s)V(s)\delta=r+\gamma V(s')-V(s)

상태 s에서 어떤 액션 a를 했을 때의 δ\delta의 기댓값을 구해보겠습니다.

Eπ[δs,a]=Eπ[r+γV(s)V(s)s,a]=Eπ[r+γV(s)s,a]V(s)=Q(s,a)V(s)=A(s,a)\begin{matrix} \mathbb{E_{\pi}}[\delta|s, a]&=&\mathbb{E_{\pi}}[r+\gamma V(s')-V(s)|s, a]\\ &=&\mathbb{E_{\pi}}[r+\gamma V(s')|s, a]-V(s)\\ &=&Q(s,a)-V(s)=A(s,a) \end{matrix}

TD에러인 δ\delta의 기댓값이 어드밴티지와 동일합니다. 그럼 A(s,a)A(s,a)대신 δ\delta를 업데이트해도 괜찮습니다. 기존 어드밴티지 액터-크리틱에서 policy gradient 수식을 아래와 같이 변형할 수 있습니다.

θJ(θ)=Eπθ[θlogπθ(s,a)δ]\nabla_{\theta}J(\theta)=\mathbb{E_{\pi_{\theta}}}[\nabla_{\theta}log\pi_{\theta}(s, a)\delta]

pseudo code

  1. 정책, 밸류 네트워크 파라미터 초기화
  2. 액션 aπθ(as)a\sim \pi_{\theta}(a|s)샘플링
  3. a실행하여 보상 r과 다음상태 s'을 얻음
  4. δ:=r+γVϕ(s)Vϕ(s)\delta:=r+\gamma V_{\phi}(s')-V_{\phi}(s)
  5. θ:θ+α1θlogπθ(s,a)δ\theta:\theta+\alpha_{1}\nabla_{\theta}log\pi_{\theta}(s, a)\delta
  6. ϕ:=ϕ+α2δϕVϕ(s)\phi:=\phi+\alpha_{2}\delta\nabla_{\phi}V_{\phi}(s)
  7. a:=a,s:=sa:=a', s:=s'
  8. 3~7반복

구현 코드

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

hyper_params = {
    'lr': 0.0002,
    'gamma': 0.98,
    'n_rollout': 10
}

def main():
    env = gym.make("CartPole-v1")
    model = ActorCritic()
    print_interval = 100
    score = 0.0
    
    for n_epi in range(1, 10001):
        done = False
        s = env.reset()[0]
        
        while not done:
            for t in range(hyper_params['n_rollout']):
                prob = model.pi(torch.from_numpy(s).float())
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, done, _, info = env.step(a)
                model.put_data((s, a, r, s_prime, done))
            
                s = s_prime
                score += r
                
                if done: 
                    break
            
            model.train_net()
    
        if n_epi % print_interval == 0:
            print(f"episode: {n_epi}, avg score: {score/print_interval:.3f}")
            score = 0.0
    
    env.close()
                
                
class ActorCritic(nn.Module):
    def __init__(self):
        super().__init__()
        self.data = []
    
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)
        self.fc_v = nn.Linear(256, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=hyper_params['lr'])
    
    def pi(self, x, softmax_dim=0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob
    
    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
    
    def put_data(self, transition):
        self.data.append(transition)
    
    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r/100.0])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0
            done_lst.append([done_mask])
        
        s_batch = torch.tensor(np.array(s_lst), dtype=torch.float32)
        a_batch = torch.tensor(np.array(a_lst))
        r_batch = torch.tensor(np.array(r_lst), dtype=torch.float32)
        s_prime_batch = torch.tensor(np.array(s_prime_lst), dtype=torch.float32)
        done_batch = torch.tensor(np.array(done_lst), dtype=torch.float32)
        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch

    def train_net(self):
        s, a, r, s_prime, done = self.make_batch()
        td_target = r + hyper_params['gamma'] * self.v(s_prime) * done
        delta = td_target - self.v(s)

        pi = self.pi(s, softmax_dim=1)
        pi_a = pi.gather(1, a)
        loss = -torch.log(pi_a) * delta.detach() + F.smooth_l1_loss(self.v(s), td_target.detach())

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()
        
        
main()
profile
컴공생

0개의 댓글