딥러닝을 이용한 돼지고기 등급 분류

미남홀란드·2022년 12월 6일
3

프로젝트

목록 보기
1/1

배경주제

삼겹살 소비량 1위 국가인 대한민국, 축구만큼이나 뜨겁다.
인공지능으로 소고기를 판정하는 기사가 있었고, 왜 돼지는 아직 모호한 기준이나 판별이 없을까 생각을 하고 프로젝트 주제를 선정했었다.

데이터 설명

Ai HUB 를 서치하던 도중에 축산물 품질 이미지가 있었고, 삼겹살의 단면적 이미지 데이터가 있었다 5GB가량 데이터는 만장 정도가 있었다.

1+ 등급, 1등급, 2등급 3개 등급으로 나누어져있고, 데이터 규모는 Train set 8천장 Test set 1천여장이 있었다. 등급별로 폴더가 나누어져있어 따로 라벨링을 할 필요는 없었다.

1+ 3,000 | 390 1 3,000 | 390 2 2,500 | 320

전처리과정

이미지 전처리 과정은 Resize, Padding, GrayScale, Augmentation, Zero-Centering 을 거치며 처리하였다

분석방법

ResNet 50, ResNet 101, ResNet 152, efficientNetV2S(L,M) 과 같은 모델을 사용하였고, 전이학습을 통해 전부 재학습과, 미세조정을 통한 성능개선을 진행하였다.

  1. ResNet 은 사람보다 잘 인식하는 '최소'의 기준이 되는 모델이라고 생각하여서 성능개선을 통해서 뎁스가 높은 모델로 변경하면서 모델링을 하였다.

  2. EfficientNet은 V2모델로 2021년 비교적 최신 도입 모델로 논문자체도 21년 기준으로 나와있다. 그래서 최신 모델을 사용해보자는 취지에서 사용하게 되었다. 논문에도 나와 있듯이 여러 모델중에 성능이 가장 좋은걸로 나와있다.
    효율적이라는 이름값답게 과연 효율적인 모습을 보여줄지 궁금했다.

모델구현

data 준비는 생략하겠습니다. 각자의 데이터가 다를 뿐만아니라 라벨링 된 데이터 셋이라던지 환경이 다르기 때문에

이미지 전처리

label2index = {'seg0' : 0, 'seg1' : 1 , 'seg2' : 2}
labels = list(label2index.keys())
labels

폴더별로 나누어놓은 데이터를 라벨링한 작업이다.

리사이즈

for label, filenames in dataset.items():
    for filename in filenames:
        img = cv2.imread(filename)

        percent = 1
        if(img.shape[1] > img.shape[0]):
            percent = 128/img.shape[1]
        else:
            percent = 128/img.shape[0]

        img = cv2.resize(img, dsize=(0, 0), fx=percent, fy=percent, interpolation=cv2.INTER_LINEAR)

        y,x,h,w = (0,0,img.shape[0], img.shape[1])

        w_x = (128-(w-x))/2
        h_y = (128-(h-y))/2

        if(w_x < 0):
            w_x = 0
        elif(h_y < 0):
            h_y = 0

        M = np.float32([[1,0,w_x], [0,1,h_y]])
        img_re = cv2.warpAffine(img, M, (128, 128)) 
       
        cv2.imwrite('/content/resized/{0}/{1}'.format(label, filename.split("/")[-1]) , img_re)

128 x 128 픽셀로 리사이즈 해주는 작업이다.

어그먼테이션

datagen = ImageDataGenerator(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
)
# 회전각도(degree), 너비/높이 전환비율, 사다리꼴, 줌, 뒤집기
# cutmix, mixup 등은 다른 함수(generator)를 사용해야 함.
# 파일명은 기존에 없는 이름으로 부여

부족한 데이터를 원하는 부분, 방식으로 증강 시킨다.

그레이스케일

path = '/content/gs'

# 서브 디렉토리 목록 출력
for root, subdirs, files in os.walk(path):
    for d in subdirs:
        fullpath = root + '/' + d
        print(fullpath)

# 서브 디렉토리별 파일 개수 출력
for root, subdirs, files in os.walk(path):
    if len(files) > 0:
        print(root, len(files))

이미지 색깔을 반전을 하여 검정, 흰색으로만 표현한다.

트레인셋 저장

folder_path = '/content/dataset/train'
dataset = {}

