Sentiment analysis with huggingface BERT(code)

Dongbin Lee·2021년 2월 12일
0

AI study

목록 보기
1/6

작년(2020년)에 AI를 공부하면서 많은 기술을 접했는데, 그중에서도 transformer, BERT, GPT에 대한 내용을 특히 많이 볼 수 있었다.
이전에 인턴으로 근무했던 판교의 회사에서도 BERT를 이용한 챗봇을 강조하며 홍보했었다.

transformer에 대한 내용을 boostcamp에서 이해하였고, 이에 대한 코드를 간단하게 찾아 구현해 보았다.

아래의 사이트에서 코드를 참고하였다.

https://curiousily.com/posts/sentiment-analysis-with-bert-and-hugging-face-using-pytorch-and-python/

코드를 참고하여 작성하던 중 freeze_support 에러가 발생하여, run 함수를 따로 구현해 main에서 호출하도록 하였으며 DataLoader의 num_workers도 0으로 설정했다.

임베딩(토크나이징) 과정에서 참고한 코드가 max_len에 맞게 동작하지 않아, truncation=True 옵션도 추가하여 작성했다.

from multiprocessing import freeze_support

import transformers
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
import torch
import gdown

import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
from textwrap import wrap
from torch.nn import functional as F
from torchsummary import summary as summary_


from torch import nn, optim
from torch.utils.data import Dataset, DataLoader


