Insightface를 활용한 얼굴 인식

오형상·2024년 12월 9일
0

Ficket_Face

목록 보기
1/2

Ficket은 얼굴 인식을 기반으로 한 티켓팅 서비스로, 사용자가 등록한 얼굴 이미지를 통해 현장에서 티켓 소지 여부를 확인할 수 있는 시스템입니다. 이 시스템은 InsightFace를 활용해 얼굴 임베딩(특징 벡터)을 생성하고, AES 암호화를 통해 데이터를 DB에 관리하며, AWS S3를 사용해 이미지를 저장합니다.


설치

1) 라이브러리 설치

다음 명령어를 실행하여 프로젝트에 필요한 라이브러리를 설치합니다.
C++이 없으면 오류가 발생하니 설치 필요합니다.

pip install insightface flask flask-restx opencv-python-headless numpy pycryptodome boto3 apscheduler

2) s3 설정 및 sse-kms 적용

AWS S3 버킷 만들기

S3 SSE-KMS 적용하기

2) S3 및 암호 키, KMS 설정

face-service-local.yml 파일에 S3 정보와 벡터값을 AES 암호화를 위한 secret key, S3-KMS ARN 값을 설정합니다.

flask:
  mysql:
    url: mysql+pymysql://root:{password}@localhost:3306/msa-exercise?charset=utf8
    password: "{cipher}AQBzuG1fOvuKxvTC1TXksA9FVwIHjHkduMGtKFzru6DkuWMyegUWEmEabfgvjIf3wkE3iepKtjWsaCfh+5tGJnmlJtqwL5zqYRSHufLxs84MoqjBE9yV9lbnrpvr7/PboqdQ/Jn4792QpjESos6h6bcUqGVMksP5PclqfYFPWf9/dZBfOdeVmWMm8n+hyhBnSJO0+XetpEyrO2/znrZoer/10iwCZ5WtBfoTuAnRhxcY94Ujb5LZ0xEE0CwBl2jMbs/uhwhqDJhXDd3RVZ3UHFprt7SqdZCKZRZkr6DlVsLRppoe+1YBFbHleKjGlkT6YDECXYNnCSAfhfu5d5spMkm/iVqbY1ScKVQ/8vu59V8cAzuwO9ensILhA/H3nvg0urk="

encryption:
  secret_key: "{cipher}AQCPh+nVMl+4uuQJsr0oGmAu0z40eNSM9JoVwvFica+M9B2RfHx/keku4ooCPpHOtxXDXT5m6w/dF+hgPXteIctVUjLwaqNb3NR8gL+gF9s+UP2I2qWzo1QG+xT/OatCSgp3x530yYTZ4xKTit1VffFIKBI5dJ7rOMs73eQX6Ql4yeCy344Ib+grp3W9EJF8zvpXfzz2kjd7Xf9SHSKo4k6+JrVapPy/GDN8HliIDnD075GZho+ecRm5s60i3EcVm8dxodW+9PHqpFFmaH0hpYZkf56i8HIt3yE+3yQn61HBC7j1STZh73uOgnl6hyejgiG1imtXFO7x1GBOMwl6J/oxRlkGK61SdL8ZBBLyc09fFdFYC+EMlgvWHO3rT7dh3H9wJazFIHak+LGUve/cakwJ"

s3:
  aws:
    accesskey: "{cipher}AQBBpTDKAfl0aXPYmPzSfphzKw2baCH4WooqgsGidwc9ypAcOqqIB03v3qpoqrxYuZkA4fD/7NiYNvLizgK5J85O/ILHYwtTPVbunH4D4qDo8gJjz18UO02V/shE1N+qyNXVAEm/5uGA0UiOqf8BsZtnIAFinSSImPd7vOFzBf8gGgbrBDMniPNH/QdN+lmwCXCv1KtbiZUQ2QndkUn93AotgdClQopxzysX7FFxleqJjddooFm6TWRKUURR7FBRMwS+x8h1m0PpwrjyZYgxjv+gmnxXCIUQNX/qVec/TA6KLOYzw8ePqPx0kDveoQuFLzEtoO3CcgKIQQHpIR+ZxuX7z52S7HSoqtDq2tyDVJTrSAF4PnJAZVw7G1BR87VFbMhqxgMey58o7C0FVh3oSnxD"
    secretkey: "{cipher}AQB8X7MBmqVQpnMRqJCK9o2FDcgOnstV8GzL8gbqk+l2a1UPnQVPKjdUVvzpN7IbLxlSMY1Tac/+CzAEfnHaF0uJUkV0xO/SxlvYvYGg3XUT9XCLjosb3AiulFRuADarWChHDIc4XdHDPoPvvRqS+ui5pfi+ySN5HUjkr+R/cn+5b8xE7xXeSespZeiH/iJOnXWkwmVVQN6pc3UCDkQKmIVZly2mfuynJJvpsAKnY9QBalsAsDt8S+ycw2kRjSsZryK/Qx/U9swFN8TBQMgxCkA8HxpODAwV+CsV0WSceY5wbUR7gbzvoO9P35u5hxgy8kP5q6XbDtx+0H3PjK6W3BM3KhU0uqJGHdt5e9dFzcQnT2b+JsBomoBSQZhTpvCde1pyWft9jreLT3185QwPRFURpELBvfAL2wY3Or2OgThQZw=="
    bucketname: ficket-event-content
    region: ap-northeast-2
  kms: "{cipher}AQB998ZzYPTlUIQSU/7/TSYldmrSc7VfnGl3EAlTPNswxluUvub0dYG0wmRostcrbjrIstUzbPoT5cuRWq78S2AFEt6uNinWDsnHUdeu3wzh72ETvbiBS4tlgRSANFzQZ6pc5Eq7zruHoRjv7F9oc0+ukmSuMBalCtu+FK9WoDgcBO4p7+LoirnIO9/ipdWH54IJwB25AUbCK+SSWtYbgrzQg81akBlBk9U63DE2PENoT2tfF5G/lnpe3sep2007bTWeeH0OYDhnh+9o8jWcwKLdsi+CtrmtOAGhdYPfS3R0iELNZ3sMHr+g8FmeQndEvd1MpskVHv00BujtcUEgyHky9L5q7wdpgo5okCWpo26/Y9i8CR3ocb0bPqwS6w/gsP55OMbNmdLQP42atLrGkoX5+kAExPO8OFnuq3A6EWJSGqOBiktyxLHUru3+U++zOmpzDIk1xl3ZFP/z+DMkIESrOeG7wwFHn90i3gVD7WcK0Q=="
    

