명함 OCR 파이프라인 프로젝트 포트폴리오

data_hamster·2025년 4월 18일
0

1. 프로젝트 개요 및 목표

  • 목표: 명함 이미지에서 OCR을 통해 기업 및 담당자 정보를 추출하고, 이를 정제하여 데이터베이스에 저장하는 자동화 파이프라인 구축
  • 설명: Google Drive에서 수집된 명함 PDF를 처리하여 회사명, 이름, 직함, 부서, 주소 등의 데이터를 추출하고 OpenSearch에 저장함으로써, 기존 수작업으로 처리되던 명함 데이터를 자동화하여 비즈니스 효율성을 극대화하고자 함
  • 사용 사례: 영업팀이 매일 수십 장의 명함을 수작업으로 입력하던 프로세스를 자동화하여 하루 2시간 이상의 작업 시간을 절약

2. 기술 스택 및 아키텍처

  • 기술 스택:

    • Google Document AI: 명함 OCR 처리 및 모델 미세조정
    • Ollama LG exaone3.5:32b: 주소 및 이름의 자연어 처리
    • Airflow: 작업 스케줄링 및 파이프라인 관리
    • Scala-Spark: 대량 데이터 처리 및 OpenSearch 인덱싱
    • 공공 API: 주소 세분화 및 유효성 검사
    • 무료 라이브러리: 전화번호 정제 및 포맷팅
  • 아키텍처:

    • 데이터 흐름:

      Google Drive (명함 PDF 업로드)  
      → Google Document AI (OCR로 텍스트 추출)  
      → Ollama LG exaone3.5:32b (주소 및 이름 처리)  
      → 공공 API (주소 파싱)  
      → Airflow (일일 작업 스케줄링)  
      → Scala-Spark (데이터 정제 및 변환)  
      → OpenSearch (최종 저장 및 검색 가능 구조화)  
    • 예시 시나리오:

      • 영업 담당자가 Google Drive에 50장의 명함 PDF를 업로드하면, 하루 이내에 모든 데이터가 OpenSearch에 저장되어 서비스에서 검색 가능

3. 프로젝트의 주요 구성 요소

3.1 OCR 처리

  • 설명: Google Document AI를 활용해 명함 PDF에서 텍스트를 추출하며, 한글과 영문 필드를 분리 처리

  • 모델 성과: 기본 F1 점수 0.82에서 미세조정 후 0.883으로 향상

  • 추출 필드: 회사명, 이름, 직함, 부서, 주소, 전화번호, 이메일 등

  • 예시:

    • 입력:

      • 양면 명함: 앞면(한글), 뒷면(영문)
    • 출력:

      {
        "회사명": "삼성전자",
        "이름": "김영수",
        "직함": "과장",
        "부서": "반도체 사업부",
        "주소": "경기도 수원시 영통구 삼성로 129",
        "전화번호": "+82-31-123-4567",
        "이메일": "youngsoo.kim@samsung.com"
      }

3.2 주소 처리

  • 설명: Ollama LG exaone3.5:32b로 주소를 분석하고, 공공 API로 행정구역(시/구/동)과 도로명 주소를 분리

  • 기능: 한국 주소인지 확인 후 중복 제거

  • 예시:

    • 입력: "서울특별시 강남구 테헤란로 123 5층"

    • 처리 후:

      {
        "시": "서울특별시",
        "구": "강남구",
        "도로명": "테헤란로 123",
        "상세주소": "5층"
      }

3.3 이름 처리

  • 설명: Ollama LG exaone3.5:32b로 한국식 이름을 성과 이름으로 분리

  • 예시:

    • 입력: "박지민"

    • 처리 후:

      {
        "성": "박",
        "이름": "지민"
      }

3.4 전화번호 처리

  • 설명: 무료 라이브러리로 전화번호를 국가번호와 번호로 분리, 유효성 검사

  • 예시:

    • 입력: "+821012345678"

    • 처리 후:

      {
        "국가번호": "+82",
        "전화번호": "10-1234-5678"
      }

3.5 워크플로우 관리 (Airflow)

  • 설명: Airflow를 통해 전체 파이프라인을 자동화하고, 작업 간 의존성을 관리하며, 오류 발생 시 알림 기능 제공

  • 세부 사항:

    • DAG 구성: OCR 처리 → 회사 설명 처리(웹 수집 후 정제) → 주소 처리 → 이름 처리 → 전화번호 처리 → OpenSearch 저장
    • 스케줄링: 매일 오전 8시 실행
    • 오류 처리: 실패한 작업에 대해 Slack 알림 및 재시도 설정
  • 예시:

    • Dynamic DAG 코드:

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.utils.task_group import TaskGroup
      from datetime import datetime, timedelta
      
      default_args = {
          'owner': 'airflow',
          'depends_on_past': False,
          'start_date': datetime(2023, 10, 1),
          'email_on_failure': True,
          'retries': 3,
          'retry_delay': timedelta(minutes=5),
      }
      
      dag = DAG(
          'business_card_pipeline',
          default_args=default_args,
          description='동적 명함 OCR 처리 파이프라인',
          schedule_interval='0 8 * * *',
          catchup=False,
      )
      
      def get_files_from_drive():
          # Google Drive API 호출 시뮬레이션
          return ["file1.pdf", "file2.pdf", "file3.pdf"]
      
      def process_file(file_name):
          print(f"Processing {file_name}")
          # 실제 OCR 및 데이터 처리 로직 호출
          return process_ocr_with_google_doc_ai(file_name)
      
      with TaskGroup('dynamic_processing', dag=dag) as tg:
          file_list = get_files_from_drive()
          for file in file_list:
              task = PythonOperator(
                  task_id=f'process_{file.replace(".", "_")}',
                  python_callable=process_file,
                  op_args=[file],
                  dag=dag,
              )
      
      # 후속 작업 연결
      save_task = PythonOperator(
          task_id='save_to_opensearch',
          python_callable=save_to_opensearch,
          dag=dag,
      )
      
      tg >> save_task

