PPO 코드분석

이두현·2024년 3월 17일
0

환경에 대해서는 일단 좀 미뤄두고 전체적인 흐름부터 보는걸로…

main.py

def main():
	actor_critic = Policy(
		envs.obs_space.shape,
		envs.action_space,
		)
	actor_critic.to(device)

model.py

class Policy

def init():
		if action_space.__class__.__name__ == "Discrete":
            num_outputs = action_space.n
            self.dist = Categorical(self.base.output_size, num_outputs)
  • Discrete 과 Box 의 차이

→ Discrete은 Discrete(n) 과 같이 주어졌을 경우 action이 [0, n-1] 중 하나임을 의미

→ Box는 low 와 high 사이 shape 만큼의 dimension을 갖는 실수 vector 형태를 의미

  • image classification 문제에서 action은 [0, # label-1] 의 Discrete 형태를 갖고 observation은 image shape의 Box 형태를 가질 것이다.
  • 이 때 대응하는 distribution 은 Categorical 이다

distributions.py

class FixedCategorical(torch.distributions.Categorical):
    def sample(self):
        return super().sample().unsqueeze(-1)

    def log_probs(self, actions):
        return (
            super()
            .log_prob(actions.squeeze(-1))
            .view(actions.size(0), -1)
            .sum(-1)
            .unsqueeze(-1)
        )

    def mode(self):
        return self.probs.argmax(dim=-1, keepdim=True)

class Categorical(nn.Module):
    def __init__(self, num_inputs, num_outputs):
        super(Categorical, self).__init__()

        init_ = lambda m: init(
            m,
            nn.init.orthogonal_,
            lambda x: nn.init.constant_(x, 0),
            gain=0.01)

        self.linear = init_(nn.Linear(num_inputs, num_outputs))

    def forward(self, x):
        x = self.linear(x)
        return FixedCategorical(logits=x)
  • outer lambda 가 호출하는 init 함수는 module, weight init 방식, bias init 방식, bias를 받는 함수 (utils.py에 위치) , 이후 init된 모듈을 return 함
  • self.linear는 위의 방식으로 weight, bias가 초기화된 모듈이며 self.base.output_size를 받고 가능한 action space 만큼을 내뱉는다(classification에서는 label 개수)
  • 이는 logit으로 인식된 후 self.dist 변수에 FixedCategorical 함수를 전달한다

main.py

def main():
	agent = algo.PPO(
            actor_critic,
            args.clip_param,
            args.ppo_epoch,
            args.num_mini_batch,
            args.value_loss_coef,
            args.entropy_coef,
            lr=args.lr,
            eps=args.eps,
            max_grad_norm=args.max_grad_norm)
  • 이후 agent에는 algo.PPO가 할당된다
  • agent.update 부분에서 자세히…

main.py

def main():
	rollouts = RolloutStorage(args.num_steps, args.num_processes,
                              envs.observation_space.shape, envs.action_space,
                              actor_critic.recurrent_hidden_state_size)

	obs = envs.reset()
  rollouts.obs[0].copy_(obs)
  rollouts.to(device)
  • RolloutStorage init 함수 에서는 torch.zeros 를 (num_steps +1, num_process, 자기 자신 크기) 만큼 할당한다
  • 향후 자세한 함수 호출 시 다루도록
  • env 초기화 후 zero state를 obs의 num_step = 0 배열에 담음

main.py

def main():
	num_updates = int(
        args.num_env_steps) // args.num_steps // args.num_processes
  • num_env_steps : 총 training 진행할 env step 수
  • num_steps : 일단 number of forward step 으로 이해하면 될 듯
  • 그러므로 process는 한번이라 가정했을 떄 총 update는 num_env_steps // num_steps 만큼 이뤄진다고 할 수 있음

main.py

for j in range(num_updates):
        if args.use_linear_lr_decay:
            # decrease learning rate linearly
            utils.update_linear_schedule(
                agent.optimizer, j, num_updates,
                agent.optimizer.lr if args.algo == "acktr" else args.lr)
  • outer loop 는 num_updates 만큼 돌아간다
  • use_linear_lr_decay는 default = False로 세팅되어있다

main.py

for step in range(args.num_steps):
            # Sample actions
            with torch.no_grad():
                value, action, action_log_prob, recurrent_hidden_states = actor_critic.act(
                    rollouts.obs[step], rollouts.recurrent_hidden_states[step],
                    rollouts.masks[step])
  • inner loop 시작 (num_steps== number of forward steps)
  • actor_critic은 action , action_log_prob 와 함께 critic의 예측 value 값을 반환하는 객체
  • 처음 넘겨지는 rollouts.obs[0]에는 env.reset()의 결과가 들어가 있음
  • 아래 rollouts.insert() 를 통해 이번 step 의 값들이 들어가므로 전 step 을 참고할 수 있음

model.py

def act(self, inputs, rnn_hxs, masks, deterministic=False):
        value, actor_features, rnn_hxs = self.base(inputs, rnn_hxs, masks)
  • Policy class 초기화 과정에서 CNNBase나 MLPBase로 초기화된다

model.py

MLPBase 의 구조

2-layer로 된 actor와 critic이 있고 1 layer의 critic_linear 가 존재한다

critic_linear는 critic에서 나온 feature를 다시한번 통과시킨다

def forward(self, inputs, rnn_hxs, masks):
        x = inputs

        if self.is_recurrent:
            x, rnn_hxs = self._forward_gru(x, rnn_hxs, masks)

        hidden_critic = self.critic(x)
        hidden_actor = self.actor(x)

        return self.critic_linear(hidden_critic), hidden_actor, rnn_hxs

forward 함수의 return은 위와 같다

model.py

				dist = self.dist(actor_features)

        if deterministic:
            action = dist.mode()
        else:
            action = dist.sample()

        action_log_probs = dist.log_probs(action)
        dist_entropy = dist.entropy().mean()

        return value, action, action_log_probs, rnn_hxs
  • def act 이어서…
  • 이미지 env 에서 action은 Discrete 형태이므로 self.dist 는 Categorical class를 할당받을 것이고 policy default 값은 stochastic policy 이므로 action은 dist.sample()을 통해 고를 것이다

algo/distributions.py

class FixedCategorical(torch.distributions.Categorical):
    def sample(self):
        return super().sample().unsqueeze(-1)

    def log_probs(self, actions):
        return (
            super()
            .log_prob(actions.squeeze(-1))
            .view(actions.size(0), -1)
            .sum(-1)
            .unsqueeze(-1)
        )

    def mode(self):
        return self.probs.argmax(dim=-1, keepdim=True)
  • sample 함수의 경우 상속받은 super class 에 원래 implement 되어있는 형태이고 .unsqueeze(-1) 로 마지막 dimension을 하나 늘려준다
  • deterministic policy의 경우 최대 probability를 갖는 action을 고정적으로 고를 것이다
  • 자세한 계산 디테일은 나중에… def log_probs 함수가 잘 이해안됨

main.py

						episode_rewards = deque(maxlen=10)
						obs, reward, done, infos = envs.step(action)

            for info in infos:
                if 'episode' in info.keys():
                    episode_rewards.append(info['episode']['r'])
  • 아직 forward step을 돌고 있는 loop 안
  • episode_reward와 reward의 차이 ?

→ 일단 reward는 rollout에 기록되어 훈련에 사용되는 것 같지만 episode_reward는 log 용 데이터로 보임

main.py

						masks = torch.FloatTensor(
                [[0.0] if done_ else [1.0] for done_ in done])
            bad_masks = torch.FloatTensor(
                [[0.0] if 'bad_transition' in info.keys() else [1.0]
                 for info in infos])
            rollouts.insert(obs, recurrent_hidden_states, action,
                            action_log_prob, value, reward, masks, bad_masks)
  • 이 line 보고 multiple env parallel 하게 돌리고 있는 상황 이해
  • for done_ in done 은 (multiple process == multiple env) 의 종료 여부로 이해

main.py

with torch.no_grad():
            next_value = actor_critic.get_value(
                rollouts.obs[-1], rollouts.recurrent_hidden_states[-1],
                rollouts.masks[-1]).detach()
  • value function 값만 앞선 number of forward step 만큼 수행한 이후 critic에 대해 뽑아내는 것
  • 참고할 점은 actor와 critic 이 하나의 NN으로 구성되어 갈래가 나눠져서 값을 준다는 점이다

algo/storage.py

def compute_returns(self,
                        next_value,
                        use_gae,
                        gamma,
                        gae_lambda,
                        use_proper_time_limits=True):
        if use_proper_time_limits:
            if use_gae:
                self.value_preds[-1] = next_value
                gae = 0
                for step in reversed(range(self.rewards.size(0))):
                    delta = self.rewards[step] + gamma * self.value_preds[
                        step + 1] * self.masks[step +
                                               1] - self.value_preds[step]
                    gae = delta + gamma * gae_lambda * self.masks[step +
                                                                  1] * gae
                    gae = gae * self.bad_masks[step + 1]
                    self.returns[step] = gae + self.value_preds[step]
  • 논문에서는 generalized advantage estimation 을 사용했으므로 여러 가지 option들 중 이를 사용하는 경우에 대해 다룬다
  • delta 값은 꾸준히 r+V_{t+1} - V_t 로 계산된다
  • gae 값은 현재 delta + (delta gamma) 현재 gae 값으로 업데이트 되면서
  • 원래 value prediction 버퍼는 actor_critic network 에서 나온 value 값으로 채워져 있었고 self.returns 버퍼를 value prediction + gae 값으로 업데이트 시킴
  • 현재 이해한 방향은 value prediction은 s 만 받아서 채워넣은 V(s) 값이므로 여기에 advantage를 더해 self.returns 에 Q 값을 채워넣는 것으로 생각했음

main.py

rollouts.compute_returns(next_value, args.use_gae, args.gamma,
                                 args.gae_lambda, args.use_proper_time_limits)
  • 위에 설명한 compute_returns 함수는 rollout 객체가 가진 self.returns 버퍼를 estimated return으로 self.returns 버퍼를 업데이트하는 함수
  • self.rewards와 착각하지 않도록 주의하자!

algo/ppo.py

def update(self, rollouts):
		# advantage normalize 한 후 data generator에 넘겨줌
		advantages = rollouts.returns[:-1] - rollouts.value_preds[:-1]
    advantages = (advantages - advantages.mean()) / (
            advantages.std() + 1e-5)
		for e in range(self.ppo_epoch):
            if self.actor_critic.is_recurrent:
                data_generator = rollouts.recurrent_generator(
                    advantages, self.num_mini_batch)
            else:
                data_generator = rollouts.feed_forward_generator(
                    advantages, self.num_mini_batch)
  • 이 함수는 actor_critic 객체가 update 되는 부분을 의미한다
  • value_preds는 actor_critic 에서 예측한 값들로 채워진 것이고 returns는 이 값에 gae가 더해져 업데이트 된 배열이다
  • actor_critic recurrent 여부에 따라 다른 방식으로 agent 훈련을 위한 data가 생성된다

storage.py

def feed_forward_generator():
		sampler = BatchSampler(
            SubsetRandomSampler(range(batch_size)),
            mini_batch_size,
            drop_last=True)

sample.py

class SubsetRandomSampler # samples elements randomly from a given list iof indices
class BatchSampler(Sampler[List[int]]): # wraps another sampler to yield a mini-batch of indices
  • SubsetRandomSampler가 만든 하나당 batch_size 만큼의 random indicies 들을
  • BatchSampler가 mini_batch_size 개수만큼 들고있는다
  • drop_last = True 옵션은 batch가 딱 떨어지지 않는 경우 버리는 지에 대한 여부를 결정한다

storage.py

for indices in sampler:
            obs_batch = self.obs[:-1].view(-1, *self.obs.size()[2:])[indices]
            recurrent_hidden_states_batch = self.recurrent_hidden_states[:-1].view(
                -1, self.recurrent_hidden_states.size(-1))[indices]
            actions_batch = self.actions.view(-1,
                                              self.actions.size(-1))[indices]
            value_preds_batch = self.value_preds[:-1].view(-1, 1)[indices]
            return_batch = self.returns[:-1].view(-1, 1)[indices]
            masks_batch = self.masks[:-1].view(-1, 1)[indices]
            old_action_log_probs_batch = self.action_log_probs.view(-1,
                                                                    1)[indices]
            if advantages is None:
                adv_targ = None
            else:
                adv_targ = advantages.view(-1, 1)[indices]

            yield obs_batch, recurrent_hidden_states_batch, actions_batch, \
                value_preds_batch, return_batch, masks_batch, old_action_log_probs_batch, adv_targ
  • self.obs 모양은 (num_steps+1, num_processes, *obs_shape)
  • 마지막 원소 하나를 [:-1]로 날림
  • view 로 (num_steps num_processes, obs_shape) 으로 뭉침
  • indices에 대응하는 결과값의 [*obs_shape] 의 집합을 반환
  • advantage를 포함한 위의 정보들을 계속해서 yield

ppo.py

for sample in data_generator:
		obs_batch, recurrent_hidden_states_batch, actions_batch, \
		value_preds_batch, return_batch, masks_batch, old_action_log_probs_batch, \
		adv_targ = sample

		# Reshape to do in a single forward pass for all steps
		values, action_log_probs, dist_entropy, _ = self.actor_critic.evaluate_actions(
		obs_batch, recurrent_hidden_states_batch, masks_batch,
		actions_batch)
  • evalutate_actions 함수는 act 함수와 다르게 distribution에서 action을 sample 하지 않고 parameter로 action도 받아 action_log_probability를 계산한다

ppo.py

ratio = torch.exp(action_log_probs -
                                  old_action_log_probs_batch)
                surr1 = ratio * adv_targ
                surr2 = torch.clamp(ratio, 1.0 - self.clip_param,
                                    1.0 + self.clip_param) * adv_targ
                action_loss = -torch.min(surr1, surr2).mean()

Untitled

  • 이부분은 논문의 L_clip 을 계산한고 있으며 ratio 는 log probability에 exponential의 차를 계산했으므로 논문에서

Untitled

위와 같이 명시한 ratio를 정확히 계산하고 있다

  • 논문에 나온 objective L_clip은 maximize 가 목표이므로 gradient descent를 사용하는 torch에서는 - 를 붙여 loss를 설정해 주고 있다

ppo.py

if self.use_clipped_value_loss:
                    value_pred_clipped = value_preds_batch + \
                        (values - value_preds_batch).clamp(-self.clip_param, self.clip_param)
                    value_losses = (values - return_batch).pow(2)
                    value_losses_clipped = (
                        value_pred_clipped - return_batch).pow(2)
                    value_loss = 0.5 * torch.max(value_losses,
                                                 value_losses_clipped).mean()
                else:
                    value_loss = 0.5 * (return_batch - values).pow(2).mean()

Untitled

  • clipped value loss를 사용하는지 모르겠는데 위의 항에 1/2 계수를 붙인 loss를 계산하고 있다

Untitled

  • Overall loss가 위와 같이 주어져 있는데 L_vf는 maximization 문제 앞에 음수가 있으므로 gradient descent를 쓰는 torch에서는 그대로 가도 된다

ppo.py

								self.optimizer.zero_grad()
                (value_loss * self.value_loss_coef + action_loss -
                 dist_entropy * self.entropy_coef).backward()
                nn.utils.clip_grad_norm_(self.actor_critic.parameters(),
                                         self.max_grad_norm)
                self.optimizer.step()
  • dist_entropy는 actor_critic의 evaluate_actions에서 나온 값을 사용한다
  • value_loss에는 이미 - 부호를 붙였고 마지막 항도 maximization 문제에서 + 부호를 갖기 때문에 torch에서는 - 를 붙여주도록 한다

ppo.py

								value_loss_epoch += value_loss.item()
                action_loss_epoch += action_loss.item()
                dist_entropy_epoch += dist_entropy.item()
				
				# 여기서부터는 가장 바깥의 ppo_epoch loop 탈출 
        num_updates = self.ppo_epoch * self.num_mini_batch

        value_loss_epoch /= num_updates
        action_loss_epoch /= num_updates
        dist_entropy_epoch /= num_updates

이후 각 epoch 마다 평균 loss 가 어떻게 되는지 각 세부 loss에 대해 구한다

main.py

rollouts.after_update()

storage.py

def after_update(self):
        self.obs[0].copy_(self.obs[-1])
        self.recurrent_hidden_states[0].copy_(self.recurrent_hidden_states[-1])
        self.masks[0].copy_(self.masks[-1])
        self.bad_masks[0].copy_(self.bad_masks[-1])
  • agent update과정에서 tensor 의 [-1]을 원소들을 drop 했었는데 첫 step 의 원소들을 마지막 값들로 채워 training이 이어질 수 있도록 한다
profile
0100101

0개의 댓글