# 이미지와 라벨 리스트에 담기
for label in os.listdir(folder_path):
    sub_path = folder_path+'/'+label+'/'
    dataset[label] = []
    for filename in os.listdir(sub_path):
        dataset[label].append(sub_path+filename)

dataset

Img -> array

x_train, y_train = [], []

for label, filenames in dataset.items():
    for filename in filenames:
        image = cv2.imread(filename) # img를 array 형태로 변경
        
        x_train.append(image)
        y_train.append(label2index[label]) # label을 index로 변경
x_train, y_train = np.array(x_train), np.array(y_train)
x_train = x_train.astype('int8')  
# float32 : 메모리 용량이 너무 커짐(실수형)
x_train.shape, y_train.shape

Zero-Centering

def zero_mean(image):
    # zero-centering
    return np.mean(image, axis=0)
zero_mean_img = zero_mean(x_train)
zero_mean_img.shape #shape 확인
x_train = x_train - zero_mean_img

양수, 음수 같이나오게 해줌으로써 효율적으로 전처리 시켜준다

Mish 활성화함수 선언

class Mish(Activation):
    def __init__(self, activation, **kwargs):
        super(Mish, self).__init__(activation, **kwargs)
        self.__name__ = 'Mish'

def mish(x):
    return x * K.tanh(K.softplus(x))

get_custom_objects().update({'mish': Mish(mish)})

선언을 해준후에 분류기(활성화 함수 Mish)를 정의해줄 예정이다.

model tpu compile 기본 틀

def create_Res101():
    base_model = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
    base_model.trainable = False
 
    inputs = tf.keras.Input(shape=(128, 128, 3))
    
    x = base_model(inputs, training=False)
    x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
    x = tf.keras.layers.Dense(256, activation='leakyrelu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
    return tf.keras.Model(inputs, outputs)

코랩을 쓰다보면 대용량 데이터에서 RAM , GPU 부족 현상으로 터지는 현상이 자주 일어난다. 런타임을 유지하기 위해 병렬연산이 가능한 TPU 를 사용해주었다. Tpu 사용법은 구글 검색하면 친절하게 나와있다.

Base model 선언

base_50 = ResNet50(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_101 = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_152 = ResNet152(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
base_v2s = efficientnet_v2.EfficientNetV2S(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
# 라벨 설정
class_names = labels
class_names

Early Stopping

early = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5)

Model Check Point

!mkdir ckpt
# 체크포인트 파일 경로 설정
checkpoint_path = "/content/ckpt/cp.ckpt" 
checkpoint_dir = os.path.dirname(checkpoint_path) # 현재 경로 폴더 반환
checkpoint_dir

cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_accuracy',
    save_best_only=True, save_weights_only=True)
    

LearningRateScheduling

reduceLr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy', factor=0.2,
                              patience=3, min_lr=0)
def scheduler(epoch, lr):
   if epoch < 2:
     return lr
   else:
     return lr * 0.1
LrSdu = tf.keras.callbacks.LearningRateScheduler(scheduler, verbose=0)

Pretrained Model 그대로 사용

base_101 = ResNet101(include_top=True, input_shape = (224, 224 ,3), weights = 'imagenet')

base_101.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')

base_152 = ResNet152(include_top=True, input_shape = (224, 224 ,3), weights = None)

base_152.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')

#x_test = tf.keras.layers.experimental.preprocessing.Resizing(128,128)(x_test)
#x_test = tf.keras.applications.resnet50.preprocess_input(x_test)
resize = lambda x: tf. image.resize(x, (128, 128))
x_test = resize(x_test)
x_test.shape

base_101.evaluate(x_test, y_test)
base_152.evaluate(x_test, y_test)

one hot encoding

# sparse categorical crossentropy VS categorical crossentropy + one_hot 
y_train2 = tf.keras.utils.to_categorical(y_train, 3)
y_test2 = tf.keras.utils.to_categorical(y_test, 3)
y_train2.shape, y_test2.shape

Resnet-101 전체재학습

def create_Res101_all():
    base_model = ResNet101(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
    base_model.trainable = True
 
    #for layer in base_model.layers[:]:	
        #print(layer.name, layer.trainable)

    inputs = tf.keras.Input(shape=(128, 128, 3))

    x = base_model(inputs, training=False)
    x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)
    
with strategy.scope():
    Res101all = create_Res101_all()

    Res101all.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
    ,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
    Res101all.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64
                  , callbacks=[early])

Resnet-152 전체재학습