3.6 데이터 저장 (Scala-Spark)

  • 설명: Scala-Spark를 사용하여 대량의 명함 데이터를 병렬 처리하고, OpenSearch에 효율적으로 인덱싱

  • 세부 사항:

    • 데이터 변환: OCR 및 처리된 데이터를 Spark DataFrame으로 변환

    • 추가 기능:

      • Broadcast Joins: 주소 코드 테이블 등 소규모 데이터셋 활용 시 성능 개선
  • 예시:

    • Spark 코드:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._
      import org.elasticsearch.spark.sql._
      
      object BusinessCardProcessor {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("BusinessCardProcessing")
            .config("spark.master", "local[*]")
            .config("spark.dynamicAllocation.enabled", "true")
            .config("spark.shuffle.service.enabled", "true")
            .getOrCreate()
      
          import spark.implicits._
      
          // UDF 정의: 주소 파싱
          val parseAddressUDF = udf((address: String) => {
            val response = ollamaModel.predict(address) // Ollama API 호출
            Map("시" -> response.city, "구" -> response.district, "도로명" -> response.road)
          })
      
          // 데이터 로드
          val rawData = spark.read.json("s3://business-cards/raw-data/")
      
          // 데이터 정제 및 변환
          val processedData = rawData
            .withColumn("parsed_address", parseAddressUDF(col("주소")))
            .withColumn("성", split(col("이름"), " ").getItem(0))
            .withColumn("이름", split(col("이름"), " ").getItem(1))
            .select(
              col("회사명"),
              col("성"),
              col("이름"),
              col("직함"),
              col("부서"),
              struct(col("parsed_address.*")).alias("주소"),
              col("전화번호"),
              col("이메일")
            )
      
          // 소규모 주소 코드 테이블 브로드캐스트
          val addressCodes = spark.read.parquet("s3://address-codes/")
          val broadcastCodes = broadcast(addressCodes)
      
          // 조인으로 주소 유효성 검사
          val enrichedData = processedData.join(broadcastCodes, Seq("시", "구"), "left_outer")
      
          // OpenSearch에 저장
          enrichedData.write
            .format("org.elasticsearch.spark.sql")
            .option("es.nodes", "localhost")
            .option("es.port", "9200")
            .option("es.resource", "business_cards/_doc")
            .mode("append")
            .save()
      
          spark.stop()
        }
      }
    • 성과:

      • 10,000건 데이터 처리 시 8분 (기존 25분 대비 3배 속도 향상)

4. 성과

  • 정확도 향상: F1 점수 0.82 → 0.883
  • 효율성: 수작업 대비 50% 시간 단축 (1시간 → 30분)
  • 데이터 품질: 주소 95%, 이름 98%, 전화번호 99% 구조화 성공률
  • 확장성: Spark와 Airflow 개선으로 일 10만 건 처리 가능

5. 기술적 도전과 해결책

  • 도전 1: 대량 데이터 처리 시 병목 현상
    • 해결: Spark Dynamic Allocation과 Broadcast Joins로 최적화
  • 도전 2: 동적 작업 관리의 복잡성
    • 해결: Airflow Dynamic Task Generation으로 유연성 확보

6. 코드 및 구현 세부 사항

  • OCR 처리 코드:

    def process_ocr_with_google_doc_ai(pdf_path):
        client = documentai.DocumentProcessorServiceClient()
        with open(pdf_path, "rb") as pdf_file:
            content = pdf_file.read()
        raw_document = documentai.RawDocument(content=content, mime_type="application/pdf")
        request = documentai.ProcessRequest(name=processor_name, raw_document=raw_document)
        result = client.process_document(request=request)
        return parse_ocr_result(result)

7. 시각화 및 결과

  • 성과 시각화:
    • 처리 속도: 1시간 → 30분
    • Spark 처리량: 1,000건/10분 → 10,000건/8분

8. 결론 및 향후 계획

  • 결론: Spark와 Airflow 개선으로 대규모 데이터 처리 및 운영 효율성 극대화
  • 향후 계획:
    • 실시간 스트리밍 처리 도입 (Kafka + Spark Streaming)
    • Airflow 풀링으로 다중 팀 지원 확장
profile
반갑습니다 햄스터 좋아합니다

0개의 댓글