- 데이터 준비
- Dataset 준비
input 데이터와 output 데이터로 구성
- Dataloader 생성
- 입력과 출력을 연결하는 Layer(층)으로 이뤄진 네트워크(모델)을 정의
- Sequential 방식: 순서대로 쌓아올린 네트워크로 이뤄진 모델을 생성하는 방식
- layer를 순서대로 쌓은 모델을 구현할때 간단히 모델을 정의할 수 있다.
- layer block을 정의하는데 사용할 수 있다.
- Subclass 방식: 네트워크를 정의하는 클래스를 구현.
- 다양한 구조의 모델을 정의할 수 있다.
- inializer에서 필요한 layer들을 생성한다.
- forward(self, X) 메소드에 forward propagation 계산을 구현한다.
- train
- train 함수, test 함수 정의
- test set 최종평가
분야와 상관 없이 공통적으로 가비고 있어야 하는 것 import
pip install torch torchvision
# torch: 공통 구현체
# torchvision: 영상처리위한 구현체
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import os
import matplotlib.pyplot as plt
import numpy as np
print("파이토치 버젼: ", torch.__version__)
device 설정
# 처리를 cpu를 이용할지 gpu를 이용할지 설정
device = "cuda" if torch.cuda.is_available() else "cpu"
print("decive:", device)
변수 설정
# 뒤에서 여러번 사용 할 값들을 저장할 변수
# 보델에 설정할 값으(모델 자체, 학습할 때 필요한 값 등등)으로 성능에 영향을 주는 값들을 저장할 변수
# === 하이퍼 파라미터(Hyper Parameter)
## 대문자 표기 -> 뒤에서 바꾸지 말라는 뜻
BATCH_SIZE = 256 # 모델의 파라미터를 업데이트 할 떄 사용 할 데이터의 개수, 한번에 몇개의 데이터를 입력할 지
N_EPOCH = 20 # 전체 TRAIN DATASET을 한번 학습한 것을 1 epoch
LR = 0.001 # 학습률. 파라미터 update 할 떄 gradient 값에 곱해줄 값
# (gradient 를 새로운 파라미터에 계산 할 때 얼마나 반영할 지 비율)
#
# step : 파라미터를 1먼 upeate하는 단위: 1step당 학습 데이터 수: batch_size
# epoch: 전체 학습 데이터셋을 한번 학습한 단위 ,
# 1 epoch당 step 횟수: ceil(총 데이터 수 /batch_size)
DATASET_SAVE_PATH = "datasets" # 데이터셋을 저장 할 디렉토리 경로
MODEL_SAVE_PATH = "models" # 학습-평가가 끝난 모델을 저장할 디렉토리 경로
os.makedirs(DATASET_SAVE_PATH, exist_ok=True)
os.makedirs(MODEL_SAVE_PATH, exist_ok=True)
np.ceil(60000/256)
dataset
### MNIST Dataset을 다운로드 + Dataset 객체를 생성
# train dataset
## torchvision.datasets
train_set = datasets.MNIST(root = DATA_SAVE_PATH, # 데이터셋을 저장할 디렉토리 경로
train = True, # trainset(훈련용): True, testset(검증용): False
downlad = True, # root에 저장된 데이터 파일들이 없을 떄 다운로드 방을 지 여부
transform = transforms.ToTensor() #데이터 전처리
)
# ToTensor(): ndarray, PIL .Image객체 -> torch.Tensor로 변환. (변환기라고 보면 됨)
# Pixcel값 정규화(normalize): 0 ~ 1 실수로 변환
test_set = datasets.MNIST(root=DATASET_SAVE_PATH,
train=False,
download = True,
transform = transforms.ToTensor())
train dataset과 test dataset 확인
type(train_set) # Dataset 타입
print(train_set)
print(test_set)
# 데이터 포인트 개수
print(len(train_set), len(test_set))
# Dataset을 모델에 어떻게 제공할지를 설정 =>학습/평가시 설정된대로 데이터를 loading
## 훈련용 DataLoader
train_loader = DataLoader(train_set, # Dataset
batch_size=BATCH_SIZE, # batch_size를 설정
shuffle=True, # 한 epoch이 끝나면 epoch이 끝나면 다음 EPOCH전에 데이터를 섞을 지 여부
drop_last = True, # 마지막 batch의 데이터수가 batch_size보다 적을 경우 버릴지(학습에 사용 안함) 여부
)
## 평가용 DataLoader
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE)
> 동일한 데이터를 한번 끝날때 마다 섞어서 사용한다(default 값은 false)
>drop_last의 default 값은 false
> ex)
>```
>60000% 256 #마지막 남을 것은 수가 batch 사이즈보다 모자라면 버린다
>```
각 dataset 별 step 수 확인
print("1 에폭 당 step 수")
print("trainset: ", len(train_loader))
print("testset: ", len(test_loader))
np의 값 도출
np.floor(60000/256), 10000/256
여기까지의 과정이 이제 데이터 정리가 마무리 된 것이라 할 수 있다.
이제 모델을 정리 할 차레
두가지 모델을 만들어 준다
# class로 정의: nn.Module을 상속해서 정의
class MnistModel(nn.Module):
def __init__(self):
"""
모델 객체 생성 시 모델을 구현(정의)할 떄 필요한 것들을 초기화
필요한 거시 Layer들. 등등
"""
super().__init__()
#### 784(pixcel 수) -> 128개로 축소
self.lr1 = nn.Linear(28 * 28, 128) # input feature 크기, output size - 작성자가 지정(감에 따라..?)
#### 128(pixcel 수) -> 64개로 축소
self.lr2 = nn.Linear(128, 64)
#### 10(pixcel 수) -> 10개로 축소 (각 범주의 확룰)
self.lr3 = nn.Linear(64, 10)
#### Activation(활성)함수 -> 비선형 함수: ReLU
self.relu = nn.ReLU() # f(x) = max(x, 0)
def forward(self, x):
"""
input data를 입력 받아서 output 데이터를 만들떄 까지의 계산 흐름을 정의
===> forward propagation
parameter
x: 입력데이터
return
torch.Tensor: 출력데이터(모델 예측결과)
"""
# init에서 생성한 함수들을 이용해서 계산
### x-> 1차원으로 볌환 -> lr1 -> relu -> lr2 -> relu -> lr3 => ouput
# input (batch_size, channel, height, width) => (batch_size, 전체pixcel) - flatten 시키기
x = torch.flatten(x, start_dim=1) # (b, c, h, w) -> (b, c*h*w)
x = self.lr1(x)
x = self.relu(x)
x = self.lr2(x)
x = self.relu(x)
output = self.lr(x)
return output
무조건 필요한 것-> layer들(parameter들을 가지고 있는 함수들)
전체 pixcel중에서 진짜 최종적으로 필요한 만큼의 pixcel만 남긴다.(이때 설정하는 output은 감으로 설정- 많이 하다보면 생긴다)
model = MnistModel()
print(model)
여기서 변수명은 지정하지 않았지만 in features 크기와 out features 크기가 나와있고 relu 까지 출력이 된다. 그러나 이러한 연산의 흐름은 알 수 없다. 따라서 그 과정을 알아보기 위해서 torchinfo 패키지를 사용하여 모델의 연산 흐름 밑 정보를 확인하였다
from torchinfo import sumary
summary(model, (256, 1, 28, 28)) # summary(모델객체, input shape: tuple)
#### train dataset의 첫번쨰 배치를 이용해서 모델에 추론
x_batch, y_batch = next(iter(train_loader)) # (input, outout)
print(x_batch.shape, y_batch.shape)
# 추론
pred_batch = model(x_batch) #model의 forward() 메소드가 실행
pred_batch.shape
# 첫번째 이미지에 대한 추론 결과 -> class별 확률 => 가장 큰 확률의 index를 조회
pred_batch[0].argmax()
output: tensor(9)
y_batch[0]
output: tensor(6)
torch.sum(pred_batch.argmax(dim = 1) == y_batch)
output: tensor(26)
이제 만든 model을 train시켜보자
모델,loss function, optimizer 생성
#### 모델을 device로 옯긴다 (모델을 이용한 계싼을 CPU 에서 할 지 GPU 에서 할지 )
# 참고: device로 옮기는 것 => model, 모델에 주입 할 imput data, output data -> 이 세가지의 위치가 동일해야 함
# model = MnistModel()
model = model.to(device)
##### Loss function
# 다중분류문제: cross entropy, 이중분류문제: binary crossentropy ==> log loss
# 다중 분류: laㅠ디dl dufjro, 이중분류: yes/no 분류
loss_fn = nn.CrossEntropyLoss() # 객체, nn.funtional.cross_entry() 함수 -> 이는 예전에 많이 사용 - 지금은 전자가 더 잘 쓰임
#### Optimizer ===> 모델 파라미터들 최적화 ===> 경사하강법을 구현
optimizer = torch.optim.Adam(model.parameters(), # 최적화 대상 파라미터들
lr = LR # 학습률
)
학습 및 검증
import time # 학습시간 체크
## gkrtmq => train(훈련) + validation(1epoch 학습한 모델성능 검증)
# 에폭(epoch)별 학습결과를 저장할 리스트들
train_loss_list = [] # train set으로 검증했을 떄 list
val_loss_list = [] # test set으로 검증했을 떼 list
val_accuracy_list = [] # test set으로 검증했을 때 accuracy(정확도) - 전체 중 맞은 비율
start = time.time()
# Train
for epoch in range(N_EPOCH):
###########################
# Train
###########################
model.train() # 모델을 train 모드로 변경
train_loss = 0.0 # 현재 epoch의 학습 결과 loss를 저장할 변수
# 배치 단위로 학습
for X_train, y_train in train_loader: #batch 단위(input, output) 튜플로 반환
# 1. X, y를 device로 옯긴다(model, X, y는 같은 device상에 위치해야 한다)
X_train, y_train = X_train.to(device), y_train.to(device)
# 2. 추론
pred = model(X_train)
#3. Loss 계산
loss = loss_fn(pred, y_train) # args 순서: (모델 예측값, 정답)
#4. 모델의 파라미터 업데이트(최적화)
## 1. vkfkalxjdml gradient 값들을 초기화
optimizer.zero_grad()
## 2. gradient 계산 ===> 계산 결과는 파라미터.grad 속성에 저장
loss.backward()
## 3. vkfkalxj(weight, bias) 업데이트( 파라미터 = 학습률*grad)
optimizer.step()
##### 현재 batch의 loss값을 train_loss 변수에 누적
train_loss += loss.item() # Tensor -> 파이썬 값
# 1epoch 학습 종료
# epoch의 평균 loss를 계산해서 리스트에 저장.(train_loss: step별 Loss를 누적)
train_loss_list.append(train_loss/len(train_loader)) # step수 나눔
#######################
# validate(검증) = test set(학습할 떄 사용하지 않았던 데이터 셋)
#######################
model.eval() # 모델을 검증(편가) 모드로 전환
## 현재 epoch에 대한 검증 결과(loss, accuracy)를 저장할 변수
val_loss = 0.0
val_acc = 0.0
### 모델 추정을 위한 연산 - forward propagation
### 검증/편가/ 서비스 -> gradient 계산이 필요없다 => 도함수를 계싼할 필요가 없다
with torch.no_grad():
## batch 단위로 검증
for X_val, y_val in test_loader:
# 1. device 로 옮기기
X_val, y_val = X_val.to(device), y_val.to(device)
#2. 모델을 이용해 추론
pred_val = model(X_val)
# 3. 검증
## 1. loss 계산 + val_loss에 누적
val_loss += val_loss + loss_fn(pred_val, y_val).item() # loss_fn(pred_val, y_val).는 256개에 대한 평균
## 2. 정확도(accuracy): 맞은 것 개수/ 전체 개수
val_acc = val_acc + torch.sum(pred_val.argmax(axis=-1) == y_val).item()
# test set 전체에 대한 검증이 완료 => 현 epoch에 대한 검증 완료
## val_loss, val_acc 값을 리스트에 저장
val_loss_list.append(val_loss / len(test_loader)) # loss는 step 수 나눔
val_accuracy_list.append(val_acc / len(test_loader.dataset)) # 전체 데이터 개수로 나눈다 - len(test_loader)는 step 수를 반환함으로 dataset를 뒤에 붙여줌
## 현재 epoch train 결과를 출력
print(f"[{epoch+1:2d}/{N_EPOCH:2d}] Train Loss: {train_loss_list[-1]}, Val Loss: {val_loss_list[-1]}, Val Accuracy: {val_accuracy_list[-1]}")
end = time.time()
print(f"학습에 걸린 시간: {end-start}초")
실행 시 총 20번의 train loss, Val loss, Val Accuracy 의 값을 보여준다.
현재 실행 시 실행중이던 것이 강제 종료되는 현상 발생
해결 필요
save_path = os.path.join(MODEL_SAVE_PATH, "mnist")
os.makedirs(save_path, exist_ok=True)
save_file_path = os.path.join(save_path, "mnist_mlp.pth")
print(save_file_path)
path 저장
torch.save(model, save_file_path)
저장한 모델 불러오기
load_model = torch.load(save_file_path)
load_model
from torchinfo import summary
summary(load_model, (BATCH_SIZE, 1, 28, 28))
# device 로 옮기기
load_model = load_model.to(device)
# 평가모드 전환
load_model.eval()
test_loss, test_acc = 0.0, 0.0
with torch.no_grad():
for X, y in test_loader:
# device옮기기
X, y = X.to(device), y.to(device)
# 추정
pred = load_model(X)
# 평가 - loss, accuracy
test_loss += loss_fn(pred, y)
test_acc += torch.sum(pred.argmax(axis =-1) == y).item()
test_loss /= len(test_loader) # step수로 나누기
test_acc /= len(test_loader.dataset) # 총 데이터수로 나누기
print(f"Test loss: {test_loss}, Test accuracy: {test_acc}")
output: Test loss: 0.08572249859571457, Test accuracy: 0.9767
대략 98% 정도의 정확성을 보여줌으로 이 모델은 사용 가능하다.
pip install opencv-contrib-python
설치 후
파일 경로를 읽어주고
from glob import glob
import cv2
glob으로 가져온다
img_file_list = glob("test_img/num/*.png")
img_file_list
실행시 총 10개의 파일이 존재함을 볼 수 있다
file_cnt = len(img_file_list)
print("개수: ", file_cnt)
input_tensor = torch.zeros((file_cnt, 28, 28))
for i in range(file_cnt):
# image 읽기
test_img = cv2.imread(img_file_list[i], cv2.IMREAD_GRAYSCALE)
# 28 x 28 리사이즈
test_img = cv2.resize(test_img, (28, 28))
# ndarray ->torch.Tensor + 정규화 (0~1) => input_tensor에 추가
input_tensor[i] = transforms.ToTensor()(test_img)
ndarray를 tensor type으로 바꿔주고 0과 1사이로 만들어 준다
print(type(input_tensor))
print(input_tensor.shape)
print(input_tensor.min(), input_tensor.max())
추론하여 확률값을 찾는다
load_model.to(device)
load_model.eval()
input_tensor = input_tensor.to(device)
pred_new = load_model(input_tensor)
# 모델이 예측한 값을 확률로 변환 => softmax함수
pred_proba = nn.Softmax(dim=-1)(pred_new)
print(pred_new.shape, pred_proba.shape)
pred_new
위의 pred_new
와 아래의 pred_proba
를 보면 바뀐 것을 볼 수 있다.
pred_proba
# pred-proba.sum(axis=-1)
pred_label = pred_proba.argmax(dim=-1)
pred_label
pred_label_proba = pred_proba.max(dim=-1).values
pred_label_proba
실행 시 각 이미지에 대한 추론 내용과 확룰을 볼 수 있다.
plt.figure(figsize = (15, 7))
for i in range(file_cnt):
plt.subplot(2, 5, i+1) #파일이 10개니까
plt.imshow(input_tensor[i].to("cpu").numpy(), cmap="gray") #device를 옮겨서 cpu인지 gpu인지 모르지만 cpu를 사용해야하기 때문에 cppu로 이동
plt.title(f"{pred_label[i].item()} - {pred_label_proba[i].item():.2f}")
plt.tight_layout()
plt.show()