학습 알고리즘 구현하기

Benedictus Park·2022년 12월 15일
0
post-thumbnail

1. 학습 알고리즘 구현

1-1. 학습 알고리즘 구현 과정

단계별로 나누어 설명하자면, 아래와 같다.

사전 준비) 훈련용, 시험용 데이터와 그 레이블을 준비한다.
1단계) 훈련 데이터 중 일부를 무작위로 가져온다.(미니 배치)
2단계) 각 가중치 매개변수의 수치 미분(기울기)을 구한다.
3단계) 기울기 정보를 활용해 가중치 매개 변수를 적절한 방향으로 갱신한다.
4단계) 1~3단계를 (정해진 에포크 * 배치 사이즈)만큼 반복한다.

모든 훈련 데이터를 한 번 사용한 것을 1 에포크(Epoch)라고 한다. 미니 배치를 사용하는 경우, 예를 들어 batch size가 100이고 훈련 데이터가 10000개라면, 미니 배치 학습을 100번 하면 1 에포크가 된다. 이 에포크 값은 Hyper Parameter중 하나로, 너무 작으면 학습이 제대로 이뤄지지 않고, 너무 크면 과적합(Overfitting)이 발생할 가능성이 크다. 그러므로 시험 데이터로 신경망의 범용적인 성능을 평가하며 적절한 값을 사용하는 것이 좋다.

어쨌든 이것이 신경망 학습이 이뤄지는 순서다. 우린 경사 하강법으로 매개 변수를 갱신할 것인데, 이때 데이터를 미니 배치로, 즉 무작위로 뽑아오기 때문에 이 방법을 확률적 경사 하강법(Stochastic Gradient Descent)이라고 부른다.

재사용 가능성이 있는 코드는 common 디렉터리 내부에 넣어서 패키지를 만들어버릴 예정이다. 따라서 디렉터리 구조는 아래와 같다.

2-Layer-NeuralNet/
2-Layer-NeuralNet/Two_Layer_NeuralNet.py
2-Layer-NeuralNet/main.py
common/
common/__init__.py
common/MNIST_Loader.py
common/Activate_Functions.py
common/Loss_Functions.py
common/Calc_Gradient.py
common/Optimizers.py

[Github Link]

일단 2-Layer-NeuralNet(MNIST)/ 디렉터리와 common/ 디렉터리를 만든 후 common/ 디렉터리에 __init__.py 파일을 생성하고, 2층 신경망으로 MNIST 손글씨 숫자를 학습/인식하는 신경망 구현을 시작해 보자!

알아둘 것) MacBook Pro 2019 16inch, i7 모델 기준 학습 소요 시간이 대략 45시간 정도다. 그러니 '직접 구현을 해본다'에 의미를 뒀으면 좋겠다. 다음 포스트에서 언급할 오차 역전파법을 이용하면 속도가 비약적으로 향상되니, 이번 포스트에서 만든 신경망을 가지고 컴퓨터를 2~3일 내내 혹사시키지는 말자. 정신 건강에 해롭다.

1-2. MNIST Dataset 다운로드 & 로딩

MNIST Dataset을 다운로드 받고, 이것을 신경망 학습에 적절한 형태로 전처리하여 반환하는 모듈을 구현할 것이다. common/ 디렉터리 하위에 MNIST_Loader.py 파일을 만들고 거기서 작업을 시작하면 된다.

필요한 import는 아래와 같다.

import os
import time
import gzip
import pickle
import numpy as np
import urllib.request as req

먼저, 다운로드 시작, 파일 쓰기 등 동작이 수행될 때 현재 진행중인 동작에 대한 로그를 Console에 찍어주는 _log 함수를 만들겠다.

def _log(msg):
    print("[Log][{0}]".format(time.strftime("%H:%m:%S")) + msg)

함수명 앞에 언더바(_) 하나를 붙이면 '내부용 함수'라는 의미이다. 내부용 함수는 동일한 모듈에서만 호출 가능하다.

이번에는 MNIST Dataset을 다운로드하는 함수를 구현하겠다.

def _download_dataset(fname, save_name):
    _log("[_download_dataset()] Starting Dataset Download...({0})".format(fname))
    fpath = "MNIST_Raw/"

    if not os.path.isdir(fpath):
        os.mkdir(fpath)
    else:
        if os.path.isfile(fpath + save_name):
            _log("[_download_dataset()] {0} Already Exists. Download Stopped.".format(fname))
            return
    req.urlretrieve("http://yann.lecun.com/exdb/mnist/" + fname, fpath + save_name)
    _log("[_download_dataset()] Done!")

다운로드 받을 파일의 이름과 다운로드 된 파일을 저장할 이름을 함수의 인자로 주면, 해당 파일을 다운로드한 후 지정된 이름으로 저장을 한다.

파일 다운로드는 urllib.request 모듈에서 제공하는 urlretrieve() 메서드를 사용했다. 다운로드할 파일의 URL과, 저장할 경로&파일명을 인자로 넘겨주면 파일을 다운로드 하고 지정된 경로에 저장한다.

아직 전처리되지 않은 Dataset 파일은 MNIST_Raw/ 디렉터리에 저장한다. 만약 MNIST_Raw/ 디렉터리가 없다면 파일을 다운로드 받기에 앞서 MNIST_Raw/ 디렉터리를 만든다. 또, 이미 파일을 다운로드 받은 적이 있으면 파일을 다운로드하지 않고 함수를 종료한다.


이번에는 어떤 객체를 파일에 저장하는 함수, 파일에 저장된 객체를 읽어오는 함수와 MNIST Dataset 파일로부터 데이터를 읽어오는 함수, Label을 읽어오는 함수를 작성할 것이다.

def _load_pkl(fname):
    _log("[_load_pkl()] Loading data from {0}.pkl...".format(fname))
    data = None
    with open("MNIST_pkl/" + fname + ".pkl", "rb") as fp:
        data = pickle.load(fp)
    _log("[_load_pkl()] Done!")
    return data