def create_Res152_all():
    base_model = ResNet152(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
    base_model.trainable = True
 
    #for layer in base_model.layers[:]:	
        #print(layer.name, layer.trainable)

    inputs = tf.keras.Input(shape=(128, 128, 3))

    x = base_model(inputs, training=False)
    x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)

with strategy.scope():
    Res152all = create_Res152_all()

    Res152all.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
    ,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
    Res152all.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64,
                  callbacks=[early])

Efficinet_V2S 전체 재학습

def create_V2S_all():
    base_model = efficientnet_v2.EfficientNetV2S(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
    base_model.trainable = True
 
    #for layer in base_model.layers[:]:	
        #print(layer.name, layer.trainable)

    inputs = tf.keras.Input(shape=(128, 128, 3))

    x = base_model(inputs, training=False)
    x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)
    
with strategy.scope():
    V2Sall = create_V2S_all()

    V2Sall.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
    ,loss = 'sparse_categorical_crossentropy',metrics = ['accuracy'])
    V2Sall.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=64
                  , callbacks=[early])
                  
base_model.trainable = True				

for layer in base_model.layers[:]:	
  print(layer.name, layer.trainable)
  
inputs = tf.keras.Input(shape=(128, 128, 3))

x = base_model(inputs, training=False)

x = tf.keras.layers.Flatten(input_shape=model.output_shape[1:])(x)
x = tf.keras.layers.Dense(256, activation='mish')(x)
x= tf.keras.layers.Dropout(0.2)(x)
outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

model = tf.keras.Model(inputs, outputs)

model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001),
                  loss = 'sparse_categorical_crossentropy',
                  metrics=['accuracy'])

model.summary()

model.fit(x_train, y_train, epochs = 20, validation_data=(x_test, y_test), batch_size=64)

ResNet- 50 fine Tune

def create_Res50_fine():
    base_model = ResNet50(include_top=False, input_shape = (128, 128 ,3), weights = 'imagenet')
    base_model.trainable = True

    for layer in base_model.layers[:-100]: 
        layer.trainable = False				

    inputs = tf.keras.Input(shape=(128, 128, 3))

    x = base_model(inputs, training=False)
    x = tf.keras.layers.Flatten(input_shape=base_model.output_shape[1:])(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    #x = tf.keras.layers.Dropout(0.7)(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)
    
 with strategy.scope():
    Res50fine = create_Res50_fine()

    Res50fine.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
    ,loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')
    Res50fine.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), batch_size=32
                  , callbacks=[early])

학습을 파라미터를 다양하게 주어 학습을 시켰으나 성과가 좋은 코드만 뽑아왔다.
우리데이터 자체는 파인튜닝, 전체 재학습이 성능은 제일 좋게 나왔다.

앙상블 모델 적용

앙상블은 모델끼리 평균을 내서 모델 최고성능을 내기 위해 앙상블을 해준다. 데이터별, 어큐러시 별 다양하게 시도를 해봐서 성능개선을 시킬 수 있다.

from sklearn.metrics import accuracy_score

pred_v1 = Res101clf.predict(x_test)
pred_v2 = Res152clf.predict(x_test)
pred_ensemble = np.mean([pred_v1, pred_v2], axis=0)
pred_ensemble

pred_ensemble2 = np.argmax(pred_ensemble, axis=1)

accuracy_score(pred_ensemble2, y_test)

# 단일 모델과 비교
accuracy_score(np.argmax(pred_v1, axis=1), y_test)
accuracy_score(np.argmax(pred_v2, axis=1), y_test)

아쉽게도 우리 모델은 앙상블을 적용하니 조금 성능이 낮아졌다. 모델 분류 다양하게 시도를 해보면 성능이 올라가지 않을까 생각한다. 추 후 적용해볼 예정이다.

결과자료 시각화(train만) - 1

# Load the TensorBoard notebook extension
%load_ext tensorboard

import datetime

model_name = 'Res_clf'
current_time = datetime.datetime.now().strftime("%Y%m%d") # Y : 연도 / m : 월 / d : 일 / H : 시 / M : 분 / S : 초
current_time

Res101clf = create_Res101_clf()
Res101clf.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),metrics = ['accuracy'])
Res101clf_run_eagerly = True

log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1) # epoch마다 히스토그램 계산

tf.compat.v1.disable_eager_execution()

x_train.shape, y_train.shape

Res101clf.fit(x_train, y_train, epochs = 5, 
              validation_data=(x_test, y_test), 
              callbacks=[tb_callback])
              