주요 기능

1) S3 파일 업로드

upload_file_to_s3 함수는 파일을 고유한 이름으로 저장하고, S3 URL을 반환합니다.

import boto3
import os
import uuid

def get_s3_client():
    """환경 변수에서 AWS 자격 증명을 동적으로 가져와 S3 클라이언트 생성"""
    AWS_ACCESSKEY = os.getenv("AWS_ACCESSKEY")
    AWS_SECRETKEY = os.getenv("AWS_SECRETKEY")
    AWS_REGION = os.getenv("AWS_REGION")

    if not AWS_ACCESSKEY or not AWS_SECRETKEY or not AWS_REGION:
        raise ValueError("AWS credentials are missing! Please check environment variables.")

    return boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESSKEY,
        aws_secret_access_key=AWS_SECRETKEY,
        region_name=AWS_REGION
    )


def upload_file_to_s3(file, folder="faces"):
    """S3에 파일 업로드 (putObject 사용)"""

    s3 = get_s3_client()

    BUCKET_NAME = os.getenv("AWS_BUCKETNAME")
    KMS_KEY_ID = os.getenv("AWS_KMS")

    file_extension = file.filename.split('.')[-1]
    unique_filename = f"{folder}/{uuid.uuid4().hex}.{file_extension}"

    # 파일 내용을 바이트 스트림으로 읽음
    file_content = file.read()

    # put_object를 사용해 업로드
    s3.put_object(
        Bucket=BUCKET_NAME,
        Key=unique_filename,
        Body=file_content,
        ContentType=file.content_type,
    )

    file_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{unique_filename}"
    return file_url

2) 얼굴 임베딩 생성

얼굴 임베딩(특징 벡터)은 사용자의 얼굴을 벡터화하여 고유한 숫자 값으로 표현한 것입니다. 이 벡터를 활용하면 사용자의 얼굴을 비교하여 동일 인물 여부를 판단할 수 있습니다. Ficket에서는 InsightFace 라이브러리의 FaceAnalysis 객체를 사용하여 얼굴 임베딩을 생성합니다.

1. FaceAnalysis 객체 초기화

from insightface.app import FaceAnalysis
import cv2
import numpy as np
from config import logger

face_analyzer = None

def get_face_analyzer():
    global face_analyzer
    if face_analyzer is None:
        logger.info("Loading FaceAnalysis model... (CPU Mode)")
        face_analyzer = FaceAnalysis(
            name='buffalo_l',
            allowed_modules=['detection', 'recognition'],
            providers=['CPUExecutionProvider']
        )
        face_analyzer.prepare(ctx_id=-1)
        face_analyzer.det_size = (640, 640)
    return face_analyzer
  • buffalo_l 모델을 사용하여 얼굴 탐지 및 인식 기능을 활성화합니다.
  • providers=['CPUExecutionProvider'] 설정을 통해 CPU 모드로 실행합니다.
  • face_analyzer.det_size = (640, 640) 설정을 통해 얼굴 감지 영역을 최적화합니다.

2. 이미지 전처리

def preprocess_image(image_data):
    try:
        img_array = np.frombuffer(image_data, np.uint8)
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        if img is None:
            logger.error("Failed to decode image")
            return None
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (112, 112))
        return img
    except Exception as e:
        logger.error(f"Error in preprocess_image: {str(e)}")
        return None
  • 바이트 데이터를 RGB 이미지로 변환합니다.
  • InsightFace 모델이 요구하는 입력 크기인 112x112로 이미지 크기를 조정합니다.