def _dump_pkl(fname, data):
    _log("[_dump_pkl()] Output data to {0}.pkl...".format(fname))
    with open("MNIST_pkl/" + fname + ".pkl", "wb") as fp:
        pickle.dump(data, fp)
    _log("[_dump_pkl()] Done!")
    
def _load_imgs(fname):
    data = None
    if os.path.isfile("MNIST_pkl/" + fname + ".pkl"):
        _log("[_load_imgs()] Loading images from {0}.pkl".format(fname))
        data = _load_pkl(fname)
    else:
        _log("[_load_imgs()] Loading images from {0}".format(fname))
        with gzip.open("MNIST_Raw/" + fname, "rb") as fp:
            data = np.frombuffer(fp.read(), np.uint8, offset=16)
        
        # row 차원 위치에 -1을 넣으면, 
        # shape가 (numOfData / column) * column이 됨.
        data = data.reshape(-1, 784)
        _dump_pkl(fname, data)

    _log("[_load_imgs()] Done!")
    return data

def _load_labels(fname):
    data = None
    if os.path.isfile("MNIST_pkl/" + fname + ".pkl"):
        _log("[_load_labels()] Loading labels from {0}".format(fname + ".pkl"))
        data = _load_pkl(fname)
    else:
        _log("[_load_labels()] Loading labels from {0}".format(fname))
        with gzip.open("MNIST_Raw/" + fname, "rb") as fp:
            data = np.frombuffer(fp.read(), np.uint8, offset=8)

        _dump_pkl(fname, data)

    _log("[_load_labels()] Done!")
    return data

_load_pkl() 함수와 _dump_pkl() 함수에 관해서는 _load_imgs(), _load_lables() 함수를 먼저 소개한 후 설명하도록 하겠다.

_load_imgs(), _load_lables() 함수에 관해 설명할 것은 크게 두 가지다.

  • gzip.open()에 관하여
  • np.frombuffer()에 관하여
  • np.frombuffer()의 shape

하나씩 설명하겠다. 먼저 gzip.open()에 대해 얘기해 보겠다.

_load_imgs(), _load_labels()에서 파일을 open() 함수로 열지 않고 gzip.open()으로 열었다. Dataset 파일이 .gz 타입 파일이기 때문에 gzip.open()으로 연 것이다. 사실 원래대로라면 .gz타입 파일을 압축 해제하고, 그걸 저장한 뒤에, 다시 그 파일을 open() 함수로 열어서 사용을 해야 했다.

그러나 gzip.open()을 이용해 파일을 열면 압축을 해제하는 과정 없이 바로 .gz 타입 파일로부터 데이터를 읽어올 수 있다. 그래서 gzip.open()을 사용해 ioStream을 생성한 것이다.

아무튼, 이렇게 파일을 열고 fp.read()의 반환을 np.frombuffer()의 첫 번째 인자로 넣어준다. fp.read() 함수는 기본적으로 아무 인자도 넘겨주지 않으면 파일의 처음부터 끝까지를 읽는다. 지금의 경우 Filemode가 "rb"이므로, 지정된 .gz 타입 파일을 전부 읽어 bytes 타입으로 가져온다.


이번에는 np.frombuffer()를 설명하겠다.

np.frombuffer()의 첫째 인자로 fp.read()를 줬다. 즉, Dataset 파일의 전체 Byte가 담긴 Bytes를 인자로 줬다. 두 번째 인자로는 np.uint8을 줬는데, 이것은 첫째 인자로 주어진 Bytes를 8bit(1 byte)씩 읽어서 그걸 unsigned integer형으로 해석해 np.array를 하나씩 채우라는 의미다. 그리고 마지막 offset은 첫 번째 인자인 Bytes 배열의 offset byte 지점부터 데이터를 읽으라는 의미다.

잠깐, 왜 처음부터 읽지 않고 Image Data는 16byte 지점부터, Label은 8byte 지점부터 읽으라고 하는 걸까?

답은 MNIST Dataset 파일의 구조에 있다. MNIST Dataset 중 이미지가 담긴 파일은 아래와 같은 구조다.

32bit Integer: Magic Number
32bit Integer: Number of Images
32bit Integer: Number of rows
32bit Integer: Number of Columns
*) 이후부터 끝까지 unsigned byte형 이미지 픽셀 값이 들어있음.
**) 28px * 28px = 784byte당 사진 한 장씩

32 * 4bit, 즉, 16byte 지점 이후부터 이미지 데이터가 들어있는 것이다. 그래서 _load_imgs 내부 np.frombuffer()의 offset에 16이란 값을 준 것이다. 또, 이미지의 각 픽셀이 unsigned byte형, 즉 uint8형으로 저장되어 있기 때문에 np.frombuffer()의 두 번째 인자로 np.uint8를 준 것이다.

MNIST Dataset 중 레이블이 담긴 파일은 구조가 아래와 같다.

32bit Integer: Magic Number
32bit Integer: Number of Items
*) 이후부터 끝까지 unsigned byte형 레이블 값이 들어있음.

추가로, Image Dataset과 Label Dataset은 784byte - 1byte씩 짝지어지는 관계라고 할 수 있다.

아무튼, 이제는 왜 np.uint8을 두 번째 인자로 줘서 8bit씩 unsigned integer형으로 읽으라고 했는지, 왜 offset 값을 위 코드처럼 설정했는지 이해가 갔을 것이다.


np.frombuffer()는 1차원 배열을 반환한다. 그렇기 때문에 읽어온 데이터를 사용하기 위해서는 reshape를 거쳐야 한다.

일단, _load_labels()에서의 np.frombuffer()의 반환값은 reshape를 할 필요가 없다. 그저 한 요소가 하나의 Label이 되기 때문이다. 하지만 _load_imgs()에서는 1차원 배열의 요소를 784개씩 묶어야 각각 하나의 이미지가 되어 신경망에 학습시킬 수 있다. 그래서 파일을 읽은 후에 아래 코드를 실행하는 것이다.

data = data.reshape(-1, 784)

