트랜스포머의 구조는 N개의 인코더와 디코더가 쌓여있고 입력 문장(소스 문장)을 입력하면 인코더에서 해당 문장에 대한 표현을 학습시키고, 그 결과값을 디코더에 보내면 디코더에서 타깃 문장을 생성한다.
디코더는 vocab에 대한 확률 분포를 예측하고 확률이 가장 큰 단어를 선택한다. 트랜스포머에서는 올바른 문장을 생성하려면 예측 확률 분포와 실제 확률 분포 사이의 차이를 최소화해야 한다. 이때 교차 엔트로피(cross-entropy)를 사용하여 분포의 차이를 알 수 있다. 이때 옵티마이저는 Adam 을 사용한다.
# transformers.py
import torch.nn as nn
from module.encoder import Encoder
from module.decoder import Decoder
from module.utils import make_mask
class Transformer(nn.Module):
def __init__(self, vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout) -> None:
super().__init__()
self.enc_outputs = Encoder(vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout)
self.dec_outputs = Decoder(vocab_size, num_layers, d_model, n_heads, hidden, max_len, dropout)
self.output = nn.Linear(d_model, vocab_size)
self.softmax = nn.LogSoftmax(dim=-1)
def forward(self, input, dec_input):
enc_input = input
dec_input = dec_input
enc_padding_mask = make_mask(enc_input, "padding")
dec_padding_mask = make_mask(enc_input, "padding")
look_ahead_mask = make_mask(dec_input, "lookahead")
enc_output = self.enc_outputs(enc_input, enc_padding_mask)
dec_output = self.dec_outputs(enc_output, dec_input, dec_padding_mask, look_ahead_mask)
outputs = self.output(dec_output)
outputs = self.softmax(outputs)
return outputs
# train.py
import torch
import torch.nn as nn
import pandas as pd
import re
import os
import matplotlib.pyplot as plt
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
from module.transformer import Transformer
from torchtext.legacy import data
from soynlp.tokenizer import LTokenizer
tokenizer = LTokenizer()
MAX_LENGTH = 60
D_MODEL = 128
NUM_LAYERS = 4
NUM_HEADS = 4
HIDDEN = 512
BATCH_SIZE = 64
NUM_EPOCH = 100
DROPOUT = 0.3
train_data = pd.read_csv("chatbot.csv")
questions = []
answers = []
for sentence in train_data['Q']:
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
questions.append(sentence)
for sentence in train_data['A']:
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
answers.append(sentence)
Q = data.Field(
sequential=True,
use_vocab=True,
lower=True,
tokenize=tokenizer,
batch_first=True,
init_token="<SOS>",
eos_token="<EOS>",
fix_length=MAX_LENGTH
)
A = data.Field(
sequential=True,
use_vocab=True,
lower=True,
tokenize=tokenizer,
batch_first=True,
init_token="<SOS>",
eos_token="<EOS>",
fix_length=MAX_LENGTH
)
train_data_set = data.TabularDataset(
path="chatbot.csv",
format="csv",
skip_header=False,
fields=[("Q",Q), ('A',A)]
)
print('훈련 샘플의 개수 : {}'.format(len(train_data_set)))
Q.build_vocab(train_data_set.Q, train_data_set.A, min_freq = 2)
A.vocab = Q.vocab
VOCAB_SIZE = len(Q.vocab)
PAD_TOKEN, START_TOKEN, END_TOKEN, UNK_TOKEN = Q.vocab.stoi['<pad>'], Q.vocab.stoi['<SOS>'], Q.vocab.stoi['<EOS>'], Q.vocab.stoi['<unk>']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iter = data.BucketIterator(
train_data_set, batch_size=BATCH_SIZE,
shuffle=True, repeat=False, sort=False, device=device
)
print(VOCAB_SIZE)
transformer = Transformer(VOCAB_SIZE, NUM_LAYERS, D_MODEL, NUM_HEADS, HIDDEN, MAX_LENGTH, DROPOUT)
# 네트워크 초기화
def weights_init(m):
classname = m.__class__.__name__
if classname.find('Linear') != -1:
# Liner층의 초기화
nn.init.kaiming_normal_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
# 훈련 모드 설정
transformer.train()
# TransformerBlock모듈의 초기화 설정
transformer.apply(weights_init)
print('네트워크 초기화 완료')
# 손실 함수의 정의
criterion = nn.CrossEntropyLoss()
# 최적화 설정
learning_rate = 2e-4
optimizer = torch.optim.Adam(transformer.parameters(), lr=learning_rate)
best_epoch_loss = float("inf")
epoch_ = []
epoch_train_loss = []
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("사용 디바이스:", device)
print('-----start-------')
transformer.to(device)
# 네트워크가 어느정도 고정되면 고속화
torch.backends.cudnn.benchmark = True
for epoch in range(NUM_EPOCH):
epoch_loss = 0.0
count = 0
for batch in train_iter:
questions = batch.Q.to(device)
answers = batch.A.to(device)
with torch.set_grad_enabled(True):
preds = transformer(questions, answers)
pad = torch.LongTensor(answers.size(0), 1).fill_(PAD_TOKEN).to(device)
preds_id = torch.transpose(preds,1,2)
outputs = torch.cat((answers[:, 1:], pad), -1)
optimizer.zero_grad()
loss = criterion(preds_id, outputs) # loss 계산
loss.backward()
torch.nn.utils.clip_grad_norm_(transformer.parameters(), 0.5)
optimizer.step()
epoch_loss +=loss.item()
count += 1
epoch_loss = epoch_loss / count
if not best_epoch_loss or epoch_loss < best_epoch_loss:
if not os.path.isdir("snapshot"):
os.makedirs("snapshot")
torch.save(transformer.state_dict(), "./snapshot/transformer.pt")
epoch_.append(epoch)
epoch_train_loss.append(epoch_loss)
print(f"Epoch {epoch+1}/{NUM_EPOCH} Average Loss: {epoch_loss}")
fig = plt.figure(figsize=(8,8))
fig.set_facecolor('white')
ax = fig.add_subplot()
ax.plot(epoch_,epoch_train_loss, label='Average loss')
ax.legend()
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
plt.savefig('train.png')