import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
np.random.seed(7777)
tf.random.set_seed(7777)
class Cifar10DataLoader():
    def __init__(self):
        # data load
        (self.train_x, self.train_y), \
            (self.test_x, self.test_y) = tf.keras.datasets.cifar10.load_data()
        self.input_shape = self.train_x.shape[1:]

    def scale(self, x):
        return (x / 255.0).astype(np.float32)

    def preprocess_dataset(self, dataset):
        (feature, target) = dataset

        # scaling #
        scaled_x = np.array([self.scale(x) for x in feature])

        # label encoding #
        ohe_y = np.array([tf.keras.utils.to_categorical(
            y, num_classes=10) for y in target])

        return scaled_x, ohe_y.squeeze(1)

    def get_train_dataset(self):
        return self.preprocess_dataset((self.train_x, self.train_y))

    def get_test_dataset(self):
        return self.preprocess_dataset((self.test_x, self.test_y))
cifar10_loader = Cifar10DataLoader()
train_x, train_y = cifar10_loader.get_train_dataset()
print(train_x.shape, train_x.dtype)
print(train_y.shape, train_y.dtype)
test_x, test_y = cifar10_loader.get_test_dataset()
print(test_x.shape, test_x.dtype)
print(test_y.shape, test_y.dtype)
from tensorflow.keras.layers import Input, Conv2D, MaxPool2D, Flatten, Dense, Add
def build_resnet(input_shape):
    inputs = Input(input_shape)

    net = Conv2D(32, kernel_size=3, strides=2,
                 padding='same', activation='relu')(inputs)
    net = MaxPool2D()(net)

    # First residual block: 1x1 -> 3x3 -> 1x1 bottleneck, with a 1x1 projection
    # on the shortcut so the channel counts match before the Add.
    net1 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net)
    net2 = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net1)
    net3 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net2)

    net1_1 = Conv2D(64, kernel_size=1, padding='same')(net)
    net = Add()([net1_1, net3])

    # Second residual block: identity shortcut, since the channel counts already match.
    net1 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net)
    net2 = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net1)
    net3 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net2)

    net = Add()([net, net3])

    net = MaxPool2D()(net)
    net = Flatten()(net)
    net = Dense(10, activation="softmax")(net)

    model = tf.keras.Model(inputs=inputs, outputs=net, name='resnet')
    return model
model = build_resnet((32, 32, 3))
model.summary()
lr = 0.03
opt = tf.keras.optimizers.Adagrad(lr)
loss = tf.keras.losses.categorical_crossentropy
model.compile(optimizer=opt, loss=loss, metrics=['accuracy'])
def custom_loss(y_true, y_pred):
    # Argument order is (ground truth, prediction).
    return tf.reduce_mean(tf.square(y_true - y_pred))

# Note: a list of losses with loss_weights is normally matched one-to-one with a model's
# outputs; for a single-output model like this one, combining the two terms inside a single
# custom loss function is the more portable option.
model.compile(optimizer=opt, loss=[loss, custom_loss],
              metrics=['accuracy'], loss_weights=[0.9, 0.1])
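As a quick sanity check, the custom loss can be called directly on small made-up tensors (the values below are arbitrary, chosen only to illustrate the (y_true, y_pred) ordering):

y_true_example = tf.constant([[0.0, 1.0], [1.0, 0.0]])   # made-up one-hot labels
y_pred_example = tf.constant([[0.1, 0.9], [0.6, 0.4]])   # made-up predictions
print(custom_loss(y_true_example, y_pred_example))        # mean squared error, here ≈ 0.085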
acc = tf.keras.metrics.CategoricalAccuracy()  # suited to one-hot labels with softmax outputs
auc = tf.keras.metrics.AUC()

# Only one loss is used here, so loss_weights is not needed.
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=[acc, auc])
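Keras metric objects are stateful: update_state() accumulates batches, result() reads the current value, and reset_states() clears it. A small sketch with made-up values:

m = tf.keras.metrics.CategoricalAccuracy()
m.update_state([[0, 1], [1, 0]], [[0.2, 0.8], [0.7, 0.3]])  # both samples correct -> 1.0
m.update_state([[0, 1]], [[0.9, 0.1]])                      # one wrong -> running value 2/3
print(m.result().numpy())
m.reset_states()  # clear the accumulated state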
Arguments to fit()

x = None
y = None
batch_size = None
epochs = 1
verbose = 'auto' : how much of the training progress to print
callbacks = None : callback functions
validation_split = 0.0 : use a fraction of the input data as validation data
validation_data = None : an explicit validation dataset
shuffle = True : shuffle the inputs every epoch
class_weight = None : assign a different importance to each class ....
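As a small illustration of a few of these arguments, here is a hedged sketch that reuses the arrays prepared above; the class_weight values are arbitrary placeholders:

example_hist = model.fit(
    train_x, train_y,
    batch_size=64,
    epochs=1,
    verbose=1,
    validation_data=(test_x, test_y),          # explicit validation data instead of validation_split
    shuffle=True,
    class_weight={i: 1.0 for i in range(10)},  # placeholder: equal weight for all 10 classes
)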
Using callbacks

Callbacks let you run your own code at specific points while fit() is training.
Example: as training progresses, gradually reduce the learning rate so the model's weights can be adjusted more finely.
def scheduler(epoch, lr):
    # Keep the initial learning rate for the first 10 epochs, then decay it.
    if epoch > 10:
        return lr * (0.9 ** (epoch - 10))
    else:
        return lr

lr_scheduler = tf.keras.callbacks.LearningRateScheduler(scheduler, verbose=1)
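LearningRateScheduler is just one of the built-in callbacks; EarlyStopping and ModelCheckpoint are two others that are often passed alongside it. A minimal sketch (the checkpoint filename is only an example):

early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=3, restore_best_weights=True)  # stop when val_loss stalls

checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_model.keras', monitor='val_loss', save_best_only=True)  # keep only the best model

# These could be passed together, e.g. callbacks=[lr_scheduler, early_stop, checkpoint]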
# fit
hist = model.fit(
    train_x, train_y, epochs=1,
    validation_split=0.3,
    batch_size=64, verbose=1,
    callbacks=[lr_scheduler]
)
lr = 0.03
batch_size = 64
opt = tf.keras.optimizers.Adam(lr)
loss_fn = tf.keras.losses.categorical_crossentropy
train_loss = tf.keras.metrics.Mean()
train_acc = tf.keras.metrics.CategoricalAccuracy()
def train_step(x, y):
    with tf.GradientTape() as tape:
        pred = model(x)
        loss = loss_fn(y, pred)
    gradients = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)   # stateful metric: each call folds the batch loss into the running mean
    train_acc(y, pred)
for epoch in range(1):
    for i in range(train_x.shape[0] // batch_size):
        idx = i * batch_size
        x, y = train_x[idx:idx + batch_size], train_y[idx:idx + batch_size]
        train_step(x, y)
        print('{} / {}'.format(i, train_x.shape[0] // batch_size), end='\r')

    fmt = 'epoch {} loss: {} acc: {}'
    print(fmt.format(
        epoch + 1, train_loss.result(),
        train_acc.result()
    ))
    train_loss.reset_states()  # clear the accumulated state before the next epoch
    train_acc.reset_states()
# @tf.function traces the decorated function into a TensorFlow graph the first time it is
# called; decorating the computation-heavy step usually makes training noticeably faster.
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        pred = model(x)
        loss = loss_fn(y, pred)
    gradients = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)   # stateful metric: each call folds the batch loss into the running mean
    train_acc(y, pred)
for epoch in range(1):
    for i in range(train_x.shape[0] // batch_size):
        idx = i * batch_size
        x, y = train_x[idx:idx + batch_size], train_y[idx:idx + batch_size]
        train_step(x, y)
        print('{} / {}'.format(i, train_x.shape[0] // batch_size), end='\r')

    fmt = 'epoch {} loss: {} acc: {}'
    print(fmt.format(
        epoch + 1, train_loss.result(),
        train_acc.result()
    ))
    train_loss.reset_states()  # clear the accumulated state before the next epoch
    train_acc.reset_states()
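To get a feel for the speed-up from @tf.function, one rough approach is to time a batch through an eager copy of the step and through the traced version (a sketch; numbers vary by hardware, and the first traced call includes tracing overhead):

import time

def eager_step(x, y):
    # same logic as train_step, but without @tf.function, so it runs eagerly
    with tf.GradientTape() as tape:
        pred = model(x)
        loss = loss_fn(y, pred)
    gradients = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(gradients, model.trainable_variables))

sample_x, sample_y = train_x[:batch_size], train_y[:batch_size]
train_step(sample_x, sample_y)  # first call triggers tracing; exclude it from the timing

start = time.perf_counter()
for _ in range(50):
    eager_step(sample_x, sample_y)
print('eager:', time.perf_counter() - start)

start = time.perf_counter()
for _ in range(50):
    train_step(sample_x, sample_y)
print('graph:', time.perf_counter() - start)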
This is hard...
💻 Source: 제로베이스 데이터 취업 스쿨