이렇게 하면 맨 앞에서부터 784열(Column)의 요소씩 묶인 배열 여러 개가 들어있는 2차원 배열이 나온다. 그렇게 되면 data[0], data[1]과 같은 방식으로 각 이미지가 1차원으로 평탄화되어 들어있는 np.array를 가져올 수 있다.

단, 현재의 이미지는 평탄화 되어 있는 상태이므로, 이미지를 matplotlib으로 표시하고 싶다면 data[index].reshape(28, 28)와 같은 방식으로 reshape를 해 주어야 한다.


이제는 _load_pkl() 함수와 _dump_pkl() 함수의 사용에 대해 설명하겠다.

본격적인 설명에 앞서 Pickle을 소개하자면, Pickle은 Python object를 파일에 저장할 수 있도록 해주는 모듈이다. 아래처럼 사용하면 객체를 파일에 저장하거나, 읽어올 수 있다.

import pickle

data = None
obj = object()

with open("data.pkl", "wb") as fp:
    pickle.dump(obj, fp)
    
with open("data.pkl", "rb") as fp:
	data = pickle.load(fp)

위처럼, pickle.dump()의 첫 번째 인자로 어떤 객체를, 두 번째 인자로 File Stream을 주면 Python 객체가 해당 파일에 저장된다.

그리고 pickle.load()에 File Stream을 주면 해당 파일에 저장되어 있던 Python 객체가 반환된다.

필자는 프로그램 첫 실행 시에만 .gz 타입으로 되어 있는 MNIST Dataset 파일을 1차원 np.array의 형태로 읽어오고 신경망을 학습시키기에 적절한 shape로 가공한 결과를 그대로 파일에 저장했다가, 두 번째 실행부터는 파일에 저장되어 있던 np.array를 곧바로 꺼내와 반환하기 위해 Pickle을 사용했다.


_load_pkl() 함수와 _dump_pkl() 함수는 np.array 형태의 데이터를 파일에 저장하거나, 파일로부터 읽어오는 부분을 분리한 함수다.

_load_imgs(), _load_lables() 함수를 잘 보면, 아래와 같은 코드가 있다.

if os.path.isfile("MNIST_pkl/" + fname + ".pkl"):
        _log("[_load_labels()] Loading labels from {0}".format(fname + ".pkl"))
        data = _load_pkl(fname)

이것은 MNIST_pkl/ 디렉터리에 [fname].pkl 파일이 존재한다면, MNIST Dataset의 .gz 타입 파일을 읽는 대신 MNIST_pkl/[fname].pkl 파일에 저장된 np.array 객체를 _load_pkl 함수로써 data 변수에 할당하는 코드이다.

만약 MNIST_pkl/ 디렉터리에 [fname].pkl 파일이 존재하지 않으면, 그 때는 MNIST Dataset의 .gz 타입 파일을 읽는다. 그리고 np.array 타입의 label과, reshape를 통해 784열(row)씩 묶인 np.array들이 들어있는 2차원 np.array 타입의 데이터를 _dump_pkl 함수로써 파일에 저장한다.

아무래도 매번 실행시마다 압축이 되어 있는 MNIST Dataset의 .gz 타입 파일을 읽는 것보다는, 첫 번째 실행 시 Pickle을 이용해 np.array를 그대로 파일에 저장했다가 2차 실행시부터는 해당 파일을 바로 읽어오는 것이 속도가 빠를 것 같아 이런 식으로 구현을 했다.

def _load_pkl(fname):
    _log("[_load_pkl()] Loading data from {0}.pkl...".format(fname))
    data = None
    with open("MNIST_pkl/" + fname + ".pkl", "rb") as fp:
        data = pickle.load(fp)
    _log("[_load_pkl()] Done!")
    return data

def _dump_pkl(fname, data):
    _log("[_dump_pkl()] Output data to {0}.pkl...".format(fname))
    with open("MNIST_pkl/" + fname + ".pkl", "wb") as fp:
        pickle.dump(data, fp)
    _log("[_dump_pkl()] Done!")

위 코드의 _load_pkl() 함수는 MNIST_pkl 디렉터리에서 fname에 해당하는 파일에 저장되어 있던 객체를 불러오는 함수이고, _dump_pkl() 함수는 MNIST_pkl 디렉터리에 fname이라는 이름의 파일을 만들고 거기에 data를 저장하는 함수이다.


이번엔 _init_MNIST() 함수를 소개하겠다.

def _init_MNIST():
    s = time.time()
    if not os.path.isdir("MNIST_pkl/"):
        os.mkdir("MNIST_pkl/")

    _log("[_init_MNIST()] Downloading MNIST Dataset...")
    _download_dataset("train-images-idx3-ubyte.gz", "train_img.gz")
    _download_dataset("train-labels-idx1-ubyte.gz", "train_lbl.gz")
    _download_dataset("t10k-images-idx3-ubyte.gz", "test_img.gz")
    _download_dataset("t10k-labels-idx1-ubyte.gz", "test_lbl.gz")
    _log("[init_MNIST()] Dataset Downloaded.")

    _log("[_init_MNIST()] Loading Dataset...")
    train_imgs = _load_imgs("train_img.gz")
    train_lbls = _load_labels("train_lbl.gz")

    test_imgs = _load_imgs("test_img.gz")
    test_lbls = _load_labels("test_lbl.gz")
    _log("[_init_MNIST()] Dataset Loaded.")
    _log("[_init_MNIST()] Elapsed Time: {0}s".format(time.time() - s))

    return (train_imgs, train_lbls), (test_imgs, test_lbls)

_init_MNIST() 함수는 _download_dataset() 함수를 이용해 모든 MNIST Dataset 파일을 내려받는다.

이어서 _load_imgs(), _load_labels() 함수를 이용해 학습용 이미지(train_imgs)와 학습용 Label(train_lbls), 시험용 이미지(test_imgs)와 시험용 Label(test_lbls)을 .gz 파일로부터, 혹은 Pickle을 통해 np.array가 저장된 파일로부터 읽어온다.

그리고 time.time() 함수를 이용해 데이터 다운로드 및 로딩에 걸린 시간을 측정, 출력한다.