Res152clf = create_Res152_clf()
Res152clf.compile(optimizer = tf.keras.optimizers.Adam(learning_rate= 0.0001)
,loss = 'sparse_categorical_crossentropy',metrics = 'accuracy')

log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

Res152clf.fit(x_train, y_train, epochs = 100, validation_data=(x_test, y_test), 
              batch_size=64, callbacks=[tb_callback])
              
 %tensorboard --logdir logs/fit

결과자료 시각화(train,test) - 2

model_name = 'version.2'
current_time = datetime.datetime.now().strftime("%Y%m%d") 

log_dir = "logs/fit/" + current_time + model_name
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, 
                                                      histogram_freq=1) # epoch마다 히스토그램 계산

model.fit(x=x_train, 
          y=y_train, 
          epochs=5, 
          validation_data=(x_test, y_test), 
          callbacks=[tb_callback])
          
%tensorboard --logdir logs/fit

Summary Writer 활용

len(x_train)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

train_dataset = train_dataset.shuffle(len(x_train)).batch(64)
test_dataset = test_dataset.batch(64)

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Define our metrics
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('train_accuracy')
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('test_accuracy')

def train_step(model, optimizer, x_train, y_train):
  with tf.GradientTape() as tape:
    predictions = model(x_train, training=True)
    loss = loss_object(y_train, predictions)
  grads = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  train_loss(loss)
  train_accuracy(y_train, predictions)

def test_step(model, x_test, y_test):
  predictions = model(x_test)
  loss = loss_object(y_test, predictions)

  test_loss(loss)
  test_accuracy(y_test, predictions)
  
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)


model = create_model() # reset our model

EPOCHS = 5

for epoch in range(EPOCHS):
  for (x_train, y_train) in train_dataset:
    train_step(model, optimizer, x_train, y_train)
  with train_summary_writer.as_default():
    tf.summary.scalar('loss', train_loss.result(), step=epoch)
    tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

  for (x_test, y_test) in test_dataset:
    test_step(model, x_test, y_test)
  with test_summary_writer.as_default():
    tf.summary.scalar('loss', test_loss.result(), step=epoch)
    tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)

  template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
  print (template.format(epoch+1,
                         train_loss.result(), 
                         train_accuracy.result()*100,
                         test_loss.result(), 
                         test_accuracy.result()*100))

  # Reset metrics every epoch
  train_loss.reset_states()
  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
  

결과 해석

1.모델 그대로 사용하는 것보다 전이학습을 하는것이 더 높은 score를 얻음.
2.Loss, accuracy 그래프 개형으로 보아 Overfit인가?
-> score가 계속 올라가므로 바로 판단하기는 아직 이르다고 판단.
3. 한정된 시간 내에 목표점수 달성

그런데 
왜 60점대에 그쳤는가…?
원인을 알면, 더 향상시킬 수 있다!

문제점 & 개선사항

근본적인 문제 : 한정된 시간 및 자원
짧은 프로젝트 진행 시간, 용량이 큰 데이터를 다룸.
문제 : 코랩 램 오버로 인한 세션 종료가 매우 빈번하게 발생
-> Jupyter Notebook, Colab Pro+ 등의 더 나은 작업환경 구축 
or 기본 이미지 용량 조절 등의 방법 시도 필요

모델링 관련 문제 : 학습이 너무 빨리 이루어진다.(Overfit이 아닐까 의심하지만, 판단은 아직 유보)
-> 전체 데이터를 증강해서 시행했더니, 학습속도는 그대로인데 오히려 score 하락.
-> Dropout, Dense(분류층) node 개수, batch size 등의 parameter를 조절하여 일부 해결
-> 앙상블 기법을 적용하는 방법은 현재 시도하고 있음.
-> ResNet, EfficientNet 이외의 더 다양한 모델도 추후에 적용할 것임.

인사이트 도출

우리의 목표 돼지고기 축산업자, 판매자의 등급 분류를 
보조할 수 있는 인공지능 모델 구축

현재 수준(최대 60%)으로는 곧바로 적용하기는 무리이나, 짧은 시간내에 성능을 크게 끌어올린 것으로 전망하여
근시일 내에 성능을 실제 데이터로 시험 적용해도 좋을 만큼 끌어올릴 수 있을 것으로 전망함.

profile
AI engineer

1개의 댓글

comment-user-thumbnail
2022년 12월 11일

Well Done.
In spite of our tasks proceeded only 1 week, we made sth as write to here.

답글 달기