dacon 상추의 생육 환경 생성 AI 경진대회 baseline 클론 코딩 및 해석

woojjn·2022년 12월 3일
0

dacon에 현재 열려 있는 "상추의 생육 환경 생성 AI 경진대회"를 준비해보려 한다.
먼저 baseline 코드를 그대로 써보고 이를 해석한 것을 쓴다.

출처: https://dacon.io/competitions/official/236033/codeshare/7081?page=1&dtype=recent

import

먼저 필요한 라이브러리를 import한다.

import random
import pandas as pd
import numpy as np
import os
import glob

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

Hyperparameter Setting

하이퍼파라미터 또한 CFG에 dictionary 형태로 저장한다.
혹시 나중에 hyperparameter를 추가하거나 변경해줄 때, 이 값을 이용하여 변경해주면 될듯하다.

CFG = {
    'EPOCHS':30,
    'LEARNING_RATE':1e-3,
    'BATCH_SIZE':16,
    'SEED':41
}

Fixed RandomSeed

seed를 고정시킨다.
별로 중요해보이지 않지만 나중에 다시 결과를 재현할 때 꼭 필요한 작업이라 일단 해주는 것이 좋다. 물론 학습이 진행되는 데에는 영향을 끼치지 않는다.

def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

Data Pre-processing

glob 라이브러리를 통해 train_input(target)내에 있는 모든 csv 파일들을 list에 넣는다. sorted를 통해 정렬된 상태로 list를 얻는다.
나 같은 경우는 구글 colab을 통해 실행시킬 것이기 때문에 DATA_PATH를 별도 설정해주었다.

DATA_PATH = "./drive/MyDrive/open"

all_input_list = sorted(glob.glob(f'{DATA_PATH}/train_input/*.csv'))
all_target_list = sorted(glob.glob(f'{DATA_PATH}/train_target/*.csv'))

그 다음은 data를 train set과 eval set으로 split해주는 작업인데, 여러 방법이 있겠지만 baseline에서는 shuffle 작업을 생략하고, 그냥 순서대로 split을 해주었다.
나는 그렇게 하긴 아쉬워서 shuffle을 추가하여 random하게 split을 해주었다.

random.shuffle(all_input_list)		# baseline엔 없음
random.shuffle(all_target_list)		# baseline엔 없음

train_input_list = all_input_list[:25]
train_target_list = all_target_list[:25]

val_input_list = all_input_list[25:]
val_target_list = all_target_list[25:]

다시 생각해보니 이렇게 input과 target을 따로 섞으면 dataset의 data, label 짝이 안 맞는 문제가 발생한다. 그래서 다음과 같은 방식으로 변경했다.

samples = set(random.sample(range(len(all_input_list)), k = 25))

train_input_list = []
train_target_list = []

val_input_list = []
val_target_list = []

for i in range(len(all_input_list)):
    if i in samples:
        train_input_list.append(all_input_list[i])
        train_target_list.append(all_target_list[i])
    else:
        val_input_list.append(all_input_list[i])
        val_target_list.append(all_target_list[i])

CustomDataset

이제 model에 input을 넣기 전 더 좋은 형태로 넣기 위해 data들을 가공할 CustomDataset 클래스이다.
train data의 경우 시간 당 한 번, target data의 경우 하루에 한 번 측정한다.
그래서 train data 24개 묶음이 target data 1개와 하나의 data, label로 리턴되어야 한다.

class CustomDataset(Dataset):
    def __init__(self, input_paths, target_paths, infer_mode):
        self.input_paths = input_paths		# input 데이터 경로 리스트 = train_input_list
        self.target_paths = target_paths	# target 데이터 경로 리스트 = train_target_list
        self.infer_mode = infer_mode		# True면 data만 리턴, False면 data 와 label 같이 리턴
        
        self.data_list = []
        self.label_list = []
        print('Data Pre-processing..')
        for input_path, target_path in tqdm(zip(self.input_paths, self.target_paths)):
            input_df = pd.read_csv(input_path)
            target_df = pd.read_csv(target_path)
            
            input_df = input_df.drop(columns=['obs_time'])	# "obs_time" column 삭제
            input_df = input_df.fillna(0)	# NaN 데이터를 0으로 채움
            
            input_length = int(len(input_df)/24)	
            target_length = int(len(target_df))
            
            for idx in range(target_length):
                time_series = input_df[24*idx:24*(idx+1)].values	
                self.data_list.append(torch.Tensor(time_series))
            for label in target_df["predicted_weight_g"]:
                self.label_list.append(label)
        print('Done.')
              
    def __getitem__(self, index):
        data = self.data_list[index]
        label = self.label_list[index]
        if self.infer_mode == False:
            return data, label
        else:
            return data
        
    def __len__(self):
        return len(self.data_list)