마지막으로는 학습 및 시험에 필요한 데이터가 저장된 train_imgs, train_lbls, test_imgs, test_lbls를 (train_imgs, train_lbls), (test_imgs, test_lbls) 형태로 반환한다. 이게 끝이다. 딱히 이해가 필요한 부분은 없을 것이다.


마지막으로 소개할 함수는 _one_hot_encoder() 함수와 load_mnist() 함수다.

def _one_hot_encoder(x):
    tmp = np.zeros((x.size, 10))

    for i in range(x.shape[0]):
        tmp[i][x[i]] = 1
    
    return tmp

def load_MNIST(normalize=True, flatten=True, one_hot_encoding=True):
    (train_imgs, train_lbls), (test_imgs, test_lbls) = _init_MNIST()

    if normalize:
        train_imgs = train_imgs / 255
        test_imgs = test_imgs / 255

    if not flatten:
        train_imgs = train_imgs.reshape(-1, 1, 28, 28)
        test_imgs = test_imgs.reshape(-1, 1, 28, 28)

    if one_hot_encoding:
        train_lbls = _one_hot_encoder(train_lbls)
        test_lbls = _one_hot_encoder(test_lbls)

    return (train_imgs, train_lbls), (test_imgs, test_lbls)

_init_MNIST() 함수의 Return을 그대로 언패킹해서 변수에 담고, 인자로 주어진 normalize가 True이면 학습용, 시험용 이미지 데이터를 0~1 사이의 수로 정규화한다.

인자 중 flatten이 True인 경우는 특별한 처리를 하지 않고, 만약 False가 넘어오면 이미지 데이터를 -1, 1, 28, 28 형상으로 만든다.

마지막 인자인 *one_hot_encoding이 True이면 _one_hot_encoder() 함수를 이용해 1차원 np.array이던 train_lbls, test_lbls에 10열짜리 one-hot label들이 들어있는 2차원 np.array를 재할당 한다.

*) one-hot encoding은 분류할 클래스의 수에 해당하는 길이의 배열을 만들고, 정답에 해당하는 인덱스는 1로, 나머지는 0으로 채우는 것을 말한다. 예를 들어 손글씨 숫자 인식 모델이고, img[0]에 대한 정답이 3이라면, [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]을 label로 사용하는 것이다.
**) MNIST Dataset의 레이블은 단일 정수로 구성되어 있기 때문에, 학습에 사용하려면 레이블을 one-hot encoding하는 것이 좋다. 만약 단일 정수를 그대로 레이블로 사용한다면 신경망의 예측 결과가 소수로 나올 것이다. 그런데, 그렇게 되면 그것을 반올림한 값을 정답으로 인정해야 할지, 반내림한 값을 정답으로 인정해야 할지에 관한 문제가 생긴다.

1-3. 활성화 함수

신경망 학습에는 활성화 함수가 필요하다. 그러므로, 이번에는 각종 활성화 함수들이 들어있는 모듈을 만들 것이다. 모듈의 이름은 Activation_Functions.py이다. 역시 common/ 디렉터리 하위에 만들고 작업을 시작하면 된다.

필요한 import 구문은 아래와 같다.

import numpy as np

Sigmoid와 Softmax 활성화 함수는 이전에도 설명을 했다. 그러나, 우리는 미니배치 학습을 사용할 것이기 때문에, softmax() 활성화 함수에 변경점이 약간 있다.

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def softmax(x):
    if x.ndim == 2:
        x = x.T - np.max(x, axis=0)
        return (np.exp(x) / np.sum(np.exp(x), axis=0)).T

    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    exp_x_sum = np.sum(exp_x - max_val)

    return exp_x / exp_x_sum

if문 블록을 제외한 나머지 부분은 이전과 같다. if문 블록은 미니배치 형태의 데이터가 들어왔을 때 수행하는 부분이다.

x에 np.array가 담겨있을 때, x.T를 하면 x의 *전치행렬이 반환된다. 즉 if문 내의 첫 번째 줄 코드인 x = x.T는 x를 전치행렬로 바꾸는 부분이다. 왜 x를 가지고 그대로 연산하지 않고 굳이 전치행렬로 바꿔서 사용하는 걸까?

*) 전치행렬: (i, j)번째 원소를 (j, i)에 배치한 행렬. aTa^Taa의 전치행렬이라 했을 때,

a=[abcdef]a=\left[\begin{matrix} a&b&c\\ d&e&f \end{matrix} \right]
aT=[adbecf]a^T=\left[\begin{matrix} a&d\\ b&e\\ c&f \end{matrix} \right]
(aT)T=[abcdef](a^T)^T=\left[\begin{matrix} a&b&c\\ d&e&f \end{matrix} \right]

활성화 함수 구현의 용이성에 있어 전치행렬을 사용하는 편이 훨씬 낫기 때문이다. 아래 인터프리터 입출력을 보자.

>>> import numpy as np
>>> a = np.array([[1, 2, 3], [4, 5, 6]])
>>> a
array([[1, 2, 3],
       [4, 5, 6]])
>>> np.max(a, axis=0)
array([4, 5, 6])

[[1, 2, 3], [4, 5, 6]]배열이 담긴 a 변수를 softmax() 함수에 넘겨 각 row에 대한 softmax 출력을 구해야 한다고 생각해보자. 그럼 a[0] - np.max(a[0]), a[1] - np.max(a[1])을 해줘야 하는데... 이 방법도 괜찮긴 하지만 코드 길이가 좀 더 길어진다.

그럼, 그냥 a - np.max(a, axis=0)을 하면 어떨까?

>>> np.max(a, axis=0)
array([4, 5, 6])
>>> a - np.max(a, axis=0)
array([[-3, -3, -3],
       [ 0,  0,  0]])

아이고마... np.max(a, axis=0)이 브로드캐스트 되어

[123456][456456]\left[\begin{matrix} 1&2&3\\ 4&5&6 \end{matrix} \right]-\left[\begin{matrix} 4&5&6\\ 4&5&6 \end{matrix} \right]

