식물의 사진으로 질병 유무 판단하기

yeoni·2023년 6월 21일
0

딥러닝-Pytorch

목록 보기
3/3

텐서플로 허브와 전이학습

Base 모델 만들기

1. 데이터 풀기

  • 폴더명이 식물이름_healty, 식물이름_질병이름으로 저장되어 있음
  • 사진들의 개수가 밸런스가 맞지는 않는다.
from google.colab import drive
drive.mount('/content/drive')

!unzip -qq '/content/drive/MyDrive/데이터스쿨/실습파일/DL/data/dataset.zip' -d './dataset'

2. 폴더 정리

import os
import shutil

original_dataset_dir = './dataset'
classes_list = os.listdir(original_dataset_dir) #폴더이름에서 클래스 이름 가져오기

base_dir = './splitted'
os.mkdir(base_dir)

# 데이터 정리를 위한 목록 및 폴더 생성
train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

# 폴더 목록 만들기
for cls in classes_list:
  os.mkdir(os.path.join(train_dir, cls))
  os.mkdir(os.path.join(validation_dir, cls))
  os.mkdir(os.path.join(test_dir, cls))

3. 데이터 현황 확인

import math

for cls in classes_list:
  # './dataset' + 클래스
  path = os.path.join(original_dataset_dir, cls)
  # path에 있는 파일 이름 저장
  fnames = os.listdir(path)

  # 6:2:2 = train, val, test floor -> 정수형으로
  train_size = math.floor(len(fnames) * 0.6)
  validation_size = math.floor(len(fnames) * 0.2)
  test_size = math.floor(len(fnames) * 0.2)

  # 처음부터 60%까지
  train_fnames = fnames[:train_size]
  print('Train size(',cls,')', len(train_fnames))
  for fname in train_fnames:
    src = os.path.join(path, fname)
    dst = os.path.join(os.path.join(train_dir, cls), fname)
    # 파일 copy
    shutil.copyfile(src, dst)

  validation_fnames = fnames[train_size:(validation_size + train_size)]
  print('Validation size(',cls,'): ', len(validation_fnames))
  for fname in validation_fnames:
    src = os.path.join(path, fname)
    dst = os.path.join(os.path.join(validation_dir, cls), fname)
    shutil.copyfile(src, dst)

  test_fnames = fnames[(train_size + validation_size):(validation_size + train_size + test_size)]
  print('Test size(',cls,'): ', len(test_fnames))
  for fname in test_fnames:
    src = os.path.join(path, fname)
    dst = os.path.join(os.path.join(test_dir, cls), fname)
    shutil.copyfile(src, dst)

4. 학습 준비

import torch
import os

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device('cuda' if USE_CUDA else 'cpu')
BATCH_SIZE = 256 
EPOCH = 30

5. 데이터 loader

  • num_workers: dataset의 데이터를 gpu로 전송할 때 필요한 전처리를 수행할 때 사용하는 subprocess의 수
  • num_workers의 수를 늘리면 병렬처리를 통해 더 빠르게 gpu에 정보를 전달할 수 있어 성능이 좋아진다.
  • 하지만 num_workers의 수가 너무 크면 다른 일을 수행하는 데 사용할 자원이 적어져 성능이 안좋아질 수 있다. 따라서 적절한 값을 찾기 위해 하이퍼파라미터 튜닝을 하듯 접근한다.
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

# 사진크기가 제가각이라서 64*64
transform_base = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])
# 폴더를 전체를 읽기 -> 폴더 하나를 클래스로 매치 -> 라벨=폴더이름
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base)
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)

from torch.utils.data import DataLoader

train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True,
                                           num_workers=4
                                           )
val_loader = torch.utils.data.DataLoader(val_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True,
                                           num_workers=4
                                           )

6. 모델 수립

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()

    self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
    self.pool = nn.MaxPool2d(2,2)
    self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
    self.conv3 = nn.Conv2d(64, 64, 3, padding=1)

    self.fc1 = nn.Linear(4096, 512)
    self.fc2 = nn.Linear(512, 33)

  def forward(self, x):
    x = self.conv1(x)
    x = F.relu(x)
    x = self.pool(x)
    # 훈련할 때만 사용하려고 training
    x = F.dropout(x, p=0.25, training=self.training)

    x = self.conv2(x)
    x = F.relu(x)
    x = self.pool(x)
    x = F.dropout(x, p=0.25, training=self.training)

    x = self.conv3(x)
    x = F.relu(x)
    x = self.pool(x)
    x = F.dropout(x, p=0.25, training=self.training)

    x = x.view(-1, 4096) #flatten과 비슷한 효과
    x = self.fc1(x)
    x = F.relu(x)
    x = F.dropout(x, p=0.5, training=self.training)
    x = self.fc2(x)

    return F.log_softmax(x, dim=1)
    
model_base = Net().to(DEVICE)
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

7. train, evaluate 함수

  • width resource → 자원을 받지 않아도 with 구문 끝나면 끝난다. 에러를 방지해주는 역할을 한다.
def train(model, tranin_loader, optimizer):
  model.train()
  for bath_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(DEVICE), target.to(DEVICE)
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step() #weight 업데이트

def evaluate(model, test_loader):
  model.eval()
  test_loss = 0
  correct = 0

  # 가중치 업데이트 x , 평가 모드
  with torch.no_grad(): 
    for data, target in test_loader:
      data, target = data.to(DEVICE), target.to(DEVICE)
      output = model(data)

      test_loss += F.cross_entropy(output, target, reduction='sum').item()

      pred = output.max(1, keepdim=True)[1]
      correct += pred.eq(target.view_as(pred)).sum().item()

  test_loss /= len(test_loader.dataset)
  test_accuracy = 100. * correct / len(test_loader.dataset)
  return test_loss, test_accuracy

