AWS EC2에서 화재감지 프로젝트 실행하기

sangeun jo·2021년 10월 6일
0

영상분석 서버를 구현하기 전에 우선 화재감지 프로젝트 를 따라하면서 감을 잡아보기로 했다.

난관1

내 고물 노트북은 cpu가 노쇠하여 2.X 이상의 버전이 실행되지 않는다. aws에서 컴퓨터를 대여하기로 했다. 그런데 아마존은 커맨드라인 인터페이스라 이미지나 영상을 띄워서 실제로 잘 되는지 볼 수가 없다. 답답해서 Xming도 설치해보고, 우분투 데스크탑을 설치한 후 윈도우에서 원격 연결을 해봤지만 둘 다 심하게 버벅거리고 자꾸 튕겼다. 그냥 프레임을 출력해서 CCTV 영상을 실시간으로 받아오는지 확인만하고, 감지하면 저장해서 보기로 했다.

난관2

CCTV가 아직 없어서 남는 스마트폰을 웹캠으로 만든다음 영상을 실시간으로 받아오기로 했다. 그러나 http:// 형식으로 시작하는 스트리밍 영상은 우분투에서 opencv로 읽어와지지 않았다. 혹시나 싶어서 공공 데이터 포털에 공개된 rtsp:// 형식은 잘 받아와졌다. 밤이면 조명도 불로 인식할 수도 있으니, 교통 cctv 영상을 가져와서 화재감지 모델에 돌려보기로 했다.

난관3

화재감지 cctv 프로젝트가 3년전의 코드다보니 쓰이지 않는 함수들이 많았다. 에러가 날때마다 구글링으로 해결했다.


안드로이드 연결 및 cctv 저장 기능을 없앤 간결한 코드

import cv2
import os
import Constants
import queue
import threading
import datetime
import sys
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import time


def init():
    global img_file_path, frame_queue, fourcc
    print("init")
    frame_queue = queue.Queue() #프레임 큐
    fourcc = cv2.VideoWriter_fourcc(*'XVID') #포맷 
    
def set_video_writer(video_name, fourcc, FPS, width, height):
    return cv2.VideoWriter(video_name, fourcc, FPS, (width, height))

#CCTV 영상 실시간으로 가져와서 프레임 큐에 저장
def connect_cctv():
    global frame_queue,fource, cur_frame
    
    url = 'rtsp://210.99.70.120:1935/live/cctv001.stream' #공공 데이터 센터에서 제공한 공공 CCTV
    
    try:
        cap = cv2.VideoCapture(url)
        print("연결성공: ", url)
    except:
        print("연결 실패")
        
    while True:
        ret, frame = cap.read()
        #전역 변수 프레임 큐에 프레임 담음 
        if frame_queue.qsize() < Constants.QUEUE_SIZE:
            frame_queue.put(frame)
        else:
            frame_queue.get()
            frame_queue.put(frame)


#실시간으로 화재 감지 
def run_detector():
    global frame_queue
    print("run_detector")
    
    from utils import label_map_util
    from utils import visualization_utils as vis_util

    sys.path.append("..")  
    
    MODEL_NAME = 'inference_graph'
 
    CWD_PATH = os.getcwd()     # 현재 경로
    PATH_TO_CKPT = os.path.join(CWD_PATH,MODEL_NAME,'frozen_inference_graph.pb')     # 모델 경로
    PATH_TO_LABELS = os.path.join(CWD_PATH,'training','label_map.pbtxt')     # pbtext 파일 
    NUM_CLASSES = 1

    #라벨링 맵 로드
    label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
    categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
    category_index = label_map_util.create_category_index(categories)

    # 텐서플로우 모델 로드
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.compat.v1.GraphDef()
        with tf.io.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
        sess = tf.compat.v1.Session(graph=detection_graph)
        
    #인풋
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')

    # 아웃풋
    detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0')     # 박스 
    detection_scores = detection_graph.get_tensor_by_name('detection_scores:0')     #점수  
    detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')   
    num_detections = detection_graph.get_tensor_by_name('num_detections:0') 
    
    print(detection_scores, num_detections)
    
    while True:
        if frame_queue.empty() == True:
            continue
        image = frame_queue.get()       
        image_expanded = np.expand_dims(image, axis=0)

        (boxes, scores, classes, num) = sess.run(
            [detection_boxes, detection_scores, detection_classes, num_detections],
            feed_dict={image_tensor: image_expanded})
           
  
        # 박스, 라벨 표시 
        det_ret_list = vis_util.visualize_boxes_and_labels_on_image_array(
            image,
            np.squeeze(boxes),
            np.squeeze(classes).astype(np.int32),
            np.squeeze(scores),
            category_index,
            use_normalized_coordinates=True,
            line_thickness=8,
            min_score_thresh=0.80)

        #불 감지 
        if 0 < len(det_ret_list):
            for det_ret in det_ret_list:
                cats_list = det_ret["cats"]
                for cats_str in cats_list: 
                    splt_cat = cats_str.split(":")
                    print("불 감지됨, 감지시간: ", time.strftime('%H시%M분%S초'))
                    cv2.imwrite(os.path.join('fire',  time.strftime('%H시%M분%S초') + '.jpg'), image)   
       
        else:
            print("감지 안됨")
            continue
                          
if __name__ == "__main__":
    init()
    t1 = threading.Thread(target=connect_cctv)
    t1.start()
    run_detector()
    t1.join()

앞으로의 과제

  • 이 예제는 한대의 cctv만 분석할 수 있다 cctv가 10개만 되어도 엄청나게 자원을 많이 소모할 텐데 벌써 비용이 걱정된다. 머신러닝 서빙에 특화된 클라우드 서비스를 알아봐야겠다.
  • 각 cctv를 어떻게 각 스마트폰에 알맞게 알림을 보내고, 제어해야할지 모르겠다. 소켓프로그래밍 책을 다시 읽어봐야겠다.
profile
코더가 아니라 개발자가 되자

0개의 댓글