One of the major currents in NLP research is the use of the Attention Mechanism.
!pip install transformers
import re

def PreProcessingText(input_sentence):
    input_sentence = input_sentence.lower()  # lowercase
    input_sentence = re.sub('<[^>]*>', repl=' ', string=input_sentence)  # strip HTML tags such as <br />
    input_sentence = re.sub(r'[!"#$%&()*+,\-./:;<=>?@\[\\\]^_`{|}~]', repl=' ', string=input_sentence)  # strip special characters except the apostrophe
    input_sentence = re.sub(r'\s+', repl=' ', string=input_sentence)  # collapse consecutive whitespace
    if input_sentence:
        return input_sentence
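A quick sanity check on an illustrative review string (not from the dataset):

print(PreProcessingText('I LOVED it!<br />So good...'))
# -> 'i loved it so good ' (lowercased, tags and punctuation replaced; a single trailing space can remain)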
# apply the cleaning function to every torchtext example in place
for example in train_data.examples:
    vars(example)['text'] = PreProcessingText(' '.join(vars(example)['text'])).split()
for example in test_data.examples:
    vars(example)['text'] = PreProcessingText(' '.join(vars(example)['text'])).split()
import torch
import torch.nn as nn
from transformers import BertModel
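The model_config dictionary is assumed to be defined earlier in the notebook; a minimal sketch for binary sentiment classification (output_dim = 1 here is an assumption, paired with a loss such as BCEWithLogitsLoss):

model_config = {'output_dim': 1}  # assumed setting, not from the original section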
bert = BertModel.from_pretrained('bert-base-uncased')  # load any pretrained BERT variant simply by passing its model name
model_config['emb_dim'] = bert.config.to_dict()['hidden_size']  # BERT outputs one hidden_size-dimensional vector per token
class SentenceClassification(nn.Module):
    def __init__(self, **model_config):
        super(SentenceClassification, self).__init__()
        self.bert = bert
        self.fc = nn.Linear(model_config['emb_dim'], model_config['output_dim'])

    def forward(self, x):
        pooled_cls_output = self.bert(x)[1]
        # BERT returns (last_hidden_state, pooler_output, ...);
        # index 1 is the pooled [CLS] representation
        return self.fc(pooled_cls_output)
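A usage sketch: tokenize a sentence with the matching tokenizer and pass the token ids through the classifier (the example sentence is illustrative):

from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = SentenceClassification(**model_config)

inputs = tokenizer('this movie was great', return_tensors='pt')  # [1, seq_len] token ids
logits = model(inputs['input_ids'])                              # [1, output_dim]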
When the sequences in a batch have different lengths, PyTorch offers four helpers in torch.nn.utils.rnn (a short demo follows this list):
pack_sequence()         # packing
pad_sequence()          # padding
pad_packed_sequence()   # convert a packed sequence into a padded one
pack_padded_sequence()  # convert a padded sequence into a packed one
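A minimal demo of padding and packing, using toy tensors (the values are illustrative):

import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

# three sequences of different lengths (toy data)
seqs = [torch.tensor([1, 2, 3]), torch.tensor([4, 5]), torch.tensor([6])]

padded = pad_sequence(seqs, batch_first=True)   # shape [3, 3], zero-padded
lengths = torch.tensor([3, 2, 1])               # must be sorted descending by default
packed = pack_padded_sequence(padded, lengths, batch_first=True)
print(packed.batch_sizes)                       # tensor([3, 2, 1])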
import random
import torch
import torch.nn as nn
import torch.optim as optim
torch.manual_seed(0)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# source\ttarget pairs, tab-separated (preprocess below splits on '\t')
raw = ['I feel hungry.\t나는 배가 고프다.',
       'Pytorch is very easy.\t파이토치는 매우 쉽다.',
       'Pytorch is a framework for deep learning.\t파이토치는 딥러닝을 위한 프레임워크이다.',
       'Pytorch is very clear to use.\t파이토치는 사용하기 매우 직관적이다.']
SOS_token = 0
EOS_token = 1
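preprocess below relies on a Vocab class and a filter_pair helper defined earlier in the original tutorial; a minimal sketch consistent with how they are used here:

class Vocab:
    def __init__(self):
        self.vocab2index = {'<SOS>': SOS_token, '<EOS>': EOS_token}
        self.index2vocab = {SOS_token: '<SOS>', EOS_token: '<EOS>'}
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)

    def add_vocab(self, sentence):
        for word in sentence.split(' '):
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.index2vocab[self.n_vocab] = word
                self.vocab_count[word] = 1
                self.n_vocab += 1
            else:
                self.vocab_count[word] += 1

def filter_pair(pair, source_max_length, target_max_length):
    # keep only pairs whose source and target are short enough
    return len(pair[0].split(' ')) < source_max_length and len(pair[1].split(' ')) < target_max_length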
def preprocess(corpus, source_max_length, target_max_length):
    pairs = []
    for line in corpus:
        pairs.append([s for s in line.strip().lower().split('\t')])
    pairs = [pair for pair in pairs if filter_pair(pair, source_max_length, target_max_length)]
    source_vocab = Vocab()
    target_vocab = Vocab()
    for pair in pairs:
        source_vocab.add_vocab(pair[0])
        target_vocab.add_vocab(pair[1])
    return pairs, source_vocab, target_vocab
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)  # input_size: vocabulary size, hidden_size: embedding dimension
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        return x, hidden
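A quick shape check for one encoder step, with toy sizes (the encoder here consumes one token at a time):

toy_enc = Encoder(input_size=10, hidden_size=4)
token = torch.tensor([2])                # a single word index
hidden = torch.zeros(1, 1, 4)            # (num_layers, batch, hidden_size)
out, hidden = toy_enc(token, hidden)
print(out.shape, hidden.shape)           # torch.Size([1, 1, 4]) for both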
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)  # log-probabilities, paired with NLLLoss in train()

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        x = self.softmax(self.out(x[0]))
        return x, hidden
def tensorize(vocab, sentence):  # convert a sentence into a column tensor of word indices, ending with <EOS>
    indexes = [vocab.vocab2index[word] for word in sentence.split(' ')]
    indexes.append(vocab.vocab2index['<EOS>'])
    return torch.Tensor(indexes).long().to(device).view(-1, 1)
def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01):
    loss_total = 0
    encoder_optimizer = torch.optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = torch.optim.SGD(decoder.parameters(), lr=learning_rate)
    training_batch = [random.choice(pairs) for _ in range(n_iter)]
    training_source = [tensorize(source_vocab, pair[0]) for pair in training_batch]
    training_target = [tensorize(target_vocab, pair[1]) for pair in training_batch]
    criterion = nn.NLLLoss()
    for i in range(1, n_iter + 1):
        source_tensor = training_source[i - 1]
        target_tensor = training_target[i - 1]
        encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device)
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        source_length = source_tensor.size(0)
        target_length = target_tensor.size(0)
        loss = 0
        for enc_input in range(source_length):
            _, encoder_hidden = encoder(source_tensor[enc_input], encoder_hidden)
        decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
        decoder_hidden = encoder_hidden  # the encoder's final hidden state seeds the decoder
        for di in range(target_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  # teacher forcing: feed the ground-truth token
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()
        loss_iter = loss.item() / target_length
        loss_total += loss_iter
        if i % print_every == 0:
            loss_avg = loss_total / print_every
            loss_total = 0
            print('[{} - {}%] loss = {:05.4f}'.format(i, i / n_iter * 100, loss_avg))
source_max_length = 10
target_max_length = 12
load_pairs, load_source_vocab, load_target_vocab = preprocess(raw, source_max_length, target_max_length)
print(random.choice(load_pairs))
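To see what the training tensors look like, inspect one tensorized source sentence (a small check using the objects just created):

print(tensorize(load_source_vocab, load_pairs[0][0]).shape)  # e.g. torch.Size([4, 1]): 3 words + <EOS>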
enc_hidden_size = 16
dec_hidden_size = enc_hidden_size
enc = Encoder(load_source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, load_target_vocab.n_vocab).to(device)
train(load_pairs, load_source_vocab, load_target_vocab, enc, dec, 5000, print_every=1000)
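The section trains the model but never decodes with it; below is a minimal greedy-decoding sketch under the definitions above (evaluate is not part of the original code):

def evaluate(pair, source_vocab, target_vocab, encoder, decoder, target_max_length):
    source_tensor = tensorize(source_vocab, pair[0])
    encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device)
    for ei in range(source_tensor.size(0)):  # encode the source one token at a time
        _, encoder_hidden = encoder(source_tensor[ei], encoder_hidden)
    decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
    decoder_hidden = encoder_hidden
    decoded_words = []
    for _ in range(target_max_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        _, top_index = decoder_output.topk(1)  # greedy: take the most likely token
        if top_index.item() == EOS_token:
            break
        decoded_words.append(target_vocab.index2vocab[top_index.item()])
        decoder_input = top_index.detach()     # feed the prediction back in
    return ' '.join(decoded_words)

print(evaluate(random.choice(load_pairs), load_source_vocab, load_target_vocab, enc, dec, target_max_length))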
References
파이썬 딥러닝 파이토치 (이경택, 방성수, 안상준)
모두를 위한 딥러닝 시즌 2 Lab 11-5, 11-6