이 되어버린다. 애당초 a에서 [4, 5, 6]을 뺀다는 것 자체가 말이 안 되는 짓이긴 하다. a[0]에서는 a[0]의 요소중 최댓값인 3을 빼고, a[1]에서는 a[1]의 요소중 최댓값인 6을 빼는 게 올바른 계산법이기 때문이다. 잠깐, 그러면 np.max(a, axis=1)을 쓰면 안 되냐고? 된다!

>>> a - np.max(a, axis=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: operands could not be broadcast together with shapes (2,3) (2,)

근데 이렇게 하면 안 되고...

>>> a - np.max(a, axis=1).reshape(2, 1)
array([[-2, -1,  0],
       [-2, -1,  0]])

이렇게 해주면 된다. 근데 전치행렬을 쓰면, 아래처럼 흘러간다.

>>> a = a.T
>>> a
array([[1, 4],
       [2, 5],
       [3, 6]])
>>> np.max(a, axis=0)
array([3, 6])
>>> a - np.max(a, axis=0)
array([[-2, -2],
       [-1, -1],
       [ 0,  0]])
>>> np.exp(a)
array([[  2.71828183,  54.59815003],
       [  7.3890561 , 148.4131591 ],
       [ 20.08553692, 403.42879349]])
>>> np.sum(np.exp(a))
636.6329774790333
>>> y = np.exp(a) / np.sum(np.exp(a))
>>> y
array([[0.00426978, 0.08576079],
       [0.01160646, 0.23312201],
       [0.03154963, 0.63369132]])
>>> y.T
array([[0.00426978, 0.01160646, 0.03154963],
       [0.08576079, 0.23312201, 0.63369132]])

쨘! 아주 깔끔하게 계산이 된다.
1) a를 전치행렬로 바꾼다.
2) 그 후 np.max(a, axis=0)을 하면 각 열의 최댓값이 나오고,
3) 그걸 a에서 바로 뺼 수 있으니 a에서 np.max(a, axis=0)을 빼버리고
4) 그 결과를 가지고 np.exp(a) / np.sum(np.exp(a))를 수행하고
5) a.T를 출력하면 a=(aT)Ta=(a^T)^T이므로 올바른 Cross-Entropy Error가 담긴 배열이 나온다!

그러니까... Cross-Entropy를 구하는 방식은 크게 세 가지가 있다.

1번 방법) np.max(x, axis=1)을 reshape하여 x에 적용, np.sum(np.exp(x), axis=1)을 reshape하여 np.exp(x)를 나눠줌

if x.ndim == 2:
    x -= np.max(x, axis=1).reshape(x.shape[0], 1)
    return np.exp(x) / np.sum(np.exp(x), axis=1).reshape(x.shape[0], 1)

모든 np.max(), np.sum()에 axis=1을 마지막 인수로 주고 reshape(x.shape[0], 1)까지 해줘야 해서 상당히... 스파게티 코드 같은 느낌이 난다.

2번 방법) 전치행렬을 이용한 방법

if x.ndim == 2:
    x = x.T - np.max(x, axis=0)
    return (np.exp(x) / np.sum(np.exp(x), axis=0)).T

shape를 참조할 일도 없고, reshape를 사용할 일도 없다. 훨씬 깔끔하고, '아하! 미니배치 데이터가 들어올 경우를 대비한 거구나!' 하는 것을 바로 느낄 수 있다. 그래서 나는 개인적으로 2번 방법이 더 나은 것 같다. 물론 1번 방법이나 2번 방법이나 작동은 잘 되기 때문에 취향에 맞게 골라서 사용하면 되겠다.

1-4. 손실 함수

신경망을 최적화하기 위해서는 손실 함수가 필요하다. common/ 디렉터리 하위에 Loss_Functions.py 파일을 만들고 구현을 시작하자.

import 해야 하는 모듈은 아래와 같다.

import numpy as np

이후 평균제곱합 오차를(Mean Squared Error) 구하는 함수와 교차 엔트로피 오차(Cross-Entropy Error)를 구하는 함수를 구현하자. 먼저 함수의 인자로 던져줄 y와 t를 만들자.

def mean_squared_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)

    return np.sum((y - t) ** 2) / y.shape[0]

def cross_entropy_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)

    return -np.sum(t * np.log(y + 1e-7)) / y.shape[0]

mean_squared_error() 함수, cross_entropy_error() 함수는 예측값, 실측값을 순서대로 받아 오차를 구한다. 여기에서 아래 코드가 의미하는 바는 무엇일까?

if y.ndim == 1:
    t = t.reshape(1, t.size)
    y = y.reshape(1, y.size)

일단 아래 부분에 집중을 해 보자.

return -np.sum(t * np.log(y + 1e-7)) / y.shape[0]

이 부분 중에서도 아래 부분을 보자.

-np.sum(t * np.log(y + 1e-7))

여기까지는 괜찮다. y에 np.array([0.1, 0.4, 0.5])가 들어가고 t에는 np.array([0, 1, 0])이 들어가든, y에 np.array([[0.1, 0.4, 0.5]])가 들어가고, t에는 np.array([[0, 1, 0], [0, 0, 1]])이 들어가든, 작동은 된다.

하지만 Cross-Entropy의 평균을 구하기 위해 -np.sum(t * np.log(y + 1e-7))을 y.shape[0]으로 나누는 과정에서 문제가 생긴다. 1차원 배열이 입력으로 주어져버리면 y가 하나의 예측값을 나타내는 배열임에도 y.shape[0]이 y의 요소 개수가 되어 이상한 평균값을 내놓아 버린다.

다시 y에 np.array([0.1, 0.4, 0.5])가 들어가고 t에는 np.array([0, 1, 0])이 들어가는 경우를 생각해보자. y에는 하나의 예측값이, t에는 하나의 실측값이 들어갔다. 그렇기 때문에 -np.sum(t * np.log(y + 1e-7))를 1로 나눠줘야 하는데, y.shape[0]은 3이므로 3으로 나누게 된다. 이것은 올바른 결과가 아니다.