8. best 모델을 뽑아 base 모델로 저장

from torch.serialization import validate_cuda_device
import time
import copy

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs=30):
  best_acc=0.0
  # 정확도가 높은 모델의 weight 저장
  best_model_wts = copy.deepcopy(model.state_dict())

  for epoch in range(1, num_epochs + 1):
    since = time.time()
    train(model, train_loader, optimizer)
    train_loss, train_acc = evaluate(model, train_loader)
    val_loss, val_acc = evaluate(model, val_loader)

    if val_acc > best_acc:
      best_acc = val_acc
      best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since

    print('----------------------------epoch {}-----------------------------'.format(epoch))
    print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
    print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
    print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

  model.load_state_dict(best_model_wts)
  return model

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
torch.save(base, 'baseline.pt')

전이학습, 미세조정

전이학습 예시1

전이학습 예시2

  • Imagenet → class를 주어 사물 분류
  • Imagenet(class를 주어 사물 분류)에서 이미 학습을 완료한 구조화된 weight → 적용하려고 하는 모델로 가져와서 전이학습
  • 1사분면: 출력단 훈련 / 2사분면: 구조만 가져와서 훈련 / 3,4사분면: 입력층에 가까운 데이터 훈련

1. 데이터 전처리

data_transforms = {
	# Compose 전처리
    'train': transforms.Compose([transforms.Resize([64, 64]),
                                 # 랜덤 상하좌우로 반전 -> 이미지 증강 -> 과적합 방지 & 부족한 데이터 보충 / 좌우 순서가 중요한 경우는 사용x(ex.6, 9)
                                 transforms.RandomHorizontalFlip(),
                                 transforms.RandomVerticalFlip(),
                                 # 이미지 자르기 52*52 정사이즈 여러 종류의 crop 존재 -> 일부를 보여주는 효과
                                 transforms.RandomCrop(52), transforms.ToTensor(),
                                 # rgb 색상 값이고 평균 값, 표준편차 값 -> normalize 색상 분포를 찾아 놓아야 함
                                 transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    'val': transforms.Compose([transforms.Resize([64, 64]),
                                 transforms.RandomCrop(52), transforms.ToTensor(),
                                 transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
}

data_dir = './splitted'
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x),
                                  transform=data_transforms[x]) for x in ['train', 'val']}

dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x],
                                              batch_size=BATCH_SIZE,
                                              shuffle=True,
                                              num_workers=4) for x in ['train', 'val']}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

class_names = image_datasets['train'].classes

2. weight 가져오기

from torchvision import models
from torch.optim import lr_scheduler

# 학습이 완료된 weight를 가져옴, 주의! 클래스는 맞지 않는다 출력단을 맞추기(원래 모델 33)
resnet = models.resnet50(pretrained=True)
num_fits = resnet.fc.in_features # in_features 마지막 layer 채널수
resnet.fc = nn.Linear(num_fits, 33)
resnet = resnet.to(DEVICE)

criterion = nn.CrossEntropyLoss()
# 마지막 교체 layer weight는 학습이 안됐으니 아래 옵션 필요
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)

#epoch에 따라서 lr 변화 7epoch마다 0.1 하락
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

ct = 0
# 모델의 하위 layer가져오기
for child in resnet.children():
  ct += 1
  # 0 ~ 5 freeze 학습 x -> 입력에 가까운 layer 고정 / 6~9 학습하기
  if ct < 6:
    for param in child.parameters():
      param.requires_grad = False

3. 전이학습 best 모델 저장 함수

def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):
  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0

  for epoch in range(num_epochs):
    print('----------------------------epoch {}------------------------------'.format(epoch+1))
    since = time.time()

    for phase in ['train', 'val']:
      if phase == 'train':
        model.train()
      else:
        model.eval()

      # 초기화
      running_loss = 0.0
      running_corrects = 0

      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()

        with torch.set_grad_enabled(phase=='train'):
          outputs = model(inputs)
          _, preds = torch.max(outputs, 1)
          loss = criterion(outputs, labels)

          if phase == 'train':
            loss.backward()
            optimizer.step()

        running_loss += loss.item() * inputs.size(0) # inputs.size(0) 배치사이즈
        running_corrects += torch.sum(preds==labels.data)

      if phase == 'train':
        scheduler.step()

      epoch_loss = running_loss/dataset_sizes[phase]
      epoch_acc = running_corrects.double()/dataset_sizes[phase]

      print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

      if phase == 'val' and epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:.4f}'.format(best_acc))

  model.load_state_dict(best_model_wts)

  return model

4. 전이학습

model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH)
torch.save(model_resnet50, 'resnet50.pt')

# 전처리
transform_resNet = transforms.Compose([
    transforms.Resize([64, 64]),
    transforms.RandomCrop(52),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# test loader
test_resNet = ImageFolder(root='./splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(
    test_resNet,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4
)

# evaluate
resnet50 = torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)
print('ResNet test acc: ', test_accuracy) #ResNet test acc:  98.9485542621104

Reference
1) 제로베이스 데이터스쿨 강의자료
2) https://velog.io/@seokjin1013/PyTorch-numworkers%EC%97%90-%EA%B4%80%ED%95%98%EC%97%AC

profile
데이터 사이언스 / just do it

0개의 댓글