삼겹살 소비량 1위 국가인 대한민국, 축구만큼이나 뜨겁다.
인공지능으로 소고기를 판정하는 기사가 있었고, 왜 돼지는 아직 모호한 기준이나 판별이 없을까 생각을 하고 프로젝트 주제를 선정했었다.
Ai HUB 를 서치하던 도중에 축산물 품질 이미지가 있었고, 삼겹살의 단면적 이미지 데이터가 있었다 5GB가량 데이터는 만장 정도가 있었다.
1+ 등급, 1등급, 2등급 3개 등급으로 나누어져있고, 데이터 규모는 Train set 8천장 Test set 1천여장이 있었다. 등급별로 폴더가 나누어져있어 따로 라벨링을 할 필요는 없었다.
1+ 3,000 | 390 1 3,000 | 390 2 2,500 | 320
이미지 전처리 과정은 Resize, Padding, GrayScale, Augmentation, Zero-Centering 을 거치며 처리하였다
ResNet 50, ResNet 101, ResNet 152, efficientNetV2S(L,M) 과 같은 모델을 사용하였고, 전이학습을 통해 전부 재학습과, 미세조정을 통한 성능개선을 진행하였다.
ResNet 은 사람보다 잘 인식하는 '최소'의 기준이 되는 모델이라고 생각하여서 성능개선을 통해서 뎁스가 높은 모델로 변경하면서 모델링을 하였다.
EfficientNet은 V2모델로 2021년 비교적 최신 도입 모델로 논문자체도 21년 기준으로 나와있다. 그래서 최신 모델을 사용해보자는 취지에서 사용하게 되었다. 논문에도 나와 있듯이 여러 모델중에 성능이 가장 좋은걸로 나와있다.
효율적이라는 이름값답게 과연 효율적인 모습을 보여줄지 궁금했다.
data 준비는 생략하겠습니다. 각자의 데이터가 다를 뿐만아니라 라벨링 된 데이터 셋이라던지 환경이 다르기 때문에
label2index = {'seg0' : 0, 'seg1' : 1 , 'seg2' : 2}
labels = list(label2index.keys())
labels
폴더별로 나누어놓은 데이터를 라벨링한 작업이다.
for label, filenames in dataset.items():
for filename in filenames:
img = cv2.imread(filename)
percent = 1
if(img.shape[1] > img.shape[0]):
percent = 128/img.shape[1]
else:
percent = 128/img.shape[0]
img = cv2.resize(img, dsize=(0, 0), fx=percent, fy=percent, interpolation=cv2.INTER_LINEAR)
y,x,h,w = (0,0,img.shape[0], img.shape[1])
w_x = (128-(w-x))/2
h_y = (128-(h-y))/2
if(w_x < 0):
w_x = 0
elif(h_y < 0):
h_y = 0
M = np.float32([[1,0,w_x], [0,1,h_y]])
img_re = cv2.warpAffine(img, M, (128, 128))
cv2.imwrite('/content/resized/{0}/{1}'.format(label, filename.split("/")[-1]) , img_re)
128 x 128 픽셀로 리사이즈 해주는 작업이다.
datagen = ImageDataGenerator(
rotation_range=40,
width_shift_range=0.2,
height_shift_range=0.2,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
vertical_flip=True,
)
# 회전각도(degree), 너비/높이 전환비율, 사다리꼴, 줌, 뒤집기
# cutmix, mixup 등은 다른 함수(generator)를 사용해야 함.
# 파일명은 기존에 없는 이름으로 부여
부족한 데이터를 원하는 부분, 방식으로 증강 시킨다.
path = '/content/gs'
# 서브 디렉토리 목록 출력
for root, subdirs, files in os.walk(path):
for d in subdirs:
fullpath = root + '/' + d
print(fullpath)
# 서브 디렉토리별 파일 개수 출력
for root, subdirs, files in os.walk(path):
if len(files) > 0:
print(root, len(files))
이미지 색깔을 반전을 하여 검정, 흰색으로만 표현한다.
folder_path = '/content/dataset/train'
dataset = {}
# 이미지와 라벨 리스트에 담기
for label in os.listdir(folder_path):
sub_path = folder_path+'/'+label+'/'
dataset[label] = []
for filename in os.listdir(sub_path):
dataset[label].append(sub_path+filename)
dataset
x_train, y_train = [], []
for label, filenames in dataset.items():
for filename in filenames:
image = cv2.imread(filename) # img를 array 형태로 변경
x_train.append(image)
y_train.append(label2index[label]) # label을 index로 변경
x_train, y_train = np.array(x_train), np.array(y_train)
x_train = x_train.astype('int8')
# float32 : 메모리 용량이 너무 커짐(실수형)
x_train.shape, y_train.shape
def zero_mean(image):
# zero-centering
return np.mean(image, axis=0)
zero_mean_img = zero_mean(x_train)
zero_mean_img.shape #shape 확인
x_train = x_train - zero_mean_img
양수, 음수 같이나오게 해줌으로써 효율적으로 전처리 시켜준다
class Mish(Activation):
def __init__(self, activation, **kwargs):
super(Mish, self).__init__(activation, **kwargs)
self.__name__ = 'Mish'
def mish(x):
return x * K.tanh(K.softplus(x))
get_custom_objects().update({'mish': Mish(mish)})
선언을 해준후에 분류기(활성화 함수 Mish)를 정의해줄 예정이다.
def create_Res101():
base_model = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_model.trainable = False
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
x = tf.keras.layers.Dense(256, activation='leakyrelu')(x)
x = tf.keras.layers.Dropout(0.5)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
return tf.keras.Model(inputs, outputs)
코랩을 쓰다보면 대용량 데이터에서 RAM , GPU 부족 현상으로 터지는 현상이 자주 일어난다. 런타임을 유지하기 위해 병렬연산이 가능한 TPU 를 사용해주었다. Tpu 사용법은 구글 검색하면 친절하게 나와있다.
base_50 = ResNet50(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_101 = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_152 = ResNet152(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_v2s = efficientnet_v2.EfficientNetV2S(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
# 라벨 설정
class_names = labels
class_names
early = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5)
!mkdir ckpt
# 체크포인트 파일 경로 설정
checkpoint_path = "/content/ckpt/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path) # 현재 경로 폴더 반환
checkpoint_dir
cp_callback = tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_path, monitor='val_accuracy',
save_best_only=True, save_weights_only=True)
reduceLr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy', factor=0.2,
patience=3, min_lr=0)
def scheduler(epoch, lr):
if epoch < 2:
return lr
else:
return lr * 0.1
LrSdu = tf.keras.callbacks.LearningRateScheduler(scheduler, verbose=0)
base_101 = ResNet101(include_top=True, input_shape = (224, 224 ,3), weights = 'imagenet')
base_101.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')
base_152 = ResNet152(include_top=True, input_shape = (224, 224 ,3), weights = None)
base_152.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')
#x_test = tf.keras.layers.experimental.preprocessing.Resizing(128,128)(x_test)
#x_test = tf.keras.applications.resnet50.preprocess_input(x_test)
resize = lambda x: tf. image.resize(x, (128, 128))
x_test = resize(x_test)
x_test.shape
base_101.evaluate(x_test, y_test)
base_152.evaluate(x_test, y_test)
# sparse categorical crossentropy VS categorical crossentropy + one_hot
y_train2 = tf.keras.utils.to_categorical(y_train, 3)
y_test2 = tf.keras.utils.to_categorical(y_test, 3)
y_train2.shape, y_test2.shape
def create_Res101_all():
base_model = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_model.trainable = True
#for layer in base_model.layers[:]:
#print(layer.name, layer.trainable)
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dropout(0.2)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
return tf.keras.Model(inputs, outputs)
with strategy.scope():
Res101all = create_Res101_all()
Res101all.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
Res101all.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64
, callbacks=[early])
def create_Res152_all():
base_model = ResNet152(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_model.trainable = True
#for layer in base_model.layers[:]:
#print(layer.name, layer.trainable)
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
x = tf.keras.layers.Dense(256, activation='relu')(x)
x = tf.keras.layers.Dropout(0.5)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
return tf.keras.Model(inputs, outputs)
with strategy.scope():
Res152all = create_Res152_all()
Res152all.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
Res152all.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64,
callbacks=[early])
def create_V2S_all():
base_model = efficientnet_v2.EfficientNetV2S(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_model.trainable = True
#for layer in base_model.layers[:]:
#print(layer.name, layer.trainable)
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
x = tf.keras.layers.Dense(256, activation='relu')(x)
x = tf.keras.layers.Dropout(0.5)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
return tf.keras.Model(inputs, outputs)
with strategy.scope():
V2Sall = create_V2S_all()
V2Sall.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
V2Sall.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64
, callbacks=[early])
base_model.trainable = True
for layer in base_model.layers[:]:
print(layer.name, layer.trainable)
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=model.output_shape[1:])(x)
x = tf.keras.layers.Dense(256, activation='mish')(x)
x= tf.keras.layers.Dropout(0.2)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
model = tf.keras.Model(inputs, outputs)
model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001),
loss = 'sparse_categorical_crossentropy',
metrics=['accuracy'])
model.summary()
model.fit(x_train, y_train, epochs = 20, validation_data=(x_test, y_test), batch_size=64)
def create_Res50_fine():
base_model = ResNet50(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_model.trainable = True
for layer in base_model.layers[:-100]:
layer.trainable = False
inputs = tf.keras.Input(shape=(128, 128, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
#x = tf.keras.layers.Dropout(0.7)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
return tf.keras.Model(inputs, outputs)
with strategy.scope():
Res50fine = create_Res50_fine()
Res50fine.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')
Res50fine.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=32
, callbacks=[early])
학습을 파라미터를 다양하게 주어 학습을 시켰으나 성과가 좋은 코드만 뽑아왔다.
우리데이터 자체는 파인튜닝, 전체 재학습이 성능은 제일 좋게 나왔다.
앙상블은 모델끼리 평균을 내서 모델 최고성능을 내기 위해 앙상블을 해준다. 데이터별, 어큐러시 별 다양하게 시도를 해봐서 성능개선을 시킬 수 있다.
from sklearn.metrics import accuracy_score
pred_v1 = Res101clf.predict(x_test)
pred_v2 = Res152clf.predict(x_test)
pred_ensemble = np.mean([pred_v1, pred_v2], axis=0)
pred_ensemble
pred_ensemble2 = np.argmax(pred_ensemble, axis=1)
accuracy_score(pred_ensemble2, y_test)
# 단일 모델과 비교
accuracy_score(np.argmax(pred_v1, axis=1), y_test)
accuracy_score(np.argmax(pred_v2, axis=1), y_test)
아쉽게도 우리 모델은 앙상블을 적용하니 조금 성능이 낮아졌다. 모델 분류 다양하게 시도를 해보면 성능이 올라가지 않을까 생각한다. 추 후 적용해볼 예정이다.
# Load the TensorBoard notebook extension
%load_ext tensorboard
import datetime
model_name = 'Res_clf'
current_time = datetime.datetime.now().strftime("%Y%m%d") # Y : 연도 / m : 월 / d : 일 / H : 시 / M : 분 / S : 초
current_time
Res101clf = create_Res101_clf()
Res101clf.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),metrics = ['accuracy'])
Res101clf_run_eagerly = True
log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1) # epoch마다 히스토그램 계산
tf.compat.v1.disable_eager_execution()
x_train.shape, y_train.shape
Res101clf.fit(x_train, y_train, epochs = 5,
validation_data=(x_test, y_test),
callbacks=[tb_callback])
Res152clf = create_Res152_clf()
Res152clf.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')
log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
Res152clf.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test),
batch_size=64, callbacks=[tb_callback])
%tensorboard --logdir logs/fit
model_name = 'version.2'
current_time = datetime.datetime.now().strftime("%Y%m%d")
log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir,
histogram_freq=1) # epoch마다 히스토그램 계산
model.fit(x=x_train,
y=y_train,
epochs=5,
validation_data=(x_test, y_test),
callbacks=[tb_callback])
%tensorboard --logdir logs/fit
len(x_train)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
train_dataset = train_dataset.shuffle(len(x_train)).batch(64)
test_dataset = test_dataset.batch(64)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()
# Define our metrics
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('train_accuracy')
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('test_accuracy')
def train_step(model, optimizer, x_train, y_train):
with tf.GradientTape() as tape:
predictions = model(x_train, training=True)
loss = loss_object(y_train, predictions)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
train_loss(loss)
train_accuracy(y_train, predictions)
def test_step(model, x_test, y_test):
predictions = model(x_test)
loss = loss_object(y_test, predictions)
test_loss(loss)
test_accuracy(y_test, predictions)
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)
model = create_model() # reset our model
EPOCHS = 5
for epoch in range(EPOCHS):
for (x_train, y_train) in train_dataset:
train_step(model, optimizer, x_train, y_train)
with train_summary_writer.as_default():
tf.summary.scalar('loss', train_loss.result(), step=epoch)
tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)
for (x_test, y_test) in test_dataset:
test_step(model, x_test, y_test)
with test_summary_writer.as_default():
tf.summary.scalar('loss', test_loss.result(), step=epoch)
tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)
template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
print (template.format(epoch+1,
train_loss.result(),
train_accuracy.result()*100,
test_loss.result(),
test_accuracy.result()*100))
# Reset metrics every epoch
train_loss.reset_states()
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
1.모델 그대로 사용하는 것보다 전이학습을 하는것이 더 높은 score를 얻음.
2.Loss, accuracy 그래프 개형으로 보아 Overfit인가?
-> score가 계속 올라가므로 바로 판단하기는 아직 이르다고 판단.
3. 한정된 시간 내에 목표점수 달성
그런데 왜 60점대에 그쳤는가…? 원인을 알면, 더 향상시킬 수 있다!
근본적인 문제 : 한정된 시간 및 자원
짧은 프로젝트 진행 시간, 용량이 큰 데이터를 다룸.
문제 : 코랩 램 오버로 인한 세션 종료가 매우 빈번하게 발생
-> Jupyter Notebook, Colab Pro+ 등의 더 나은 작업환경 구축
or 기본 이미지 용량 조절 등의 방법 시도 필요
모델링 관련 문제 : 학습이 너무 빨리 이루어진다.(Overfit이 아닐까 의심하지만, 판단은 아직 유보)
-> 전체 데이터를 증강해서 시행했더니, 학습속도는 그대로인데 오히려 score 하락.
-> Dropout, Dense(분류층) node 개수, batch size 등의 parameter를 조절하여 일부 해결
-> 앙상블 기법을 적용하는 방법은 현재 시도하고 있음.
-> ResNet, EfficientNet 이외의 더 다양한 모델도 추후에 적용할 것임.
현재 수준(최대 60%)으로는 곧바로 적용하기는 무리이나, 짧은 시간내에 성능을 크게 끌어올린 것으로 전망하여 근시일 내에 성능을 실제 데이터로 시험 적용해도 좋을 만큼 끌어올릴 수 있을 것으로 전망함.
Well Done.
In spite of our tasks proceeded only 1 week, we made sth as write to here.