그렇기 때문에 y.ndim이 1인 경우, 즉 하나의 예측값과 실측값이 들어가는 경우에는 그것들을 reshape해서 2차원 배열로 만들어주어야 정상적인 결과가 나온다. 그래서 y.ndim 값에 따라 y, t를 reshape하는 코드가 들어간 것이다.

1-5. 편미분을 이용한 기울기 계산

가중치와 편향의 작은 변화에 손실 함수의 결과가 어떻게 변하는지 계산하기 위해서는 가중치와 편향의 각 요소에 대해 편미분을 수행하여 기울기 벡터를 얻어내야 한다. 필자는 함수와 np.array를 받아 np.array의 각 요소에 대한 함수의 편미분을 구한 후, input np.array와 동일한 shape의 기울기 벡터를 내놓는 calc_gradient() 함수를 구현해 사용할 것이다. common/Calc_Gradient.py 파일을 생성하고 아래 코드를 입력하자.

import numpy as np

def calc_gradient(f, x):
    h = 1e-4
    grad = np.zeros_like(x)

    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    
    while not it.finished:
        i = it.multi_index
        tmp = x[i]

        # f(x+h)
        x[i] = tmp + h
        y1 = f(x)

        # f(x-h)
        x[i] = tmp - h
        y2 = f(x)

        x[i] = tmp
        # (f(x+h) - f(x-h)) / 2h
        grad[i] = (y1 - y2) / (2 * h)

        it.iternext()

    return grad

수치 미분을 수행하는 함수다.

1) 기울기 벡터의 형상은 x의 형상과 같기 때문에 np.zeros_like(x)를 사용해 모든 요소가 0으로 채워져 있으며, x와 형상이 같은 np.array를 만들어 grad 변수에 저장한다.
2) x의 요소 중 하나(x[i])를 택해 tmp 변수에 저장한다.
3) x[i]를 tmp + h로 변경한 후, f(x)의 결과값을 y1에 저장한다. 이 값은 f(x + h)와 같다.
4) x[i]를 tmp - h로 변경한 후, f(x)의 결과값을 y2에 저장한다. 이 값은 f(x - h)와 같다.
5) x[i] = tmp를 수행하여 x[i]의 값을 복원한다.
6) 이후 (y1 - y2) / (2 * h) 연산하여 grad 변수에 저장한다.
7) 2~6 과정을 x의 모든 요소에 대해 반복한다.

제대로 된 미분값을 계산하려면 극한을 사용해야 하지만, 극한은 인간만이 계산할 수 있는 추상적인 개념이라고 언급했었다. 해석적 미분이라면 아래 두 식중 하나를 선택하여 미분을 수행해야 한다.

limh0f(x+h)f(x)h\lim_{h \to 0}{f(x+h)-f(x) \over h}
limh0f(x+h)f(xh)2h\lim_{h \to 0}{f(x+h)-f(x-h) \over 2h}

해석적 미분을 수행하면 위 식 둘 다 올바른 결과를 내놓지만, 수치 미분을 사용하면 첫 번째 식이 두 번째 식보다 해석적 미분값과의 오차가 심하다. 이전에 언급했듯 첫 번째 식은 전방 차분을 이용한 미분으로, 미분을 구하고자 하는 지점의 전방(양의 방향)과 미분을 구하고자 하는 지점의 함숫값을 잇는 기울기가 나오기 때문이다. 반면 두 번째 식은 중심(중앙) 차분을 이용한 방식으로 미분을 구하고자 하는 지점에서 +h+h만큼 떨어진 지점과 h-h만큼 떨어진 지점을 잇는 기울기가 결과로 나온다. 때문에 진정한 미분값에 조금 더 가깝다. 그래서 이번에 구현한 수치 미분 함수에서는 두 번째 식을 이용했다. 자세한 내용은 미분과 기울기 포스트를 참고하자.

1-6. 2층 신경망 구현

신경망을 하나의 클래스로 구현했다. 2층짜리 신경망이며, 이름은 MNIST_Net이다. 구현 위치는 2-Layer-NeuralNet(MNIST)/Two_Layer_NeuralNet.py이다.

import os, sys
import numpy as np
sys.path.append(os.pardir)
from common.Calc_Gradient import calc_gradient
from common.Loss_Functions import cross_entropy_error
from common.Activate_Functions import sigmoid, softmax

class MNIST_Net:
    def __init__(self, input_size:int, hidden_size:int, output_size:int, weight_init_std=0.01):
        self.model_infos = {}
        self.model_infos['training_time'] = None
        self.model_infos['loss_list'] = None
        self.model_infos['accuracy_list'] = None

        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(input_size, hidden_size)
        self.params['b1'] = np.zeros(hidden_size)
        self.params['W2'] = weight_init_std * np.random.randn(hidden_size, output_size)
        self.params['b2'] = np.zeros(output_size)

생성자에서 입력 데이터의 요소 수 input_size, 은닉층의 요소 수 hidden_size, 출력 데이터의 요소 수 output_size를 받아 신경망 구현에 필요한 np.array를 만들어 self.params에 저장한다. 여기서 weight_init_std는 초기 가중치의 표준편차를 조절하는 인자다. weight_init_std의 값이 클수록 표준편차가 커지고, 작을수록 표준편차가 작아진다. 이 값을 어떻게 세팅하냐에 따라 신경망 학습의 성패가 결정된다. 이 신경망에서의 Hyper Parameter중 하나라고 생각하면 된다.


    def predict(self, x):
        W1, b1 = self.params['W1'], self.params['b1']
        W2, b2 = self.params['W2'], self.params['b2']

        L1 = sigmoid(np.dot(x, W1) + b1)
        y = softmax(np.dot(L1, W2) + b2)

        return y

첫 번째로 구현한 메서드는 predict() 메서드다. 입력 데이터를 받아 self.params에 저장된 W1 가중치, b1 편향과 sigmoid 활성화 함수를 이용해 0층에서 1층으로 향하는 데이터를 처리하고, W2 가중치, b2 편향으로 1층에서 2층으로 향하는 데이터를 처리한 후 softmax 활성화 함수로 예측값을 만들어 낸다.