3. 얼굴 임베딩 생성

def get_face_embedding(image_data):
    try:
        img = preprocess_image(image_data)
        if img is None:
            return None

        face_analyzer = get_face_analyzer()
        faces = face_analyzer.get(img)

        if not faces:
            logger.warning("No face detected.")
            return None

        embedding = faces[0].embedding
        return embedding / np.linalg.norm(embedding)
    except Exception as e:
        logger.error(f"Error extracting face embedding: {str(e)}")
        return None
  • face_analyzer.get(img)을 사용하여 이미지에서 얼굴을 감지합니다.
  • 감지된 얼굴의 임베딩(벡터)을 추출하고 L2 정규화하여 크기를 1로 맞춥니다.

3) 벡터 암호화 및 복호화

AES CBC 모드를 사용해 얼굴 임베딩 데이터를 안전하게 암호화 및 복호화합니다.

import base64
import numpy as np
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import os

def get_secret_key():
    """환경 변수에서 AES 키를 가져오고 검증"""
    secret_key_env = os.getenv("ENCRYPTION_SECRET_KEY")

    if secret_key_env is None:
        raise ValueError("ENCRYPTION_SECRET_KEY is not set. Please check environment variables.")

    secret_key = secret_key_env.encode()  # 바이트 변환

    if len(secret_key) not in [16, 24, 32]:
        raise ValueError("Invalid AES key length! Must be 16, 24, or 32 bytes.")

    return secret_key

def encrypt_vector(embedding):
    secret_key = get_secret_key()
    data = embedding.tobytes()
    cipher = AES.new(secret_key, AES.MODE_CBC)
    ct_bytes = cipher.encrypt(pad(data, AES.block_size))
    iv = base64.b64encode(cipher.iv).decode('utf-8')
    ct = base64.b64encode(ct_bytes).decode('utf-8')
    return iv + ct

def decrypt_vector(encrypted_data):
    secret_key = get_secret_key()
    iv = base64.b64decode(encrypted_data[:24])
    ct = base64.b64decode(encrypted_data[24:])
    cipher = AES.new(secret_key, AES.MODE_CBC, iv)
    decrypted_data = unpad(cipher.decrypt(ct), AES.block_size)
    return np.frombuffer(decrypted_data, dtype=np.float32)

API

1) 얼굴 등록 API (/upload)

얼굴 이미지를 분석하여 생성된 임베딩 데이터를 암호화하여 DB에 저장하고, S3에 이미지를 저장합니다.

@api.route("/upload")
class UploadFace(Resource):
    @api.expect(file_upload_parser)
    def post(self):
        args = file_upload_parser.parse_args()
        file = request.files.get("file")
        event_schedule_id = args.get("event_schedule_id")

        if not file:
            return ResponseSchema.make_response(400, "File not provided.")

        image_data = file.read()
        embedding = get_face_embedding(image_data)
        if embedding is None:
            return ResponseSchema.make_response(400, "No face detected.")

        encrypted_embedding = encrypt_vector(embedding, secret_key)
        file.seek(0)
        file_url = upload_file_to_s3(file)

        new_face = Face(
            vector=encrypted_embedding,
            face_img=file_url,
            ticket_id=None,
            event_schedule_id=event_schedule_id
        )
        db.session.add(new_face)
        db.session.commit()

        return ResponseSchema.make_response(200, "Face uploaded successfully.", {
            "faceId": new_face.face_id,
            "faceUrl": new_face.face_img
        })

2) 얼굴 매칭 API (/match)

업로드된 이미지를 벡터화하고 데이터베이스에 저장된 암호화된 벡터를 복호화하여 매칭 여부를 확인합니다.

@api.route("/match")
class MatchFace(Resource):
    @api.expect(match_parser)
    def post(self):
        args = match_parser.parse_args()
        file = request.files.get("file")
        event_schedule_id = args.get("event_schedule_id")

        if not file or not event_schedule_id:
            return ResponseSchema.make_response(400, "File or event_schedule_id missing.")

        image_data = file.read()
        embedding = get_face_embedding(image_data)
        if embedding is None:
            return ResponseSchema.make_response(400, "No face detected.")

        faces = Face.query.filter_by(event_schedule_id=event_schedule_id).all()
        if not faces:
            return ResponseSchema.make_response(404, "No faces found for the event schedule.")

        max_similarity = -1
        best_match = None
        for face in faces:
            decrypted_embedding = decrypt_vector(face.vector, secret_key)
            similarity = cosine_similarity(embedding, decrypted_embedding)
            if similarity > max_similarity:
                max_similarity = similarity
                best_match = {
                    "face_id": face.face_id,
                    "face_img": face.face_img,
                    "ticket_id": face.ticket_id,
                    "event_schedule_id": face.event_schedule_id,
                    "similarity": float(similarity),
                }

        threshold = 0.4
        if best_match and max_similarity > threshold:
            return ResponseSchema.make_response(200, "Face match found.", best_match)
        else:
            return ResponseSchema.make_response(404, "No matching face found.")

Reference

0개의 댓글