train_dataset = CustomDataset(train_input_list, train_target_list, False)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=6)

val_dataset = CustomDataset(val_input_list, val_target_list, False)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=6)

Model Define

Baseline에서의 Basemodel은 lstm에 FC layer를 붙인 방식을 채택했다.
대략적으로 (DAT, 내부온도관측치, · · ·, 일간누적총광량)와 같은 15개의 feature를 입력으로 주면, hidden_size가 256인 단방향 lstm을 통과하고 (batch_size, sequence_length, hidden_size)의 output이 나온다.
이 output 중 sequence_length의 마지막 output만을 가지고 classifier에 입력으로 주어 상추의 무게를 예측한다.

class BaseModel(nn.Module):
    def __init__(self):
        super(BaseModel, self).__init__()
        self.lstm = nn.LSTM(input_size=15, hidden_size=256, batch_first=True, bidirectional=False)
        # input_size는 feature의 개수와 동일(DAT, 내부온도관측치, · · ·, 일간누적총광량)
        # output의 차원 = (batch_size, sequence_length, hidden_size), (h_n, c_n)
        # h_n: n번 째 hidden_state
        # c_n: n번 째 cell_state
        self.classifier = nn.Sequential(
            nn.Linear(256, 1),
        )
        
    def forward(self, x):
        hidden, _ = self.lstm(x)
        output = self.classifier(hidden[:,-1,:])
        # lstm에 FC layer를 붙여 하나의 값이 return 되게 한다.
        return output
        

Train

train 함수
loss 함수로 L1 loss를 사용한다.
한 epoch이 끝날 때마다 validation loss를 측정하고, validation loss가 가장 낮은 모델을 return 한다.

def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    criterion = nn.L1Loss().to(device)
    
    best_loss = 9999
    best_model = None
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for X, Y in iter(train_loader):
            X = X.to(device)
            Y = Y.to(device)
            
            optimizer.zero_grad()
            
            output = model(X)
            loss = criterion(output, Y)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
                    
        val_loss = validation(model, val_loader, criterion, device)
        
        print(f'Train Loss : [{np.mean(train_loss):.5f}] Valid Loss : [{val_loss:.5f}]')
        
        if scheduler is not None:
            scheduler.step(val_loss)
            
        if best_loss > val_loss:
            best_loss = val_loss
            best_model = model
    return best_model

validation 함수

def validation(model, val_loader, criterion, device):
    model.eval()
    val_loss = []
    with torch.no_grad():
        for X, Y in iter(val_loader):
            X = X.float().to(device)
            Y = Y.float().to(device)
            
            model_pred = model(X)
            loss = criterion(model_pred, Y)
            
            val_loss.append(loss.item())
            
    return np.mean(val_loss)

Run!!

학습 시작!!!

model = BaseModel()		# model 인스턴스 생성
model.eval()		
# Adam optimizer 사용
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
# ReduceLROnPlateau LR scheduler 사용
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, threshold_mode='abs',min_lr=1e-8, verbose=True)

best_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

Inference

다시 glob를 통해 test 데이터 가져오기

test_input_list = sorted(glob.glob('./test_input/*.csv'))
test_target_list = sorted(glob.glob('./test_target/*.csv'))

inference 진행 후 제출할 csv 파일을 만드는 함수

def inference_per_case(model, test_loader, test_path, device):
    model.to(device)
    model.eval()
    pred_list = []
    with torch.no_grad():
        for X in iter(test_loader):
            X = X.float().to(device)
            
            model_pred = model(X)
            
            model_pred = model_pred.cpu().numpy().reshape(-1).tolist()
            
            pred_list += model_pred
    
    submit_df = pd.read_csv(test_path)
    submit_df['predicted_weight_g'] = pred_list
    submit_df.to_csv(test_path, index=False) 

inference 실행

for test_input_path, test_target_path in zip(test_input_list, test_target_list):
    test_dataset = CustomDataset([test_input_path], [test_target_path], True)
    test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)
    inference_per_case(best_model, test_loader, test_target_path, device)

Submission

생성된 파일 압축하기

import zipfile
os.chdir(f"{DATA_PATH}/test_target/")
submission = zipfile.ZipFile("../submission.zip", 'w')
for path in test_target_list:
    path = path.split('/')[-1]
    submission.write(path)
submission.close()

0개의 댓글