비용 함수를 계산하는 메서드와 정확도를 구하는 메서드도 만들었다.

    def loss(self, x, t):
        y = self.predict(x)
        return CEE(y, t)

    def accuracy(self, x, t):
        y = self.predict(x)

        y = np.argmax(y, axis=1)
        t = np.argmax(t, axis=1)

        return np.sum(y == t) / float(x.shape[0])

loss() 메서드의 첫 번째 인자(x)로는 날 것의 입력(784열의 데이터)이 그대로 들어간다. 두 번째 인자(t)로는 Label이 들어간다. 그러면 x를 self.predict() 메서드에 넘겨 예측값을 구하고, 구해진 예측값과 t에 저장된 실측값을 cross_entropy_error() 함수에 넘겨 오차를 구한다.

accuracy() 메서드 역시 첫 번째 인자(x)로는 날 것의 입력(784열의 데이터)을 받고, 두 번째 인자로는 Label을 받는다. loss() 메서드 때와 동일하게 첫 번째 인자로 주어진 x를 이용해 self.predict() 메서드로 예측값을 구한다. 이 뒤부터는 loss() 메서드와 조금 다른데, y = np.argmax(x), t = np.argmax(t)를 해준다. np.argmax() 함수는 입력으로 들어온 배열에서 가장 큰 요소가 들어있는 요소의 인덱스를 반환한다. 아래 인터프리터 실행 결과를 보자.

>>> import numpy as np
>>> a = np.array([0.2, 0.1, 0.7])
>>> b = np.array([0.15, 0.57, 0.28])
>>> np.argmax(a)
2
>>> np.argmax(b)
1

우린 출력층의 활성화 함수로 softmax() 함수를 이용했다. 따라서, 예측값 배열 요소들의 전체 합은 1이며, 배열의 각 요소에 담긴 값은 '입력이 각 요소의 인덱스에 해당하는 Class일 확률'으로 해석된다. 그렇기에 np.argmax(y)를 하면 입력에 대한 결과일 확률이 가장 높은 Class의 인덱스가 나온다.

우리는 입력이 0일 확률은 0번 인덱스에, 1일 확률은 1번 인덱스에... 9일 확률은 9번 인덱스에 저장되도록 코드를 작성했으므로, np.argmax(y)의 결과를 곧바로 '신경망이 예측한 숫자'로 해석할 수 있다. 그래서 y = np.argmax(y), t = np.argmax(t)를 하면 완전한 (정수 형태의)예측 결과와 실측값이 y, t에 각각 저장된다.

마지막 return문에서는 np.sum(y == t) / float(x.shape[0])를 수행하는데, np.sum(y == t)는 배열 전체에 대해 y[i] == t[i]인 경우의 수를 구한다. 아래 인터프리터 입출력 결과를 보면 이해가 쉬울 것이다.

>>> a = np.array([1, 9, 4, 2, 3])
>>> b = np.array([1, 0, 4, 3, 3])
>>> a == b
array([ True, False,  True, False,  True])
>>> np.sum(a == b)
3

a == b를 하면, a, b의 형상과 같은 형상의 배열이 결과로 나온다. 결과 배열의 각 요소에는 a[i] == b[i]인지 비교 연산을 한 결과 Bool 값이 들어 있다. Python에서 True는 1로 해석되고, False는 0으로 해석되므로, np.sum(a == b)를 하면 a[i] == b[i]인 경우가 몇 개나 되는지를 구하는 식이 된다. 마지막으로 그것을 x.shape[0]으로 나누면 정확도가 산출되는 것이다. 전혀 어렵지 않다.


마지막 메서드는 calc_gradient() 메서드다.

    def calc_gradient(self, x, t):
        loss_W = lambda W: self.loss(x, t)

        grads = {}
        grads['W1'] = calc_gradient(loss_W, self.params['W1'])
        grads['b1'] = calc_gradient(loss_W, self.params['b1'])
        grads['W2'] = calc_gradient(loss_W, self.params['W2'])
        grads['b2'] = calc_gradient(loss_W, self.params['b2'])

        return grads

loss_W는 lambda 문법으로 만든 함수의 객체가 들어있다. W를 인자로 받는데 이걸 함수 내부에서 사용하진 않는다. 즉, 인자 W는 더미(dummy)이다. 이 부분은 별로 설명할 게 없다. grads 변수에 빈 딕셔너리를 만들고, 거기에 각 가중치, 편향에 대한 calc_gradient()(common/Calc_Gradient.py에 들어있는 그 함수) 함수 실행 결과를 구해 저장한다.

이렇게 Two_Layer_NeuralNet 클래스를 완성했다!

1-6. 학습 구현

드디어 이론적으로만 알고 있던 신경망의 학습 알고리즘을 구현할 차례다! 2-Layer-NeuralNet/main.py 파일을 만들고 작업에 착수하자. 제일 먼저 할 일은 우리가 만든 MNIST Loader와 2층 네트워크를 import하는 것이다.

import time
import random
import pickle
import sys, os
sys.path.append(os.pardir)
from common.MNIST_Loader import load_MNIST
from Two_Layer_NeuralNet import Two_Layer_NeuralNet

이후 Network와 학습에 필요한 Hyper Parameter들을 초기화 한다.

net = Two_Layer_NeuralNet(784, 50, 10)
(x_train, t_train), (x_test, t_test) = load_MNIST(normalize=True, flatten=True, one_hot_encoding=True)

learning_rate = 0.01
batch_size = 100
epoch = 17

가중치, 편향 매개 변수를 한 번에 얼마씩 조정할지를 설정하는 learning_rate를 0.01로 하였다. 사실 학습 결과를 보면서 learning_rate를 조금씩 조정해야 하는데, 현재 우리가 구현한 신경망 학습은 학습에 상당한 시간이 걸리기 때문에 learning_rate를 학습 추이 보면서 조절할 그런 여유가 없다. 그래서 그냥 0.01로만 두고 테스트를 진행했다.

batch_size는 100으로 설정했다. 학습 데이터가 60000개이므로, 미니배치 학습을 600번 수행하면 1 Epoch이다.

