기술 스택:
아키텍처:
데이터 흐름:
Google Drive (명함 PDF 업로드)
→ Google Document AI (OCR로 텍스트 추출)
→ Ollama LG exaone3.5:32b (주소 및 이름 처리)
→ 공공 API (주소 파싱)
→ Airflow (일일 작업 스케줄링)
→ Scala-Spark (데이터 정제 및 변환)
→ OpenSearch (최종 저장 및 검색 가능 구조화)
예시 시나리오:
설명: Google Document AI를 활용해 명함 PDF에서 텍스트를 추출하며, 한글과 영문 필드를 분리 처리
모델 성과: 기본 F1 점수 0.82에서 미세조정 후 0.883으로 향상
추출 필드: 회사명, 이름, 직함, 부서, 주소, 전화번호, 이메일 등
예시:
입력:
출력:
{
"회사명": "삼성전자",
"이름": "김영수",
"직함": "과장",
"부서": "반도체 사업부",
"주소": "경기도 수원시 영통구 삼성로 129",
"전화번호": "+82-31-123-4567",
"이메일": "youngsoo.kim@samsung.com"
}
설명: Ollama LG exaone3.5:32b로 주소를 분석하고, 공공 API로 행정구역(시/구/동)과 도로명 주소를 분리
기능: 한국 주소인지 확인 후 중복 제거
예시:
입력: "서울특별시 강남구 테헤란로 123 5층"
처리 후:
{
"시": "서울특별시",
"구": "강남구",
"도로명": "테헤란로 123",
"상세주소": "5층"
}
설명: Ollama LG exaone3.5:32b로 한국식 이름을 성과 이름으로 분리
예시:
입력: "박지민"
처리 후:
{
"성": "박",
"이름": "지민"
}
설명: 무료 라이브러리로 전화번호를 국가번호와 번호로 분리, 유효성 검사
예시:
입력: "+821012345678"
처리 후:
{
"국가번호": "+82",
"전화번호": "10-1234-5678"
}
설명: Airflow를 통해 전체 파이프라인을 자동화하고, 작업 간 의존성을 관리하며, 오류 발생 시 알림 기능 제공
세부 사항:
예시:
Dynamic DAG 코드:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 10, 1),
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'business_card_pipeline',
default_args=default_args,
description='동적 명함 OCR 처리 파이프라인',
schedule_interval='0 8 * * *',
catchup=False,
)
def get_files_from_drive():
# Google Drive API 호출 시뮬레이션
return ["file1.pdf", "file2.pdf", "file3.pdf"]
def process_file(file_name):
print(f"Processing {file_name}")
# 실제 OCR 및 데이터 처리 로직 호출
return process_ocr_with_google_doc_ai(file_name)
with TaskGroup('dynamic_processing', dag=dag) as tg:
file_list = get_files_from_drive()
for file in file_list:
task = PythonOperator(
task_id=f'process_{file.replace(".", "_")}',
python_callable=process_file,
op_args=[file],
dag=dag,
)
# 후속 작업 연결
save_task = PythonOperator(
task_id='save_to_opensearch',
python_callable=save_to_opensearch,
dag=dag,
)
tg >> save_task
설명: Scala-Spark를 사용하여 대량의 명함 데이터를 병렬 처리하고, OpenSearch에 효율적으로 인덱싱
세부 사항:
데이터 변환: OCR 및 처리된 데이터를 Spark DataFrame으로 변환
추가 기능:
예시:
Spark 코드:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
object BusinessCardProcessor {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("BusinessCardProcessing")
.config("spark.master", "local[*]")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.shuffle.service.enabled", "true")
.getOrCreate()
import spark.implicits._
// UDF 정의: 주소 파싱
val parseAddressUDF = udf((address: String) => {
val response = ollamaModel.predict(address) // Ollama API 호출
Map("시" -> response.city, "구" -> response.district, "도로명" -> response.road)
})
// 데이터 로드
val rawData = spark.read.json("s3://business-cards/raw-data/")
// 데이터 정제 및 변환
val processedData = rawData
.withColumn("parsed_address", parseAddressUDF(col("주소")))
.withColumn("성", split(col("이름"), " ").getItem(0))
.withColumn("이름", split(col("이름"), " ").getItem(1))
.select(
col("회사명"),
col("성"),
col("이름"),
col("직함"),
col("부서"),
struct(col("parsed_address.*")).alias("주소"),
col("전화번호"),
col("이메일")
)
// 소규모 주소 코드 테이블 브로드캐스트
val addressCodes = spark.read.parquet("s3://address-codes/")
val broadcastCodes = broadcast(addressCodes)
// 조인으로 주소 유효성 검사
val enrichedData = processedData.join(broadcastCodes, Seq("시", "구"), "left_outer")
// OpenSearch에 저장
enrichedData.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("es.resource", "business_cards/_doc")
.mode("append")
.save()
spark.stop()
}
}
성과:
OCR 처리 코드:
def process_ocr_with_google_doc_ai(pdf_path):
client = documentai.DocumentProcessorServiceClient()
with open(pdf_path, "rb") as pdf_file:
content = pdf_file.read()
raw_document = documentai.RawDocument(content=content, mime_type="application/pdf")
request = documentai.ProcessRequest(name=processor_name, raw_document=raw_document)
result = client.process_document(request=request)
return parse_ocr_result(result)