def run():
    """Train and evaluate a BERT-based sentiment classifier on ``reviews.csv``.

    The function configures plotting and random seeds, loads the review
    data, derives sentiment labels from the review scores, builds the
    data loaders and the model, fine-tunes the model and finally
    evaluates it on the held-out test split.
    """
    # See the multiprocessing documentation: this call only has an effect
    # when the program has been frozen into a Windows executable.
    freeze_support()

    # Global plot styling.
    sns.set(style='whitegrid', palette='muted', font_scale=1.2)
    HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
    sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
    rcParams['figure.figsize'] = 12, 8

    # Seed NumPy and PyTorch so that runs are repeatable.
    RANDOM_SEED = 42
    np.random.seed(RANDOM_SEED)
    torch.manual_seed(RANDOM_SEED)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    df = pd.read_csv("reviews.csv")
    print(df.head())
    print(df.shape)
    # ``DataFrame.info`` prints its report itself and returns ``None``;
    # wrapping it in ``print`` would emit a stray "None" line.
    df.info()

    # Pass the column through ``x=``: recent seaborn releases interpret the
    # first positional argument as ``data`` rather than as the x variable.
    sns.countplot(x=df.score)
    plt.xlabel('review score')
    plt.show()

    def to_sentiment(rating):
        """Map a review score to a sentiment class index.

        The score is converted with ``int`` first. A score of 2 or less
        maps to 0, a score of exactly 3 maps to 1 and every other score
        maps to 2.
        """
        score = int(rating)
        if score == 3:
            return 1
        return 0 if score <= 2 else 2

    # Collapse the review scores into three sentiment classes; the position
    # of each name in ``class_names`` is the class index that
    # ``to_sentiment`` returns for it.
    df['sentiment'] = df.score.apply(to_sentiment)
    class_names = ['negative', 'neutral', 'positive']

    # Pass the column through ``x=``: recent seaborn releases interpret the
    # first positional argument as ``data`` rather than as the x variable.
    ax = sns.countplot(x=df.sentiment)
    plt.xlabel('review sentiment')
    ax.set_xticklabels(class_names)
    plt.show()

    # Name of the pretrained checkpoint; it is used for the tokenizer here
    # and for ``BertModel.from_pretrained`` inside ``SentimentClassifier``.
    PRE_TRAINED_MODEL_NAME = 'bert-base-cased'
    # Tokenizer loaded from the same checkpoint name as the model.
    tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
    # sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'
    #
    # tokens = tokenizer.tokenize(sample_txt)
    # tokens_ids = tokenizer.convert_tokens_to_ids(tokens)
    #
    # print(f' Sentence: {sample_txt}')
    # print(f'   Tokens: {tokens}')
    # print(f'Token IDs: {tokens_ids}')

    # print(tokenizer.sep_token, tokenizer.sep_token_id)
    # print(tokenizer.cls_token, tokenizer.cls_token_id)
    # print(tokenizer.pad_token, tokenizer.pad_token_id)
    # print(tokenizer.unk_token, tokenizer.unk_token_id)

    # encoding = tokenizer.encode_plus(
    #     sample_txt,
    #     max_length=32,
    #     add_special_tokens=True,
    #     return_token_type_ids=False,
    #     padding='max_length',
    #     return_attention_mask=True,
    #     return_tensors='pt'
    # )
    #
    # print(encoding.keys())
    #
    # print(len(encoding['input_ids'][0]))
    # print(encoding['input_ids'][0])
    #
    # print(len(encoding['attention_mask'][0]))
    # print(encoding['attention_mask'])
    #
    # print(tokenizer.convert_ids_to_tokens(encoding['input_ids'][0]))

    # Token count of every review, used to pick a sensible sequence length.
    # ``truncation=True`` makes the 512-token cap explicit (``max_length``
    # alone only triggers a warning), and ``str`` mirrors the coercion done
    # in ``GPReviewDataset.__getitem__`` so non-string cells do not crash
    # the tokenizer.
    token_lens = [
        len(tokenizer.encode(str(txt), max_length=512, truncation=True))
        for txt in df.content
    ]

    sns.displot(token_lens, kde=True)
    plt.xlim([0, 256])
    plt.xlabel('Token count')
    plt.show()

    # Most of the reviews seem to contain fewer than 128 tokens, but we'll be
    # on the safe side and choose a maximum length of 160.
    MAX_LEN = 160

    class GPReviewDataset(Dataset):
        """Dataset that tokenizes a single review on every item access.

        ``reviews`` and ``targets`` are indexed with the same position;
        ``tokenizer`` must provide ``encode_plus`` and ``max_len`` is the
        length every encoded review is padded or truncated to.
        """

        def __init__(self, reviews, targets, tokenizer, max_len):
            self.reviews, self.targets = reviews, targets
            self.tokenizer, self.max_len = tokenizer, max_len

        def __len__(self):
            """Return the number of reviews."""
            return len(self.reviews)

        def __getitem__(self, item):
            """Return the text, encoded inputs and label of review ``item``."""
            text = str(self.reviews[item])
            label = self.targets[item]

            encoded = self.tokenizer.encode_plus(
                text,
                add_special_tokens=True,
                max_length=self.max_len,
                return_token_type_ids=False,
                padding='max_length',
                return_attention_mask=True,
                return_tensors='pt',
                truncation=True,
            )

            sample = {'review_text': text}
            # The tokenizer output is flattened to a 1-D tensor per review.
            for key in ('input_ids', 'attention_mask'):
                sample[key] = encoded[key].flatten()
            sample['targets'] = torch.tensor(label, dtype=torch.long)
            return sample

    # Hold out 10% of the rows, then cut that hold-out in half to obtain the
    # validation and test sets.
    df_train, df_holdout = train_test_split(df, test_size=0.1, random_state=RANDOM_SEED)
    df_val, df_test = train_test_split(df_holdout, test_size=0.5, random_state=RANDOM_SEED)

    print(*(part.shape for part in (df_train, df_val, df_test)))


    def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False):
        """Wrap a review DataFrame in a ``DataLoader``.

        Args:
            df: DataFrame providing the ``content`` and ``sentiment`` columns.
            tokenizer: Tokenizer handed to ``GPReviewDataset``.
            max_len: Length every encoded review is padded/truncated to.
            batch_size: Number of reviews per batch.
            shuffle: Whether the loader reshuffles the rows every epoch.
                Defaults to ``False`` so existing callers are unaffected.

        Returns:
            A ``DataLoader`` over a ``GPReviewDataset`` built from ``df``.
        """
        ds = GPReviewDataset(
            reviews=df.content.to_numpy(),
            targets=df.sentiment.to_numpy(),
            tokenizer=tokenizer,
            max_len=max_len
        )

        return DataLoader(
            ds,
            batch_size=batch_size,
            shuffle=shuffle,
            # Worker processes are disabled: the accompanying article notes
            # they caused a freeze_support error.
            num_workers=0
        )

    BATCH_SIZE = 32

    # Only the training data is reshuffled each epoch; validation and test
    # keep a fixed order.
    train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE, shuffle=True)
    val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
    test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)



    # Pull one batch from the training loader and show what it contains.
    data = next(iter(train_data_loader))
    print(data.keys())

    for key in ('input_ids', 'attention_mask', 'targets'):
        print(data[key].shape)

    # bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
    #
    # ouput = bert_model(
    #     input_ids=encoding['input_ids'],
    #     attention_mask=encoding['attention_mask']
    # )
    # last_hidden_state = ouput.last_hidden_state
    # pooled_output = ouput.pooler_output
    # print(last_hidden_state.shape)
    # print(bert_model.config.hidden_size)
    # print(pooled_output.shape)

    class SentimentClassifier(nn.Module):
        """Pretrained BERT encoder followed by dropout and a linear head."""

        def __init__(self, n_classes):
            """Build the encoder and a head with ``n_classes`` outputs."""
            super().__init__()
            self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
            self.drop = nn.Dropout(p=0.3)
            # ``forward`` relies on this layer; without it every forward
            # pass fails with AttributeError.
            self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

        def forward(self, input_ids, attention_mask):
            """Return the linear head's output for the given batch.

            The encoder's ``pooler_output`` is passed through dropout and
            then through the linear layer, so the last dimension of the
            result has ``n_classes`` entries. No softmax is applied.
            """
            output = self.bert(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            pooled = self.drop(output.pooler_output)
            return self.out(pooled)

    # One output per sentiment class; the model lives on the selected device.
    model = SentimentClassifier(len(class_names)).to(device)

    # Move the sample batch's inputs to the same device.
    input_ids, attention_mask = (
        data[key].to(device) for key in ('input_ids', 'attention_mask')
    )

    print(input_ids.shape)  # batch size x seq length
    print(attention_mask.shape)  # batch size x seq length

    EPOCHS = 10
    loss_fn = nn.CrossEntropyLoss().to(device)
    optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)

    # ``train_epoch`` steps the scheduler once per batch, so the schedule
    # spans every batch of every epoch.
    total_steps = EPOCHS * len(train_data_loader)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps,
    )

    def train_epoch(
            model,
            data_loader,
            loss_fn,
            optimizer,
            device,
            scheduler,
            n_examples
    ):
        """Run one optimisation pass over ``data_loader``.

        Each batch must be a mapping with ``input_ids``, ``attention_mask``
        and ``targets`` tensors. For every batch the loss is
        back-propagated, gradients are clipped to a norm of 1.0 and both
        the optimizer and the scheduler take one step.

        Args:
            model: Module called with ``input_ids`` and ``attention_mask``.
            data_loader: Iterable of batches.
            loss_fn: Callable taking ``(outputs, targets)``.
            optimizer: Optimizer updating ``model``'s parameters.
            device: Device the batch tensors are moved to.
            scheduler: Learning-rate scheduler stepped once per batch.
            n_examples: Denominator used for the accuracy.

        Returns:
            ``(accuracy, mean_loss)`` where ``accuracy`` is a 0-dim double
            tensor (correct predictions / ``n_examples``) and ``mean_loss``
            is the mean of the per-batch losses, or NaN if there were no
            batches.
        """
        model = model.train()
        losses = []
        # A tensor counter keeps the return type stable; with a plain ``0``
        # an empty loader would reach ``.double()`` on an int and crash.
        correct_predictions = torch.zeros((), dtype=torch.long, device=device)
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)
            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()
        # ``np.mean([])`` would emit a RuntimeWarning; report NaN directly.
        mean_loss = np.mean(losses) if losses else float('nan')
        return correct_predictions.double() / n_examples, mean_loss

    def eval_model(model, data_loader, loss_fn, device, n_examples):
        """Evaluate ``model`` on ``data_loader`` without updating it.

        The model is switched to evaluation mode and all batches are
        processed under ``torch.no_grad()``. Each batch must be a mapping
        with ``input_ids``, ``attention_mask`` and ``targets`` tensors.

        Args:
            model: Module called with ``input_ids`` and ``attention_mask``.
            data_loader: Iterable of batches.
            loss_fn: Callable taking ``(outputs, targets)``.
            device: Device the batch tensors are moved to.
            n_examples: Denominator used for the accuracy.

        Returns:
            ``(accuracy, mean_loss)`` where ``accuracy`` is a 0-dim double
            tensor (correct predictions / ``n_examples``) and ``mean_loss``
            is the mean of the per-batch losses, or NaN if there were no
            batches.
        """
        model = model.eval()
        losses = []
        # A tensor counter keeps the return type stable; with a plain ``0``
        # an empty loader would reach ``.double()`` on an int and crash.
        correct_predictions = torch.zeros((), dtype=torch.long, device=device)
        with torch.no_grad():
            for d in data_loader:
                input_ids = d["input_ids"].to(device)
                attention_mask = d["attention_mask"].to(device)
                targets = d["targets"].to(device)
                outputs = model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )
                _, preds = torch.max(outputs, dim=1)
                loss = loss_fn(outputs, targets)
                correct_predictions += torch.sum(preds == targets)
                losses.append(loss.item())
        # ``np.mean([])`` would emit a RuntimeWarning; report NaN directly.
        mean_loss = np.mean(losses) if losses else float('nan')
        return correct_predictions.double() / n_examples, mean_loss

    # Per-epoch metrics, keyed by metric name.
    history = defaultdict(list)
    best_accuracy = 0
    for epoch in range(EPOCHS):
        print(f'Epoch {epoch + 1}/{EPOCHS}')
        print('-' * 10)
        train_acc, train_loss = train_epoch(
            model,
            train_data_loader,
            loss_fn,
            optimizer,
            device,
            scheduler,
            len(df_train)
        )
        # The accuracies come back as 0-dim tensors that may live on the
        # GPU; convert them to plain floats so that the history can be
        # plotted and does not keep device tensors alive.
        train_acc = float(train_acc)
        print(f'Train loss {train_loss} accuracy {train_acc}')
        val_acc, val_loss = eval_model(
            model,
            val_data_loader,
            loss_fn,
            device,
            len(df_val)
        )
        val_acc = float(val_acc)
        print(f'Val   loss {val_loss} accuracy {val_acc}')
        print()
        history['train_acc'].append(train_acc)
        history['train_loss'].append(train_loss)
        history['val_acc'].append(val_acc)
        history['val_loss'].append(val_loss)
        # Keep the weights of the epoch with the best validation accuracy.
        if val_acc > best_accuracy:
            torch.save(model.state_dict(), 'best_model_state.bin')
            best_accuracy = val_acc

    # ``float`` accepts both plain numbers and 0-dim tensors (including CUDA
    # ones), which matplotlib cannot convert on its own.
    plt.plot([float(acc) for acc in history['train_acc']], label='train accuracy')
    plt.plot([float(acc) for acc in history['val_acc']], label='validation accuracy')
    plt.title('Training history')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend()
    plt.ylim([0, 1])
    # Like the earlier figures, the plot is only displayed once shown.
    plt.show()

    # Accuracy on the held-out test split, using the model as it stands
    # after the last training epoch.
    test_acc, _ = eval_model(
        model,
        test_data_loader,
        loss_fn,
        device,
        len(df_test)
    )
    # A bare ``test_acc.item()`` expression displays nothing when run as a
    # script, so report the value explicitly.
    print(f'Test accuracy {test_acc.item()}')


# Run the pipeline only when the file is executed as a script, not when it
# is imported.
if __name__ == '__main__':

    run()



profile
Preparation student who dreams of becoming an AI engineer.

0개의 댓글