이 Epoch를 17번 반복 수행하도록 코드를 작성했다.


loss_list = []
acc_list = []
elapsed_time_list = []

train_data_cnt = x_train.shape[0]
iter_for_epoch = train_data_cnt // batch_size

x = None
t = None

s_training = time.time()

학습 진행의 추이를 시각화하기 위한 변수들을 만들었다.

loss_list에는 배치 한 덩어리를 학습한 후 산출된 Loss Function의 값을 하나씩 append 해줄 것이다.

acc_list에는 배치 한 덩이 학습 후 산출된 모델의 정확도를 append한다.

마지막으로 elapsed_time_list에는 배치 한 덩어리의 학습에 걸린 시간을 하나씩 append할 것이다. elapsed_time_list는 얼마나 기다려야 학습이 다 될지, 예상소요시간을 구하기 위해 만들어준 리스트이다. 어느 시점까지 1 배치 학습에 걸린 소요 시간의 평균을 산출하여 남은 데이터들을 학습하는 데에는 시간이 얼마나 걸릴지 계산하는 것이다.

모델의 학습이 제대로 이루어졌다면 loss_list의 원소 값은 뒤로 갈 수록 줄어드는 추세일 것이고, acc_list의 원소 값은 증가하는 추세일 것이다.

train_data_cnt는... 일단은 학습용 데이터가 6만개라는 것을 알고는 있지만, 데이터의 개수가 언젠가는 변할 수도 있기 때문에 얻어온 MNIST Dataset의 학습용 데이터 개수를 직접 구해 저장하는 변수다. 이렇게 처리해주면 나중에 언젠가 MNIST Dataset의 훈련용 데이터가 7만 개가 되든, 10만 개가 되든 코드 수정 없이 학습 수행이 가능할 것이다.

iter_for_epoch에는 1 Epoch가 되기 위해서는 배치 학습을 몇 번 해야 하는지를 계산해 대입했다.

x, t에는 각각 훈련용 이미지, 훈련용 이미지에 대한 레이블이 저장될 것이다. 자세한 건 학습이 진행되는 for문을 보면서 설명하겠다.

s_training은 학습이 시작되고 얼마만에 학습이 완료되었는지를 구하기 위해 학습을 수행하는 for문 진입 직전의 타임스탬프를 찍어 저장해두는 변수다.


for i in range(epoch):
    for k in range(iter_for_epoch):
        s_batch = time.time()
        batch_mask = random.randint(0, train_data_cnt - batch_size)

        x = x_train[batch_mask:batch_mask + batch_size]
        t = t_train[batch_mask:batch_mask + batch_size]

        grad = net.calc_gradient(x, t)
        net.params['W1'] -= (learning_rate * grad['W1'])
        net.params['b1'] -= (learning_rate * grad['b1'])
        net.params['W2'] -= (learning_rate * grad['W2'])
        net.params['b2'] -= (learning_rate * grad['b2'])

        loss = net.loss(x, t)
        loss_list.append(loss)

        accuracy = net.accuracy(x, t)
        acc_list.append(accuracy)

        elapsed_time = time.time() - s_batch
        elapsed_time_list.append(elapsed_time)

        ETA_second = (sum(elapsed_time_list) / len(elapsed_time_list)) * (epoch - i) * (iter_for_epoch - k)

        print("__________Epoch {}/{}__________".format(i + 1, epoch))
        print("***** Batch: {}/{} *****".format(k + 1, iter_for_epoch))
        print("Loss/Accuracy: {}/{}%".format(loss, accuracy * 100))
        print("Elapsed Time(1 Batch): {}s".format(int(elapsed_time)))
        if ETA_second < 60:
            print("ETA: {}m".format((ETA_second / 60) % 60))
        else:
            print("ETA: {}h {}m".format(ETA_second // 3600, (ETA_second / 60) % 60))

여기가 Business Logic이라고 할 수 있다! 본격적인 학습은 여기서 이뤄진다.

바깥쪽 for문의 range(epoch)는 몇 번의 Epoch를 수행할지 정하는 부분이고, 안쪽 for문의 range(iter_for_epoch)는 1 Epoch가 되려면 배치 학습을 몇 번 수행해야 하는지를 정하는 부분이다.

먼저, 한 배치 학습 수행에 시간이 얼마나 걸렸는지 측정하기 위해 s_batch = time.time()으로 타임스탬프를 찍는다.

이후 랜덤한 개수의 데이터를 데이터셋에서 뽑아내기 위해 batch_mask에 random.randint(0, train_data_cnt - batch_size)의 수행 결과를 넣는다. 이렇게 하면 0~59900 사이의 숫자 하나가 발생된다.

이렇게 발생된 batch_mask를 이용해 batch_mask부터 batch_mask + 100까지, 슬라이싱을 해서 x, t에 각각 학습용 이미지와 레이블을 저장해준다.

grad에는 각 가중치(혹은 편향) 변수의 미소한 변화에 의한 손실 함수 값의 변화, 즉 기울기를 계산해서 저장한다. net.calc_gradient(x, t)를 사용한다.

그리고 이전에 보았던 수식

x=xηfxx=x-\eta{\partial f \over \partial x}

를 그대로 구현해준다.

여기까지만 해도 학습의 구현은 끝난 것이다! 이후 코드는 손실값, 정확도의 변화를 시각화 하기 위해 저장하거나, 학습 예상소요시간을 구하기 위한 연산을 하는 부분이다.


net.model_infos['training_time'] = "{}m".format((time.time() - s_training) / 60)
net.model_infos['loss_list'] = loss_list
net.model_infos['accuracy_list'] = acc_list

if not os.path.isdir("model"):
    os.mkdir("model")

with open("model/({})MNIST_model.pkl".format(time.strftime("%y-%m-%d")), "wb") as fp:
    pickle.dump(net, fp)

for문을 빠져나가서는 학습에 걸린 시간, 손실 값 리스트, 정확도 값 리스트를 네트워크의 model_infos에 잘 저장한 후 Pickle을 이용해 파일로 저장만 해준다.

0개의 댓글