일주일만 지나면 공부한 내용의 70%는 까먹는 나약한 내가 허깅코스 3장을 과제로 정리하기 이전에 우선 1, 2장에서 뭘 공부했는지 복기해봤다.
transformer는 평문을 자기지도학습한 모델이다.
1장에서는 pipeline 함수의 종류를 배웠다. pipeline 함수는 transformer 라이브러리가 가지고 있는 함수 중 하나로, 번역, 요약, 빈칸 채우기 등 똑똑한 함수들이 많았다. 또한, transformer 모델의 architecture (인코더, 디코더, attention layer)에 대해 알아봤다.
2장에서는 pipeline 함수의 작동 과정(preprocessing, 모델 통과, postprocessing)에 대해 배웠다. 또, preprocessing에서 tokenizer의 역할에 대해 알아봤다. tokenizer는 평문 input을 token(단어, 서브워드, 심볼)으로 구분하고 각 token을 정수로 반환해주는 아주 중요한 기능을 한다.
1, 2장에서는 pretrained된 transformer 모델의 기본 구조와, 이 모델의 pipeline 함수를 이용해 원하는 결과를 얻는 몇 줄짜리 파이썬 코드를 알아봤다고 할 수 있다.
하지만 pretrained된 transformer 모델을 자신만의 데이터셋으로 fine-tuning 하여 목적에 맞게 사용하고자 하는 사람도 있을 것이다. 3장에서는 원하는 데이터셋으로 transformer 모델을 전이학습시키는 방법에 대해서 다룬다.
허깅코스에서 예제로 사용한 데이터셋은 5801쌍의 문장에 대하여 한 쌍의 두 개 문장이 서로 같은 의미인지 아닌지 나타내는 label(equivalent, unequivalent)을 가진 MRPC 데이터셋이다. MRPC 데이터셋은 머신러닝의 성능을 측정하는 대표적인 GLUE benchmark 데이터셋 중 하나라고 한다.
from datasets import load_dataset #허브에서 데이터셋을 다운로드하는 명령어를 제공하는 라이브러리
raw_datasets = load_dataset("glue", "mrpc") #mrpc 평문 데이터셋을 가져옴
불러온 상태에서 raw_datasets
를 출력해보면
DatasetDict({
train: Dataset({
features: ['sentence1', 'sentence2', 'label', 'idx'],
num_rows: 3668
})
validation: Dataset({
features: ['sentence1', 'sentence2', 'label', 'idx'],
num_rows: 408
})
test: Dataset({
features: ['sentence1', 'sentence2', 'label', 'idx'],
num_rows: 1725
})
})
mrpc의 각 train, test, validation 데이터셋은 두 문장과 label, 인덱스가 포함된 딕셔너리임을 알 수 있다.
우선 tokenizer를 가져온다.
from transformers import AutoTokenizer #tokenizer 라이브러리
checkpoint = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
tokenizer가 한 쌍의 문장을 동시에 입력했을 때 작동하는지 임의로 살펴보면
inputs = tokenizer("This is the first sentence.", "This is the second one.")
inputs
출력 :
{'input_ids': [101, 2023, 2003, 1996, 2034, 6251, 1012, 102, 2023, 2003, 1996, 2117, 2028, 1012, 102], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
첫 번째 문장은 token_type_ids를 0으로, 두 번째 문장은 1로 처리하여 input_ids의 두 문장을 구분한다는 것을 알 수 있다.
tokenizer가 데이터셋 딕셔너리를 인자로 받아 input_ids, attention_mask, token_type_ids 키가 결정된 딕셔너리를 반환하도록 함수를 만들어보자.
def tokenize_function(example): #데이터셋 딕셔너리를 인자로 받음
return tokenizer(example["sentence1"], example["sentence2"], truncation=True)
tokenized_datasets = raw_datasets.map(tokenize_function, batched=True) #데이터셋의 각 딕셔너리(한 쌍의 문장, label, idx)에 대하여 tokenize_function을 적용
우선 raw_dataset.map(tokenize_function, batched=True)
에서 map함수는 raw_datasets의 각 요소에 대하여 tokenize_function을 적용한다. 따라서 tokenize_function
에는 raw_datasets의 각 딕셔너리(sentence1, sentence2, label, idx)가 인자로 전달된다. 그러면 tokenize_function는 tokenizer가 지정한 새로운 키들이 포함된 새로운 딕셔너리(평문에서 정수id로 반환된 각 문장의 input_ids, token_type_ids, attention_mask, sentence1, sentence2, labal, idx)를 반환한다.
raw_dataset.map(tokenize_function, batched=True)
에서 batched=True를 사용하는 이유는 배치 내에 존재하는 모든 요소들에 함수가 한꺼번에 적용되도록 하기 위함이다.
전처리된 tokenized_datasets
를 출력해보면
DatasetDict({
train: Dataset({
features: ['attention_mask', 'idx', 'input_ids', 'label', 'sentence1', 'sentence2', 'token_type_ids'],
num_rows: 3668
})
validation: Dataset({
features: ['attention_mask', 'idx', 'input_ids', 'label', 'sentence1', 'sentence2', 'token_type_ids'],
num_rows: 408
})
test: Dataset({
features: ['attention_mask', 'idx', 'input_ids', 'label', 'sentence1', 'sentence2', 'token_type_ids'],
num_rows: 1725
})
})
from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer) #동적 패딩
collate 함수는 데이터셋의 요소들을 지정된 크기의 batch로 구성해주는 함수이다. DataCollatorWithPadding은 각 batch로 분리할 데이터셋의 각 요소에 대하여 정확한 수의 padding을 적용할 수 있는 콜레이트 함수를 제공하는 transformer의 라이브러리이다.
dynamic padding은 전체 데이터셋의 최대 길이로 padding하지 않고, 각 배치 내부의 최대 길이로 padding하는 방법이다. tokenizer를 입력받은 DataCollatorWithPadding 라이브러리의 collate 함수는 동적 패딩이 작동한다.
transformer는 데이터셋으로 fine-tuning할 수 있는 trainer 클래스를 가지고 있다. 이 trainer api를 활용하면 trainer를 제대로 정의해주기만 한다면 쉽게 fine-tuning을 해낼 수 있다.
from transformers import TrainingArguments
#training과 evaluation에 사용될 hyperparameter가 포함되는 클래스
training_args = TrainingArguments("test-trainer")
from transformers import AutoModelForSequenceClassification
#모델 가져오기
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
from transformers import Trainer
trainer = Trainer(
model, #앞서 정의한 모델
training_args, #앞서 정의한 training arguments 클래스
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["validation"],
data_collator=data_collator,
tokenizer=tokenizer,
)
1)의 training의 결과가 어떠한지 알기 위해서는 평가도 정의해주어야 한다.
predictions = trainer.predict(tokenized_datasets["validation"]) #검증
print(predictions.predictions.shape, predictions.label_ids.shape)
결과 :
(408, 2) (408,)
predict()함수는 predictions, labels, metrics를 가지고 있다. metrics에는 loss와 예측에 걸린 전체/평균 시간을 나타내는 time metrics가 포함된다. predictions는 모델이 예측한 결과 logit들이고, labels는 실제 정답들이다.
출력된 결과에서 (408,2)는 predictions의 shape이고, (408,)은 label_ids의 shape이다. 우선 predictions는 408 x 2인 2차원 배열로, 예측에 사용된 데이터셋에 408개의 요소가 있었다는 의미이다. 각 408개의 요소들은 각 예측의 결과물인 logit들을 가지고 있다. 정확도는 가장 높은 것만이 필요하므로 가장 큰 logit을 추출한다.
import numpy as np
preds = np.argmax(predictions.predictions, axis=-1)
이제부터 preds는 각 408개의 요소들에 대하여 최대 예측값(최대 정확도)을 가진 인덱스를 의미한다.
from datasets import load_metric #데이터셋의 metrics를 쉽게 가져올 수 있도록 해주는 transformer 라이브러리
metric = load_metric("glue", "mrpc") #mrpc 데이터셋과 관련된 metrics를 가져오는 함수
metric.compute(predictions=preds, references=predictions.label_ids)
지금까지 살펴본 것을 바탕으로 전체 코드를 정리해보자.
def compute_metrics(eval_preds): #evaluation을 위한 comput_metrics 함수 정의
metric = load_metric("glue", "mrpc")
logits, labels = eval_preds
predictions = np.argmax(logits, axis=-1)
return metric.compute(predictions=predictions, references=labels)
training_args = TrainingArguments("test-trainer", evaluation_strategy="epoch") #epoch가 끝날 때마다 metrics를 출력하기 위해 epoch도 인자로 전달함
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2) #모델 정의
trainer = Trainer( #trainer 클래스 정의
model,
training_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["validation"],
data_collator=data_collator,
tokenizer=tokenizer,
compute_metrics=compute_metrics,
)
trainer.train()
이번에는 2.에서 사용한 trainer 클래스 없이 직접 training loop를 구성해보자.
1.에서 얻은 전처리된 tokenized_datasets에 대하여 다음과 같은 작업들이 필요하다.
tokenized_datasets = tokenized_datasets.remove_columns(["sentence1", "sentence2", "idx"])
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")
tokenized_datasets.set_format("torch")
tokenized_datasets["train"].column_names
sentence1, setence2처럼 모델이 인식할 수 없는 문자열과 필요없는 column은 딕셔너리에서 제거(tokenized_datasets.remove_columns(["sentence1", "sentence2", "idx"])
)
label을 labels로 이름 수정(모델이 labels라는 이름으로 변수를 받기 때문) (tokenized_datasets.rename_column("label", "labels")
)
pytorch 텐서를 반환하도록 형식 설정 (tokenized_datasets.set_format("torch")
)
from torch.utils.data import DataLoader
train_dataloader = DataLoader(
tokenized_datasets["train"],
shuffle=True,
batch_size=8,
collate_fn=data_collator, #콜레이트 함수로 padding
)
eval_dataloader = DataLoader(
tokenized_datasets["validation"],
batch_size=8,
collate_fn=data_collator, #콜레이트 함수로 padding
)
from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
from transformers import AdamW #optimizer 설정
optimizer = AdamW(model.parameters(), lr=5e-5)
from transformers import get_scheduler #learning rate scheduler 설정
num_epochs = 3
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
"linear",
optimizer=optimizer,
num_warmup_steps=0,
num_training_steps=num_training_steps,
)
print(num_training_steps)
여기에서 num_training_steps = num_epochs * len(train_dataloader)
인 이유는 training steps의 개수가 epoch 수와 학습 배치 개수를 곱한 것이기 때문이다. 학습 배치 개수는 다시 데이터로더의 길이와 같다.
우선 device를 정의한다.
import torch
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
#GPU가 있으면 사용, 없으면 CPU
model.to(device)
device
GPU가 있으면 사용하고 없으면 CPU를 사용한다.
from tqdm.auto import tqdm
progress_bar = tqdm(range(num_training_steps)) #단계별 progress bar가 보이도록
model.train()
for epoch in range(num_epochs): #각 epoch에 대하여
for batch in train_dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
progress_bar.update(1)
from datasets import load_metric
metric = load_metric("glue", "mrpc")
model.eval()
for batch in eval_dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
with torch.no_grad():
outputs = model(**batch)
logits = outputs.logits
predictions = torch.argmax(logits, dim=-1)
metric.add_batch(predictions=predictions, references=batch["labels"]) #배치별 metric 결과를 누적
metric.compute() #누적된 결과를 한 번에 출력