Boost(10)

City_Duck · December 2, 2022

While working on the KLUE & RE competition, the baseline code we were given was not modularized, so along with modularizing it I ported it to PyTorch Lightning.

model.py


from importlib import import_module

import numpy as np
import pytorch_lightning as pl
import torch
import transformers

from utils import criterion_entrypoint, klue_re_auprc, klue_re_micro_f1, n_compute_metrics

class Model(pl.LightningModule):
    def __init__(self, config):
        super().__init__()
        self.save_hyperparameters()
		
        # Load the settings via OmegaConf.
        self.model_name = config.model.model_name
        self.lr = config.train.learning_rate
        self.lr_sch_use = config.train.lr_sch_use
        self.lr_decay_step = config.train.lr_decay_step
        self.scheduler_name = config.train.scheduler_name
        self.lr_weight_decay = config.train.lr_weight_decay

        # Load the pretrained model to use.
        self.plm = transformers.AutoModelForSequenceClassification.from_pretrained(
            pretrained_model_name_or_path=self.model_name, num_labels=30
        )
        # Load the loss function used to compute the loss.
        self.loss_func = criterion_entrypoint(config.train.loss_name)
        # Read the optimizer name from the config.
        self.optimizer_name = config.train.optimizer_name

    def forward(self, x):
        # The model's expected inputs can be found in the Hugging Face repository.
        x = self.plm(
            input_ids=x["input_ids"],
            attention_mask=x["attention_mask"],
            token_type_ids=x["token_type_ids"],
        )
        return x["logits"]  # The model's outputs can also be found in the Hugging Face repository.
	
    def training_step(self, batch, batch_idx):
        # Defines one step of the trainer.fit() training loop.
        # The return value of the Dataset's __getitem__ arrives here as a batch.
        x = batch
        y = batch["labels"]

        logits = self(x)  # forward
        loss = self.loss_func(logits, y.long())  # loss between the logits from forward and the labels

        f1, accuracy = n_compute_metrics(logits, y).values()
        self.log("train", {"loss": loss, "f1": f1, "accuracy": accuracy})

        return loss  # returned at the end of training_step
	
    def validation_step(self, batch, batch_idx):
        # Defines the validation step run during trainer.fit().
        x = batch
        y = batch["labels"]

        logits = self(x)
        loss = self.loss_func(logits, y.long())

        f1, accuracy = n_compute_metrics(logits, y).values()
        self.log("val_loss", loss)
        self.log("val_accuracy", accuracy)
        self.log("val_f1", f1, on_step=True)

        return {"logits": logits, "y": y}  # return the logits and labels when validation_step ends

    def validation_epoch_end(self, outputs):
        # outputs is a list of whatever validation_step returned, one entry per batch:
        # outputs = [{'logits': batch_0_logits, 'y': batch_0_y}, ..., {'logits': batch_n_logits, 'y': batch_n_y}]
        
        # Pull the logits and y out of outputs and concatenate them with torch.cat.
        logits = torch.cat([x["logits"] for x in outputs])	
        y = torch.cat([x["y"] for x in outputs])
		
        # Move to CPU (and NumPy) before passing them to the metric function.
        logits = logits.detach().cpu().numpy()
        y = y.detach().cpu()

        auprc = klue_re_auprc(logits, y)
        self.log("val_auprc", auprc)

    def test_step(self, batch, batch_idx):
        # Defines the trainer.test() loop.
        x = batch
        y = batch["labels"]

        logits = self(x)

        f1, accuracy = n_compute_metrics(logits, y).values()
        self.log("test_f1", f1)

        return {"logits": logits, "y": y}

    def test_epoch_end(self, outputs):
        logits = torch.cat([x["logits"] for x in outputs])
        y = torch.cat([x["y"] for x in outputs])

        logits = logits.detach().cpu().numpy()
        y = y.detach().cpu()

        auprc = klue_re_auprc(logits, y)
        self.log("test_auprc", auprc)

    def predict_step(self, batch, batch_idx):
        logits = self(batch)

        return logits  # (batch_size, num_labels) tensor

    def configure_optimizers(self):
        opt_module = getattr(import_module("torch.optim"), self.optimizer_name)
        if self.lr_weight_decay:
            optimizer = opt_module(filter(lambda p: p.requires_grad, self.parameters()), lr=self.lr, weight_decay=0.01)
        else:
            optimizer = opt_module(
                filter(lambda p: p.requires_grad, self.parameters()),
                lr=self.lr,
                # weight_decay=5e-4
            )
        if self.lr_sch_use:
            # total training steps = len(train_dataloader) * epochs
            t_total = self.trainer.estimated_stepping_batches
            warmup_step = int(t_total * 0.1)

            _scheduler_dic = {
                "StepLR": torch.optim.lr_scheduler.StepLR(optimizer, self.lr_decay_step, gamma=0.5),
                "ReduceLROnPlateau": torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=10),
                "CosineAnnealingLR": torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=2, eta_min=0.0),
                "constant_warmup": transformers.get_constant_schedule_with_warmup(optimizer, 100),
                "cosine_warmup": transformers.get_cosine_schedule_with_warmup(
                    optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total
                ),
            }
            scheduler = _scheduler_dic[self.scheduler_name]

            return [optimizer], [scheduler]
        else:
            return optimizer
  • Hugging Face Trainer's compute_metrics runs once an epoch has finished.
  • To port the baseline's compute_metrics to PL, validation_epoch_end and test_epoch_end were used (a comparison sketch of the original Trainer wiring follows).
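
For reference, this is roughly how the baseline wires compute_metrics (defined in utils.py below) into a Hugging Face Trainer; a minimal sketch, where model, train_dataset and val_dataset stand for objects built elsewhere and the TrainingArguments values are placeholders:

import transformers

from utils import compute_metrics

training_args = transformers.TrainingArguments(
    output_dir="./results",
    num_train_epochs=1,
    evaluation_strategy="epoch",  # evaluation (and thus compute_metrics) runs once per epoch
)

trainer = transformers.Trainer(
    model=model,                      # e.g. AutoModelForSequenceClassification with num_labels=30
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,  # receives an EvalPrediction with .predictions and .label_ids
)
trainer.train()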

data.py


import os
import pickle as pickle

import pandas as pd
import pytorch_lightning as pl
import torch
import transformers

from tqdm.auto import tqdm
from utils import *

class Dataset(torch.utils.data.Dataset):
    """Class for constructing the Dataset."""

    def __init__(self, pair_dataset, labels):
        self.pair_dataset = pair_dataset
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx].clone().detach() for key, val in self.pair_dataset.items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)


class Dataloader(pl.LightningDataModule):
    def __init__(self, model_name, batch_size, shuffle, train_path, test_path, split_seed=42):
        super().__init__()
        self.model_name = model_name
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.split_seed = split_seed

        self.train_path = train_path
        self.test_path = test_path

        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None
        self.predict_dataset = None

        self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, max_length=200)

    def setup(self, stage="fit"):
        if stage == "fit":
            # Load the training data
            total_data = load_data(self.train_path)
			
            # Split into train and validation at a 9:1 ratio.
            train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
            val_data = total_data.drop(train_data.index)

            train_label = label_to_num(train_data["label"].values)
            val_label = label_to_num(val_data["label"].values)

            tokenized_train = tokenized_dataset(train_data, self.tokenizer)
            tokenized_val = tokenized_dataset(val_data, self.tokenizer)

            self.train_dataset = Dataset(tokenized_train, train_label)
            self.val_dataset = Dataset(tokenized_val, val_label)

        if stage == "test":
            total_data = load_data(self.train_path)

            train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
            val_data = total_data.drop(train_data.index)

            val_label = label_to_num(val_data["label"].values)
            tokenized_val = tokenized_dataset(val_data, self.tokenizer)

            self.test_dataset = Dataset(tokenized_val, val_label)

        if stage == "predict":
            p_data = load_data(self.test_path)
            p_label = list(map(int, p_data["label"].values))
            tokenized_p = tokenized_dataset(p_data, self.tokenizer)

            self.predict_dataset = Dataset(tokenized_p, p_label)

    def train_dataloader(self):
        return torch.utils.data.DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
            num_workers=4,
        )

    def val_dataloader(self):
        return torch.utils.data.DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=4)

    def test_dataloader(self):
        return torch.utils.data.DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=4)

    def predict_dataloader(self):
        return torch.utils.data.DataLoader(self.predict_dataset, batch_size=self.batch_size, num_workers=4)

main.py


import argparse
import os
import re

from datetime import datetime, timedelta

import pytorch_lightning as pl
import torch
import wandb

from data_n import *
from model import *
from omegaconf import OmegaConf
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import WandbLogger

time_ = datetime.now() + timedelta(hours=9)
time_now = time_.strftime("%m%d%H%M")

wandb_dict = {
    "users": "key"
}

if __name__ == "__main__":
    # Receives hyperparameters and other settings from the command line.
    # Terminal example: python3 main.py --config=base_config
    # If the argument is omitted, the default config is used.
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, default="base_config")
    args, _ = parser.parse_known_args()

    cfg = OmegaConf.load(f"/opt/ml/code/pl/config/{args.config}.yaml")

    wandb.login(key=wandb_dict[cfg.wandb.wandb_username])
    model_name_ch = re.sub("/", "_", cfg.model.model_name)
    wandb_logger = WandbLogger(
        log_model="all",
        name=f"{cfg.model.saved_name}_{cfg.train.batch_size}_{cfg.train.learning_rate}_{time_now}",
        project=cfg.wandb.wandb_project,
        entity=cfg.wandb.wandb_entity,
    )
	
    """
    By setting workers=True in seed_everything(), 
    Lightning derives unique seeds across all dataloader workers and processes 
    for torch, numpy and stdlib random number generators. 
    When turned on, it ensures that e.g. data augmentations are not repeated across workers.
    """
    
    pl.seed_everything(cfg.train.seed, workers=True)  # fix the random seed.

    ck_dir_path = f"/opt/ml/code/pl/checkpoint/{model_name_ch}"
    if not os.path.exists(ck_dir_path):
        os.makedirs(ck_dir_path)

    # Checkpoint
    checkpoint_callback = ModelCheckpoint(
        dirpath=ck_dir_path, filename="{epoch}_{val_loss:.4f}", monitor="val_f1", save_top_k=1, mode="max"
    )

    # Earlystopping
    earlystopping = EarlyStopping(monitor="val_f1", patience=3, mode="max")

    # Create the dataloader and the model.
    dataloader = Dataloader(
        cfg.model.model_name,
        cfg.train.batch_size,
        cfg.data.shuffle,
        cfg.path.train_path,
        cfg.path.test_path,
        cfg.train.seed,
    )
    model = Model(cfg)
	
    """
    To ensure full reproducibility from run to run you need to set seeds for
    pseudo-random generators, and set deterministic flag in Trainer.
    """
    # With no GPU, set accelerator="cpu"; with multiple GPUs, set devices to the number of GPUs to use.
    trainer = pl.Trainer(
        precision=16,	    # Mixed precision(FP16)
        accelerator="gpu",
        devices=1,
        max_epochs=cfg.train.max_epoch,
        log_every_n_steps=cfg.train.logging_step,
        logger=wandb_logger,  # W&B integration
        # RichProgressBar() renders a prettier progress output.
        callbacks=[earlystopping, checkpoint_callback, RichProgressBar()],
        deterministic=True,
        # limit_train_batches=0.15,  # use only 15% of training data
        # limit_val_batches = 0.01, # use only 1% of val data
        # limit_train_batches=10    # use only 10 batches of training data
    )

    trainer.fit(model=model, datamodule=dataloader)
    # At test time, load the best-performing checkpoint.
    trainer.test(model=model, datamodule=dataloader, ckpt_path="best")	

    # Save the trained model.
    output_dir_path = "output"
    if not os.path.exists(output_dir_path):
        os.makedirs(output_dir_path)

    output_path = os.path.join(output_dir_path, f"{model_name_ch}_{time_now}_model.pt")
    torch.save(model.state_dict(), output_path)
  • Used RichProgressBar to dress up the command-line output.
  • Used OmegaConf to manage settings through a config file (a sketch of the assumed config layout follows this list).
  • Used EarlyStopping to prevent overfitting.
  • Used ModelCheckpoint to manage checkpoints.
  • Used pl.seed_everything and deterministic to guarantee reproducibility.
  • Used precision to enable mixed-precision training.
  • Used limit_train_batches to support quick test runs.
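
For reference, the config keys read across model.py, data.py, and main.py imply a structure along these lines; a minimal sketch built with OmegaConf, where every value (model name, paths, hyperparameters) is a placeholder and only the key layout is taken from the code:

from omegaconf import OmegaConf

# Hypothetical values; config/base_config.yaml would hold the same structure as YAML.
cfg = OmegaConf.create(
    {
        "model": {"model_name": "klue/roberta-large", "saved_name": "baseline"},
        "wandb": {"wandb_username": "users", "wandb_project": "klue-re", "wandb_entity": "my-team"},
        "path": {"train_path": "./train.csv", "test_path": "./test.csv"},
        "data": {"shuffle": True},
        "train": {
            "seed": 42,
            "batch_size": 32,
            "max_epoch": 5,
            "logging_step": 100,
            "learning_rate": 1e-5,
            "loss_name": "CrossEntropy",
            "optimizer_name": "AdamW",
            "lr_weight_decay": False,
            "lr_sch_use": False,
            "lr_decay_step": 50,
            "scheduler_name": "StepLR",
            "nums_folds": 5,
        },
    }
)
print(OmegaConf.to_yaml(cfg))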

Stratified KFold

Stratified KFold (main)

    results = []

    for k in range(cfg.train.nums_folds):
        model = Model(cfg)
        
        datamodule = Dataloader(
            cfg.model.model_name,
            cfg.train.batch_size,
            cfg.data.shuffle,
            cfg.path.train_path,
            cfg.path.test_path,
            k=k,
            split_seed=cfg.train.seed,
            num_splits=cfg.train.nums_folds,
        )

        trainer = pl.Trainer(
            precision=16,
            accelerator="gpu",
            devices=1,
            max_epochs=cfg.train.max_epoch,
            log_every_n_steps=cfg.train.logging_step,
            logger=wandb_logger,
            callbacks=[checkpoint_callback, earlystopping, RichProgressBar()],
            deterministic=True,
            # limit_train_batches=0.05,
        )
        trainer.fit(model=model, datamodule=datamodule)
        score = trainer.test(model=model, datamodule=datamodule, ckpt_path="best")
        results.extend(score)

    # Check the aggregated results across folds
    show_result(results)

Stratified KFold (data)

    def setup(self, stage="fit"):
        if stage == "fit":
            # StratifiedKFold
            kf = StratifiedKFold(
                n_splits=self.num_splits,
                shuffle=True,
                random_state=self.split_seed,
            )
            # Load the training data
            total_data = load_data(self.train_path)
            total_label = label_to_num(total_data["label"].values)
            tokenized_total = tokenized_dataset(total_data, self.tokenizer)
            total_dataset = Dataset(tokenized_total, total_label)

            all_splits = [k for k in kf.split(total_dataset, total_label)]
            # Select the k-th fold
            train_indexes, val_indexes = all_splits[self.k]
            train_indexes, val_indexes = train_indexes.tolist(), val_indexes.tolist()
            # Split the dataset according to the fold indices
            self.train_dataset = [total_dataset[x] for x in train_indexes]
            self.val_dataset = [total_dataset[x] for x in val_indexes]

        if stage == "test":
            total_data = load_data(self.train_path)

            train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
            val_data = total_data.drop(train_data.index)

            val_label = label_to_num(val_data["label"].values)
            tokenized_val = tokenized_dataset(val_data, self.tokenizer)

            self.test_dataset = Dataset(tokenized_val, val_label)

        if stage == "predict":
            p_data = load_data(self.test_path)
            p_label = list(map(int, p_data["label"].values))
            tokenized_p = tokenized_dataset(p_data, self.tokenizer)

            self.predict_dataset = Dataset(tokenized_p, p_label)  

R-ROBERTa


class FCLayer(pl.LightningModule):
    def __init__(self, input_dim, output_dim, dropout_rate=0.0, use_activation=True):
        super().__init__()
        self.save_hyperparameters()

        self.use_activation = use_activation
        self.dropout = torch.nn.Dropout(dropout_rate)
        self.linear = torch.nn.Linear(input_dim, output_dim)
        self.tanh = torch.nn.Tanh()

        torch.nn.init.xavier_uniform_(self.linear.weight)

    def forward(self, x):
        x = self.dropout(x)
        if self.use_activation:
            x = self.tanh(x)
        return self.linear(x)


class Model(pl.LightningModule):
    def __init__(self, config):
        super().__init__()
        self.save_hyperparameters()

        self.model_name = config.model.model_name
        self.lr = config.train.learning_rate
        self.lr_sch_use = config.train.lr_sch_use
        self.lr_decay_step = config.train.lr_decay_step
        self.scheduler_name = config.train.scheduler_name
        self.lr_weight_decay = config.train.lr_weight_decay
        self.dr_rate = 0
        self.hidden_size = 1024
        self.num_classes = 30

        # Load the pretrained model to use.
        self.plm = transformers.RobertaModel.from_pretrained(self.model_name, add_pooling_layer=False)
        self.cls_fc = FCLayer(self.hidden_size, self.hidden_size // 2, self.dr_rate)
        self.sentence_fc = FCLayer(self.hidden_size, self.hidden_size // 2, self.dr_rate)
        self.label_classifier = FCLayer(self.hidden_size // 2 * 3, self.num_classes, self.dr_rate, False)

        # Load the loss function used to compute the loss (CE in the baseline).
        self.loss_func = criterion_entrypoint(config.train.loss_name)
        self.optimizer_name = config.train.optimizer_name

    def forward(self, x):
        out = self.plm(
            input_ids=x["input_ids"],
            attention_mask=x["attention_mask"],
            token_type_ids=x["token_type_ids"],
        )[0]

        sentence_end_position = torch.where(x["input_ids"] == 2)[1]
        sent1_end, sent2_end = sentence_end_position[0], sentence_end_position[1]

        cls_vector = out[:, 0, :]  # take <s> token (equiv. to [CLS])
        prem_vector = out[:, 1:sent1_end]  # Get Premise vector (here: the subject-entity span)
        hypo_vector = out[:, sent1_end + 1 : sent2_end]  # Get Hypothesis vector (here: the object-entity span)

        prem_vector = torch.mean(prem_vector, dim=1)  # Average
        hypo_vector = torch.mean(hypo_vector, dim=1)

        # Dropout -> tanh -> fc_layer (Share FC layer for premise and hypothesis)
        cls_embedding = self.cls_fc(cls_vector)
        prem_embedding = self.sentence_fc(prem_vector)
        hypo_embedding = self.sentence_fc(hypo_vector)

        # Concat -> fc_layer
        concat_embedding = torch.cat([cls_embedding, prem_embedding, hypo_embedding], dim=-1)

        return self.label_classifier(concat_embedding)
  • Ported R-BERT's idea of classifying with the CLS token vector plus the two entity-span vectors to RoBERTa (a short check of the separator-token assumption follows).
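
The slicing in forward() above relies on the separator token id being 2 for the tokenizer in use. A minimal sketch for checking that assumption, with "klue/roberta-large" as a placeholder model name and an input built the same way as tokenized_dataset in utils.py ("subject[SEP]object" paired with the sentence):

import torch
import transformers

tokenizer = transformers.AutoTokenizer.from_pretrained("klue/roberta-large")  # placeholder model name

tokenized = tokenizer(
    "이순신[SEP]무신",
    "이순신은 조선 중기의 무신이다.",
    return_tensors="pt",
)

print(tokenizer.sep_token_id)  # forward() above hard-codes this as 2
print(torch.where(tokenized["input_ids"] == tokenizer.sep_token_id)[1])  # separator positions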

utils

utils.py

import pickle

import numpy as np
import pandas as pd
import sklearn
import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from tqdm.auto import tqdm

def preprocessing_dataset(dataset):
    """Converts the initially loaded csv file into a DataFrame of the desired form."""
    subject_entity = []
    object_entity = []

    for i, j in tqdm(zip(dataset["subject_entity"], dataset["object_entity"]), desc="preprocessing"):
        i = i[1:-1].split(",")[0].split(":")[1]
        j = j[1:-1].split(",")[0].split(":")[1]

        subject_entity.append(i)
        object_entity.append(j)

    out_dataset = pd.DataFrame(
        {
            "id": dataset["id"],
            "sentence": dataset["sentence"],
            "subject_entity": subject_entity,
            "object_entity": object_entity,
            "label": dataset["label"],
        }
    )
    return out_dataset


def tokenized_dataset(dataset, tokenizer):
    """Tokenizes the sentences according to the given tokenizer."""
    concat_entity = []
    for e01, e02 in tqdm(zip(dataset["subject_entity"], dataset["object_entity"]), desc="tokenizing"):
        temp = ""
        temp = e01 + "[SEP]" + e02
        concat_entity.append(temp)

    tokenized_sentences = tokenizer(
        concat_entity,
        list(dataset["sentence"]),
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=256,
        add_special_tokens=True,
    )

    return tokenized_sentences


def label_to_num(label):
    num_label = []
    with open("/opt/ml/code/dict_label_to_num.pkl", "rb") as f:
        dict_label_to_num = pickle.load(f)
    for v in label:
        num_label.append(dict_label_to_num[v])

    return num_label


def klue_re_micro_f1(preds, labels):
    """KLUE-RE micro f1 (except no_relation)"""
    label_list = [
        "no_relation",
        "org:top_members/employees",
        "org:members",
        "org:product",
        "per:title",
        "org:alternate_names",
        "per:employee_of",
        "org:place_of_headquarters",
        "per:product",
        "org:number_of_employees/members",
        "per:children",
        "per:place_of_residence",
        "per:alternate_names",
        "per:other_family",
        "per:colleagues",
        "per:origin",
        "per:siblings",
        "per:spouse",
        "org:founded",
        "org:political/religious_affiliation",
        "org:member_of",
        "per:parents",
        "org:dissolved",
        "per:schools_attended",
        "per:date_of_death",
        "per:date_of_birth",
        "per:place_of_birth",
        "per:place_of_death",
        "org:founded_by",
        "per:religion",
    ]
    no_relation_label_idx = label_list.index("no_relation")
    label_indices = list(range(len(label_list)))
    label_indices.remove(no_relation_label_idx)
    return sklearn.metrics.f1_score(labels, preds, average="micro", labels=label_indices) * 100.0


def klue_re_auprc(probs, labels):
    """KLUE-RE AUPRC (with no_relation)"""
    labels = np.eye(30)[labels]
    score = np.zeros((30,))
    for c in range(30):
        targets_c = labels.take([c], axis=1).ravel()
        preds_c = probs.take([c], axis=1).ravel()
        precision, recall, _ = sklearn.metrics.precision_recall_curve(targets_c, preds_c)
        score[c] = sklearn.metrics.auc(recall, precision)
    return np.average(score) * 100.0


def compute_metrics(pred):
    """Metrics function for validation."""
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    probs = pred.predictions

    # calculate accuracy using sklearn's function
    f1 = klue_re_micro_f1(preds, labels)
    auprc = klue_re_auprc(probs, labels)
    acc = accuracy_score(labels, preds)  # not included in the leaderboard evaluation

    return {
        "micro f1 score": f1,
        "auprc": auprc,
        "accuracy": acc,
    }


def n_compute_metrics(logits, y):
    """compute_metrics for the refactored code."""
    logits = logits.detach().cpu()
    y = y.detach().cpu()

    pred = np.argmax(logits.numpy(), axis=-1)

    # calculate accuracy using sklearn's function
    f1 = klue_re_micro_f1(pred, y)
    acc = accuracy_score(y, pred)  # not included in the leaderboard evaluation

    return {
        "micro f1 score": f1,
        "accuracy": acc,
    }


def load_data(dataset_dir):
    """Loads the csv file from the given path."""
    pd_dataset = pd.read_csv(dataset_dir)
    dataset = preprocessing_dataset(pd_dataset)

    return dataset


def num_to_label(label):
    """
    Converts numeric classes back to the original string labels.
    """
    origin_label = []
    with open("/opt/ml/code/dict_num_to_label.pkl", "rb") as f:
        dict_num_to_label = pickle.load(f)
    for v in label:
        origin_label.append(dict_num_to_label[v])

    return origin_label


def make_output(logits):
    """
    Unrolls the batched logits and builds a csv file from the probs and preds.
    """
    logits = torch.cat([x for x in logits])

    prob = F.softmax(logits, dim=-1).tolist()
    pred = np.argmax(logits, axis=-1).tolist()

    pred_a = num_to_label(pred)

    output = pd.DataFrame({"id": 0, "pred_label": pred_a, "probs": prob})
    output["id"] = range(0, len(output))
    output.to_csv("./submission.csv", index=False)


def show_result(result):
    f1 = 0
    au = 0

    for i, x in enumerate(result):
        f1 += x["test_f1"]
        au += x["test_auprc"]
        print("----------------------")
        print(f"Fold {i+1}")
        print(f"F1 score : {x['test_f1']:.2f}")
        print(f"AUPRC score : {x['test_auprc']:.2f}")

    print("----------------------")
    print(f"Average F1 score : {f1/len(result):.2f}")
    print(f"Average AUPRC score : {au/len(result):.2f}")
    print("----------------------")


# loss functions
# https://discuss.pytorch.org/t/is-this-a-correct-implementation-for-focal-loss-in-pytorch/43327/8
class FocalLoss(nn.Module):
    def __init__(self, weight=None, gamma=0.5, reduction="mean"):
        nn.Module.__init__(self)
        self.weight = weight
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, input_tensor, target_tensor):
        log_prob = F.log_softmax(input_tensor, dim=-1)
        prob = torch.exp(log_prob)
        return F.nll_loss(
            ((1 - prob) ** self.gamma) * log_prob, target_tensor, weight=self.weight, reduction=self.reduction
        )


# class FocalLoss(nn.Module):
#     def __init__(self, alpha=1, gamma=2):
#         super(FocalLoss, self).__init__()
#         self.alpha = alpha
#         self.gamma = gamma

#     def forward(self, outputs, targets):
#         ce_loss = torch.nn.functional.cross_entropy(outputs, targets, reduction="none")
#         pt = torch.exp(-ce_loss)
#         focal_loss = (self.alpha * (1 - pt) ** self.gamma * ce_loss).mean()
#         return focal_loss


class LabelSmoothingLoss(nn.Module):
    def __init__(self, classes=3, smoothing=0.0, dim=-1):
        super(LabelSmoothingLoss, self).__init__()
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.cls = classes
        self.dim = dim

    def forward(self, pred, target):
        pred = pred.log_softmax(dim=self.dim)
        with torch.no_grad():
            true_dist = torch.zeros_like(pred)
            true_dist.fill_(self.smoothing / (self.cls - 1))
            true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        return torch.mean(torch.sum(-true_dist * pred, dim=self.dim))


# https://gist.github.com/SuperShinyEyes/dcc68a08ff8b615442e3bc6a9b55a354
class F1Loss(nn.Module):
    def __init__(self, classes=30, epsilon=1e-7):
        super().__init__()
        self.classes = classes
        self.epsilon = epsilon

    def forward(self, y_pred, y_true):
        assert y_pred.ndim == 2
        assert y_true.ndim == 1
        y_true = F.one_hot(y_true, self.classes).to(torch.float32)
        y_pred = F.softmax(y_pred, dim=1)

        tp = (y_true * y_pred).sum(dim=0).to(torch.float32)
        tn = ((1 - y_true) * (1 - y_pred)).sum(dim=0).to(torch.float32)
        fp = ((1 - y_true) * y_pred).sum(dim=0).to(torch.float32)
        fn = (y_true * (1 - y_pred)).sum(dim=0).to(torch.float32)

        precision = tp / (tp + fp + self.epsilon)
        recall = tp / (tp + fn + self.epsilon)

        f1 = 2 * (precision * recall) / (precision + recall + self.epsilon)
        f1 = f1.clamp(min=self.epsilon, max=1 - self.epsilon)
        return 1 - f1.mean()


_criterion_entrypoints = {
    "CrossEntropy": nn.CrossEntropyLoss(),
    "focal": FocalLoss(),
    "label_smoothing": LabelSmoothingLoss(),
    "f1": F1Loss(),
}


def criterion_entrypoint(criterion_name):
    return _criterion_entrypoints[criterion_name]

Tuning

Tune.py

"""
batch_size
"""

trainer = pl.Trainer(auto_scale_batch_size="binsearch")
trainer.tuner.scale_batch_size(model=model, datamodule=dataloader)

"""
learning_rate
"""

trainer = pl.Trainer(auto_lr_find=True)
trainer.tuner.lr_find(model=model, datamodule=dataloader, num_training=300)
  • Used PL's tuner to find a batch size and an initial learning rate (a sketch of reading back the suggested values follows).
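
The tuner calls also return their results; a minimal sketch of reading them back (PL 1.x API, with model and dataloader being the objects built in main.py):

trainer = pl.Trainer(auto_scale_batch_size="binsearch", auto_lr_find=True)

new_batch_size = trainer.tuner.scale_batch_size(model=model, datamodule=dataloader)
lr_finder = trainer.tuner.lr_find(model=model, datamodule=dataloader, num_training=300)

print(new_batch_size)          # suggested batch size
print(lr_finder.suggestion())  # suggested initial learning rate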