RAG 사용하기

Apic·2025년 7월 25일
0

코딩

목록 보기
28/31

LLM 파인튜닝

저번에 작품(소설, 웹툰)의 정보를 토대로 작품의 태그를 만드는 LLM을 만들었었다.
LLM 파인튜닝

하지만 데이터의 양이 적어서 그런지 결과물이 썩 만족스럽지 못했다.

그래서 좀 찾다가 RAG라는 것을 발견했고 그 기술을 적용시켜보고자 한다.

RAG

RAG는 Retrieval-Augmented Generation의 약자다.
직역하면 검색 증강 생성이라고 한다.
그리고 이것을 한 마디로 표현하면 오픈북이라고 할 수 있다.

기존에 LLM(Large Language Model)은 학습되어있는 모델이다.
즉 학습되는 데이터가 정해져 있고 그것을 바탕으로 예측하는 것에 가깝다.

마치 백과사전을 통채로 암기하고 신조어를 설명하려고 하는 것과 같다.

그래서 학습 이후에 발생한 최신 정보나 특정 도메인에 국한된 전문 지식, 혹은 비공개 데이터에 대해서 한계가 있다.

그래서 내가 하려는 작업(태그 만드는 것)에 대해서 사실에 근거하지 않은 정보를 그럴듯하게 지어내는 환각 현상을 일으키거나 지나치게 일반적이고 피상적인 태그를 만들어 내었다.

왜 RAG인가

RAG는 LLM의 몇 가지 한게를 해결했다.

지식의 한계 극복

LLM은 앞에서 설명했듯이 기존 LLM은 과거의 데이터에 멈춰있다.
이것을 실시간 데이터와 연결하여 항상 최신 정보를 활용할 수 있게 된다.

환각 현상 방지

잘 모르는 내용에 대해 추측으로 답변하는 대신에 검증된 외부 데이터를 참고하게 하여 사실이 아닌 정보를 생성할 위험을 줄인다.

전문 지식 활용

개인 문서나 특정 분야의 개인화된 정보를 제공하여 특정 도메인에 특화된 전문가로 만들 수 있다.

비용 효율성

LLM을 파인튜닝 하는 것은 돈이 많이 드는 일이다.
하지만 그냥 참고 자료만 업데이트 하면 자연스럽게 최신 정보를 통해 답변을 생성하므로 훨씬 경제적이다.

RAG 원리

RAG는 크게 3가지 개념으로 구성된다.

검색(Retrieve)

데이터 수집 및 분할

AI가 참고할 모든 문서를 수집하고 그 문서들을 작은 단락이나 문단 단위로 잘게 나눈다.
이것을 청킹(Chunking) 이라고 하며 통째로 보는 것 보다 특정 주제대 대한 한두 단락을 찾는게 훨씬 효율적이기 때문에 이 방식을 사용한다.

임베딩

잘게 나눈 텐스트 조각들을 컴퓨터가 이해할 수 있는 숫자들의 배열, 죽 백터(Vector)로 변환한다.
이 과정을 임베딩(Embedding)이라고 한다.
임베딩의 핵심은 단순히 단어를 숫자로 바꾸는 것이 아니라, 텍스트의 의미를 숫자료 표현하는 것이다.
예를 들어 "자동차 수리 방법"과 "차량 정비 가이드"는 단어는 다르지만 의미가 비슷하므로 가까운 숫자 배열(백터)로 변환된다.

백터 DB 저장

이렇게 만들어진 수많은 백터들을 백터 데이터베이스라는 특수한 서랍장에 저장하고 색인을 생성한다.
이 DB는 특정 질문과 의미적으로 가장 유사한 텍스트 조각을 초고속으로 찾아주는 역할을 한다.

생성(Generate)

질문 검색

사용자가 "우리 회사 연차 정책 알려줘"라고 질문하면 RAG 시스템은 먼저 이 질문 백터와 가장 유사한 의미를 가진 문서 조각(백터)들을 몇개 찾아낸다.
이 경우 '연차', '휴가 규정' 등의 내용이 담긴 인사 규정 문서 조각들이 찾아질 것이다.

증강(Augment)

프롬프트 증강

이제 질문과 백터 DB에서 찾은 문서 조각들을 하나로 합쳐 새로운 프롬프트로 만든다.

"아래의 '참고 자로'를 바탕으로 '우리 회사 연차 정책이 뭐야?'라는 질문에 답해줘
[참고 자료]

  • 인사 규정 3.1조 ~~~
  • 휴가 사용 안내: ~~~

답변 생성

이런 프롬프트를 기반으로 최종 잡변을 생성한다.
자신의 기억에만 의존하지 않고 눈 앞에 놓인 정확한 자료를 참고하기 때문에 훨씬 구체적이고 신뢰도 높은 답변을 할 수 있게 된다.

파인튜닝 vs RAG

이 두개에 대한 비교는 아래 표를 보면 알 수 있다.

기준 (Criterion)검색 증강 생성 (RAG)미세 조정 (Fine-tuning)
데이터 최신성외부 DB 업데이트만으로 최신 정보 즉시 반영 가능새로운 정보를 반영하려면 모델 재학습 필요
환각 현상 방지검색된 근거 기반으로 답변하여 환각 억제에 매우 효과적모델 내재 지식에 의존하므로 환각 발생 가능성 존재
답변 근거 확인검색된 소스를 제시하여 답변의 출처와 신뢰성 확인 가능답변 생성 과정이 '블랙박스'로, 근거 추적이 어려움
도메인 특화특정 사실, 제품 정보 등 '사실적 지식' 주입에 유리특정 스타일, 어조, 행동 패턴 등 '암묵적 지식' 내재화에 유리
구현 비용 및 시간상대적으로 저렴하고 빠르게 구현 가능대규모 데이터 준비와 모델 학습에 많은 비용과 시간 소요
초기 데이터 요구사항라벨링되지 않은 데이터로도 지식 베이스 구축 가능대량의 고품질 라벨링 데이터 필수

구현하기

환경

구성버전
OSUbuntu 22.04
Python3.10.11
cuda12.1

코드

먼저 vector db를 만들었다.
db는 chroma를 사용했다.

백터 db 만드는 곳은 여기에서 확인할 수 있다.

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
import pandas as pd
import json

# --- 전역 설정 ---
# 2.1절에서 선정한 한국어 특화 임베딩 모델
EMBEDDING_MODEL_ID = "dragonkue/snowflake-arctic-embed-l-v2.0-ko"
# 2.2절에서 선정한 7B 파라미터 한국어 LLM

LLM_MODEL_ID = "SEOKDONG/llama3.1_korean_v1.1_sft_by_aidx"
# LLM_MODEL_ID = "spow12/Ko-Qwen2-7B-Instruct"
# GPU 사용 가능 여부 확인
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

def load_embedding_model():
    """
    SentenceTransformer를 사용하여 임베딩 모델을 로드하고 GPU에 올립니다.
    """
    print(f"임베딩 모델({EMBEDDING_MODEL_ID})을 로드합니다...")
    model = SentenceTransformer(EMBEDDING_MODEL_ID, device=DEVICE)
    print("임베딩 모델 로드 완료.")
    return model

def load_llm():
    """
    4비트 양자화를 적용하여 대규모 언어 모델(LLM)과 토크나이저를 로드합니다.
    """
    print(f"LLM({LLM_MODEL_ID})을 로드합니다...")
    
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    model = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL_ID,
        quantization_config=quantization_config,
        device_map="auto",
        torch_dtype=torch.bfloat16
    )
    
    tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_ID)
    
    # --- 수정된 부분 ---
    # 토크나이저에 pad_token이 정의되어 있지 않은 경우, eos_token으로 설정합니다.
    # 이는 일부 모델에서 발생할 수 있는 생성 오류 및 경고 메시지를 방지합니다.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        model.config.pad_token_id = tokenizer.pad_token_id
    
    print("LLM 로드 완료.")
    return model, tokenizer

import chromadb

# --- ChromaDB 클라이언트 설정 ---
# 로컬에 영구적으로 데이터베이스를 저장하는 PersistentClient를 사용합니다.
# 데이터베이스 파일은 'chroma_db' 디렉토리에 저장됩니다.
CHROMA_PATH = "chroma_db"
client = chromadb.PersistentClient(path=CHROMA_PATH)

# 사용할 컬렉션을 가져오거나, 없으면 새로 생성합니다.
# 이 예제에서는 'webtoon_works'라는 이름의 컬렉션을 사용합니다.
COLLECTION_NAME = "work_collection"
collection = client.get_or_create_collection(name=COLLECTION_NAME)

def retrieve_context(query_text, embedding_model, n_results=10):
    """
    ChromaDB에서 주어진 쿼리 텍스트와 가장 유사한 n개의 문서를 검색합니다.
    """
    print(f"'{query_text[:30]}...'에 대한 컨텍스트를 검색합니다...")
    genre = query_text.split('\n')[-1].replace('장르: ', '')
    
    # 1. 쿼리 텍스트를 임베딩 벡터로 변환
    query_embedding = embedding_model.encode([query_text], show_progress_bar=False)
    
    # 2. query_embeddings를 사용하여 검색
    results = collection.query(
        query_embeddings=query_embedding.tolist(),
        n_results=n_results,
        # where={"genre": genre}
    )

    retrieved_docs = []
    if results and results['documents'] and results['documents'][0]:
        docs_list = results['documents'][0]
        meta_list = results['metadatas'][0]
        
        print('발견된 문서 개수: ', len(docs_list))
        
        for doc, meta in zip(docs_list, meta_list):
            retrieved_docs.append({
                "content": doc,
                "metadata": meta
            })
    
    print(f"{len(retrieved_docs)}개의 유사한 문서를 찾았습니다.")
    return retrieved_docs

def create_prompt(work_data, context_docs):
    """
    LLM에 전달할 최종 프롬프트 템플릿을 시스템-유저 메시지 형식으로 생성합니다.
    """
    context_str = ""
    for i, doc in enumerate(context_docs, 1):
        context_str += f"--- 유사 작품 {i} ---\n"
        context_str += f"내용: {doc['content']}\n"
        if doc.get('metadata') and doc['metadata'].get('tags'):
            context_str += f"기존 태그: {doc['metadata']['tags']}\n\n"
        else:
            context_str += "기존 태그: 정보 없음\n\n"
    
    messages = [
        {
            'role': 'system',
            'content': """당신은 디지털 도서관의 전문 콘텐츠 분석가이자 태그 전문가입니다.
당신의 임무는 주어진 작품의 정보와 유사 작품들의 컨텍스트를 참고하여, 3개에서 8개의 관련성 높고 통찰력 있는 태그를 생성하는 것입니다.

**지시사항:**
- 가장 중요한 규칙: 생성할 태그의 개수는 반드시 3개에서 8개 사이여야 합니다.
- 대체 지침: 만약 유사 작품 컨텍스트에서 명확한 태그를 찾기 어렵다면, 분석 대상 작품의 '줄거리'와 '제목'을 중심으로 핵심 키워드를 추출하여 태그를 만드십시오.
- 출력은 반드시 쉼표로 구분된 단일 라인의 태그 목록이어야 합니다. (예: 태그1,태그2,태그3)
- 어떠한 설명이나 서론, 번호 매기기도 포함하지 마십시오.
- 태그는 분석 대상 작품에 맞춤화되어야 하며, 작품 정보와 관련있는 단어여야 합니다.
- 태그에는 장르명, 플랫폼 명을 포함하지 마십시오."""
        },
        {
            'role': 'user',
            'content': f"""다음 작품에 대한 태그를 생성해 주세요.

<WORK_TO_TAG>
제목: {work_data['title']}
작가: {work_data['author']}
장르: {work_data['genre']}
줄거리: {work_data['summary']}
</WORK_TO_TAG>

<SIMILAR_WORKS_CONTEXT>
다음은 참고할 만한 유사 작품들의 정보입니다. 이 정보를 분석하여 공통된 주제, 스타일, 키워드를 파악하되, 태그를 그대로 복사하지는 마십시오.
{context_str}
</SIMILAR_WORKS_CONTEXT>

위의 모든 정보를 바탕으로 '{work_data['title']}' 작품에 가장 적합한 태그를 생성해 주십시오."""
        }
    ]
    
    return messages

import re

def generate_tags(work_data, llm, tokenizer, embedding_model, n_context=3):
    """
    주어진 작품 데이터에 대한 태그를 생성하는 전체 RAG 파이프라인을 실행합니다.
    """
    query_text = f"제목: {work_data['title']}\n작가: {work_data['author']}\n소개글: {work_data['summary']}\n장르: {work_data['genre']}"
    
    context_docs = retrieve_context(query_text, embedding_model, n_results=n_context)
    
    messages = create_prompt(work_data, context_docs)
    
    print("LLM을 통해 태그를 생성합니다...")
    
    # 메시지를 채팅 템플릿으로 변환
    prompt = tokenizer.apply_chat_template(
        messages, 
        tokenize=False, 
        add_generation_prompt=True
    )
    
    inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
    
    outputs = llm.generate(
        **inputs,
        max_new_tokens=50,
        temperature=0.7,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id
    )
    
    input_length = inputs["input_ids"].shape[1]
    generated_tokens = outputs[0, input_length:]
    raw_tags = tokenizer.decode(generated_tokens, skip_special_tokens=True)
    
    print(f"생성된 원시 태그: '{raw_tags}'")
    
    # --- 개선된 태그 파싱 로직 ---
    # 정규표현식을 사용하여 단어(알파벳, 숫자, 한글)만 추출합니다.
    # 대괄호, 따옴표 등 불필요한 문자를 모두 제거합니다.
    cleaned_tags = re.findall(r'[\w가-힣]+', raw_tags)
    
    # 중복을 제거하고 빈 태그를 필터링합니다.
    tags = sorted(list(set(tag.strip() for tag in cleaned_tags if tag.strip())))
    
    return tags


if __name__ == "__main__":
    # 1. 모델 로딩
    embedding_model = load_embedding_model()
    llm, tokenizer = load_llm()
    
    df = pd.read_csv('datas/inference_data.csv', encoding='utf-8', low_memory=False)
    print(f'처리할 데이터 목록: {df.shape[0]}')
    
    df = df[['title', 'author', 'summary', 'genre', 'adult']]
    df_json = df.to_json(orient='records')
    datas = json.loads(df_json)
    
    generated_tags = []
    for data in datas:
        # 3. 태그 생성 파이프라인 실행  
        generated_tags.append(generate_tags(data, llm, tokenizer, embedding_model))
    
    # 4. 결과 출력
    print("\n--- 최종 생성된 태그 ---")
    print(generated_tags[:5])   
    df['tags'] = ''
    df['tags'] = generate_tags

    df.to_csv('datas/final_inference_data.csv', encoding='utf-8', index=False)

속도

하지만 문제가 하나 있었는데 바로 속도 문제다.

지금 내가 생성해야 하는 태그는 약 12만개다.
그리고 약 2시간 30분(9000초)동안 약 1200개의 작품을 처리했다.

이렇게 되면 모든 작품의 태그를 생성하는데 약 10일 ~ 11일이 걸리고 하나의 작품을 처리하는데 약 7.5초가 소모된다는 계산이 나왔다...

그래서 조금 더 속도를 줄일 방법을 찾아보았다.

배치 처리

현재 코드는 하나씩 작품을 처리하고 있다.
이것을 배치처리를 통해 처리하면 빨라진다.

동시 처리

개별 처리시 메모리 패턴

GPU 메모리 ← 데이터1 로드 → 처리 → 언로드
GPU 메모리 ← 데이터2 로드 → 처리 → 언로드  # 매번 메모리 I/O
GPU 메모리 ← 데이터3 로드 → 처리 → 언로드

배치 처리시 메모리 패턴

GPU 메모리 ← [데이터1,2,3...] 한번에 로드 → 동시 처리 → 한번에 언로드

일단 GPU와 CPU간의 데이터 전송에서 시간을 아낄 수 있다.
그리고 하나의 작품을 처리하는 것이 아니라 여러 작품을 동시에 처리하기 때문에 속도를 더 높일 수 있다.

오버해드 감소

하나씩 순차적으로 처리하게 되면 하나의 작품을 처리할 때마다 토크가니저, 모델 등등 함수를 호출하게 된다.
12만개의 작품을 처리한다고 하면 12만번의 함수를 호출하는 것이고, 이것은 속도에 영향을 미치게 된다.

# 개별 처리시 오버헤드
for prompt in prompts:
    # 매번 발생하는 오버헤드들:
    tokenizer.encode(prompt)        # 토크나이저 호출 오버헤드
    model.forward()                 # 모델 초기화 오버헤드  
    torch.cuda.synchronize()        # GPU 동기화 오버헤드
    memory_allocation()             # 메모리 할당 오버헤드

# 배치 처리시 오버헤드
tokenizer.encode(all_prompts)       # 1번만 호출
model.forward()                     # 1번만 초기화
torch.cuda.synchronize()            # 1번만 동기화
memory_allocation()                 # 1번만 할당

왜 이런 차이가 나는 것인가?

거기에는 GPU 아키텍처의 특성에 있다.

  • GPU는 SIMD(Single Instruction, Multiple Data) 방식
  • 같은 연산을 여러 데이터에 동시에 적용하는 것에 최적화
  • Transformer 모델의 어텐션 계산은 행렬 연산이 많아서 배치 처리에 매우 적합

하지만 배치 크기가 크면 그만큼 GPU의 메모리를 많이 차지하기 때문에 스와핑이 발생하여 오히려 느려질 수도 있다.

배치 사이즈 설정

위에서 배치 사이즈가 커지면 오히려 느려질 수도 있다고 했다.
그래서 배치 사이즈를 자동으로 정해주는 코드까지 추가했다.

최대로 사용 가능한 배치 사이즈의 80% 정도만 사용하는 것을 권장한다고 한다.

구현

해당 부분을 코드로 구현했다.

def generate_tags_batch(work_data_list, llm, tokenizer, embedding_model, n_context=3, batch_size=8):
    """
    배치 단위로 태그를 생성하여 처리 속도를 향상시킵니다.
    """
    all_tags = []
    total_batches = (len(work_data_list) + batch_size - 1) // batch_size
    
    for batch_idx in range(0, len(work_data_list), batch_size):
        batch = work_data_list[batch_idx:batch_idx+batch_size]
        current_batch_num = batch_idx // batch_size + 1
        
        print(f"\n=== 배치 {current_batch_num}/{total_batches} 처리 중 (크기: {len(batch)}) ===")
        
        # 1. 배치 단위로 검색 쿼리 생성
        query_texts = []
        for work_data in batch:
            query_text = f"제목: {work_data['title']}\n작가: {work_data['author']}\n소개글: {work_data['summary']}\n장르: {work_data['genre']}"
            query_texts.append(query_text)
        
        # 2. 배치 단위로 컨텍스트 검색
        batch_contexts = retrieve_context_batch(query_texts, embedding_model, n_results=n_context)
        
        # 3. 배치 단위로 프롬프트 생성
        batch_prompts = []
        for work_data, context_docs in zip(batch, batch_contexts):
            messages = create_prompt_compact(work_data, context_docs)
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            batch_prompts.append(prompt)
        
        # 4. 배치 단위로 토크나이징
        inputs = tokenizer(
            batch_prompts, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=1024  # 토큰 길이 제한
        ).to(DEVICE)
        
        print(f"배치 토크나이징 완료: {inputs['input_ids'].shape}")
        
        # 5. 배치 단위로 생성
        with torch.no_grad():
            outputs = llm.generate(
                **inputs,
                max_new_tokens=50,
                temperature=0.7,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                num_beams=1,  # 빔 서치 비활성화로 속도 향상
                use_cache=True
            )
        
        print(f"배치 생성 완료: {outputs.shape}")
        
        # 6. 배치 결과 파싱
        for j, output in enumerate(outputs):
            # 입력 길이 계산 (각 프롬프트마다 다를 수 있음)
            input_length = len(tokenizer.encode(batch_prompts[j]))
            generated_tokens = output[input_length:]
            raw_tags = tokenizer.decode(generated_tokens, skip_special_tokens=True)
            
            # 태그 파싱
            cleaned_tags = re.findall(r'[\w가-힣]+', raw_tags)
            tags = sorted(list(set(tag.strip() for tag in cleaned_tags if tag.strip())))
            
            all_tags.append(tags)
            print(f"  작품 {j+1}: {batch[j]['title'][:20]}... -> {tags}")
        
        # GPU 메모리 정리
        torch.cuda.empty_cache()
        
        print(f"배치 {current_batch_num} 완료: {len(batch)}개 작품 처리")
    
    return all_tags

def find_optimal_batch_size(datas, llm, tokenizer, embedding_model):
    """
    GPU 메모리에 맞는 최적 배치 크기를 찾아서 반환합니다.
    """
    print("최적 배치 크기를 찾는 중...")
    
    # 테스트용 샘플 데이터 (처음 20개)
    test_data = datas[:20]
    max_successful_batch = 0
    best_throughput = 0
    performance_results = []
    
    for batch_size in [4, 8, 12, 16, 20, 24]:
        try:
            print(f"\n배치 크기 {batch_size} 테스트 중...")
            torch.cuda.empty_cache()  # 메모리 정리
            
            import time
            start_time = time.time()
            
            # 작은 배치로 테스트
            test_batch = test_data[:min(batch_size, len(test_data))]
            
            # 검색 쿼리 생성
            query_texts = []
            for work_data in test_batch:
                query_text = f"제목: {work_data['title']}\n작가: {work_data['author']}\n소개글: {work_data['summary']}\n장르: {work_data['genre']}"
                query_texts.append(query_text)
            
            # 컨텍스트 검색
            batch_contexts = retrieve_context_batch(query_texts, embedding_model, n_results=3)
            
            # 프롬프트 생성
            batch_prompts = []
            for work_data, context_docs in zip(test_batch, batch_contexts):
                messages = create_prompt_compact(work_data, context_docs)
                prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
                batch_prompts.append(prompt)
            
            # 토크나이징
            inputs = tokenizer(
                batch_prompts, 
                return_tensors="pt", 
                padding=True, 
                truncation=True,
                max_length=1024
            ).to(DEVICE)
            
            # GPU 메모리 사용량 체크
            if torch.cuda.is_available():
                memory_used = torch.cuda.memory_allocated() / 1024**3  # GB 단위
                memory_cached = torch.cuda.memory_reserved() / 1024**3
                print(f"  GPU 메모리 사용량: {memory_used:.2f}GB (캐시: {memory_cached:.2f}GB)")
            
            # 생성 테스트
            with torch.no_grad():
                outputs = llm.generate(
                    **inputs,
                    max_new_tokens=50,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=tokenizer.eos_token_id,
                    num_beams=1,
                    use_cache=True
                )
            
            end_time = time.time()
            processing_time = end_time - start_time
            throughput = len(test_batch) / processing_time  # 초당 처리량
            
            print(f"  ✅ 성공! 처리 시간: {processing_time:.2f}초, 처리량: {throughput:.2f} 작품/초")
            
            # 성공한 배치 크기 기록
            max_successful_batch = batch_size
            performance_results.append({
                'batch_size': batch_size,
                'throughput': throughput,
                'processing_time': processing_time
            })
            
            if throughput > best_throughput:
                best_throughput = throughput
            
        except torch.cuda.OutOfMemoryError:
            print(f"  ❌ 배치 크기 {batch_size}: GPU 메모리 부족 (OOM)")
            break
        except Exception as e:
            print(f"  ❌ 배치 크기 {batch_size}: 오류 발생 - {str(e)}")
            break
    
    # 최적 배치 크기 계산 (최대 성공 배치 크기의 80%)
    if max_successful_batch > 0:
        optimal_batch_size = max(4, int(max_successful_batch * 0.8))
        
        print(f"\n=== 테스트 결과 ===")
        print(f"최대 성공 배치 크기: {max_successful_batch}")
        print(f"권장 배치 크기: {optimal_batch_size} (안전 마진 20% 적용)")
        
        # 성능 요약 출력
        if performance_results:
            print("\n성능 요약:")
            for result in performance_results:
                print(f"  배치 {result['batch_size']}: {result['throughput']:.2f} 작품/초")
        
        return optimal_batch_size
    else:
        print("\n❌ 모든 배치 크기에서 실패했습니다. 기본값 4를 사용합니다.")
        return 4

# 메인 코드에서 사용
if __name__ == "__main__":
    # 모델 로딩
    embedding_model = load_embedding_model()
    llm, tokenizer = load_llm()
    
    df = pd.read_csv('datas/inference_data.csv', encoding='utf-8', low_memory=False)
    df = df[['title', 'author', 'summary', 'genre', 'adult']]
    datas = json.loads(df.to_json(orient='records'))
    
    # 최적 배치 크기 자동 찾기
    optimal_batch_size = find_optimal_batch_size(datas, llm, tokenizer, embedding_model)
    
    print(f"\n🚀 실제 처리를 배치 크기 {optimal_batch_size}로 시작합니다...")
    
    generated_tags = generate_tags_batch(datas, llm, tokenizer, embedding_model, batch_size=optimal_batch_size)
    
    df['tags'] = generated_tags
    df.to_csv('datas/final_inference_data.csv', encoding='utf-8', index=False)
    
    print(f"\n✅ 태그 생성 완료! 총 {len(generated_tags)}개 작품 처리됨.")
    print("결과가 'datas/final_inference_data.csv'에 저장되었습니다.")

결과

그러면 이런식으로 작동하게 된다.

출력 LOG

배치 크기 4 테스트 중...
배치 크기 4에 대한 컨텍스트를 검색합니다...
배치 검색 완료: 4개 결과
  GPU 메모리 사용량: 7.44GB (캐시: 7.56GB)
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
  ✅ 성공! 처리 시간: 13.77초, 처리량: 0.29 작품/초

배치 크기 8 테스트 중...
배치 크기 8에 대한 컨텍스트를 검색합니다...
배치 검색 완료: 8개 결과
  GPU 메모리 사용량: 7.44GB (캐시: 7.64GB)
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
  ✅ 성공! 처리 시간: 17.92초, 처리량: 0.45 작품/초

배치 크기 12 테스트 중...
배치 크기 12에 대한 컨텍스트를 검색합니다...
배치 검색 완료: 12개 결과
  GPU 메모리 사용량: 7.44GB (캐시: 7.79GB)
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
  ✅ 성공! 처리 시간: 22.31초, 처리량: 0.54 작품/초

배치 크기 16 테스트 중...
배치 크기 16에 대한 컨텍스트를 검색합니다...
배치 검색 완료: 16개 결과
  GPU 메모리 사용량: 7.44GB (캐시: 7.89GB)
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
  ❌ 배치 크기 16: GPU 메모리 부족 (OOM)

=== 테스트 결과 ===
최대 성공 배치 크기: 12
권장 배치 크기: 9 (안전 마진 20% 적용)

성능 요약:
  배치 4: 0.29 작품/초
  배치 8: 0.45 작품/초
  배치 12: 0.54 작품/초

GPU 사용률

|=========================================+========================+======================|
|   0  NVIDIA GeForce RTX 3060        Off |   00000000:01:00.0  On |                  N/A |
| 59%   69C    P2            152W /  170W |   10689MiB /  12288MiB |     98%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+

원래 약 8GB 정도만 사용했는데 지금은 약 10GB로 대부분의 메모리를 잘 사용하는 것을 볼 수 있다.

백터 검색 최적화

두 번째 방법으로는 백터 검색 최적화가 있다.
현재 코드의 작동 순서는

작품 불러옴(1개) -> 이 작품과 관련된 작품을 검색 -> 해당 작품들의 데이터와 함께 LLM에 입력 -> 출력

이렇게 작동하고 있다.

예를 들어 100개의 작품에 대해 검색을 해야 할 때 미리 100개에 대한 작품들을 검색하고 미리 저장해 둔 다음에 그 데이터를 사용한다면 훨신 빠를 것이다.
왜냐하면 db를 한 번만 호출할 것이기 때문이다.

구현

def retrieve_context_batch_optimized(query_texts, embedding_model, n_results=3):
    """
    벡터 검색 최적화: 자동으로 최적 임베딩 배치 크기를 사용합니다.
    """
    print(f"배치 크기 {len(query_texts)}에 대한 최적화된 벡터 검색을 시작합니다...")
    
    # 1. 배치 단위로 모든 쿼리를 한번에 임베딩 생성
    query_embeddings = embedding_model.encode(
        query_texts, 
        show_progress_bar=False, 
        batch_size=64,  
        convert_to_tensor=False,
        normalize_embeddings=True
    )
    
    # 2. 배치 단위로 ChromaDB 검색 (한번에 모든 쿼리 처리)
    print("  배치 벡터 검색 실행 중...")
    try:
        # ChromaDB는 배치 검색을 지원하므로 모든 임베딩을 한번에 검색
        results = collection.query(
            query_embeddings=query_embeddings.tolist(),
            n_results=n_results
        )
        
        # 3. 결과 파싱 최적화
        all_results = []
        
        if results and results['documents']:
            # 각 쿼리별 결과 처리
            for i in range(len(query_texts)):
                retrieved_docs = []
                
                if i < len(results['documents']) and results['documents'][i]:
                    docs_list = results['documents'][i]
                    meta_list = results['metadatas'][i] if results['metadatas'] else [{}] * len(docs_list)
                    
                    # 결과를 딕셔너리로 변환
                    for doc, meta in zip(docs_list, meta_list):
                        retrieved_docs.append({
                            "content": doc,
                            "metadata": meta or {}
                        })
                
                all_results.append(retrieved_docs)
        else:
            # 검색 결과가 없는 경우 빈 리스트로 채우기
            all_results = [[] for _ in range(len(query_texts))]
    
    except Exception as e:
        print(f"  ⚠️ 배치 검색 실패, 개별 검색으로 대체: {str(e)}")
        # 배치 검색 실패시 개별 검색으로 폴백
        all_results = []
        for i, query_embedding in enumerate(query_embeddings):
            try:
                results = collection.query(
                    query_embeddings=[query_embedding.tolist()],
                    n_results=n_results
                )
                
                retrieved_docs = []
                if results and results['documents'] and results['documents'][0]:
                    docs_list = results['documents'][0]
                    meta_list = results['metadatas'][0] if results['metadatas'] else [{}] * len(docs_list)
                    
                    for doc, meta in zip(docs_list, meta_list):
                        retrieved_docs.append({
                            "content": doc,
                            "metadata": meta or {}
                        })
                
                all_results.append(retrieved_docs)
            except Exception as inner_e:
                print(f"    개별 검색 {i+1} 실패: {str(inner_e)}")
                all_results.append([])
    
    avg_docs = sum(len(r) for r in all_results) / len(all_results) if all_results else 0
    print(f"  배치 벡터 검색 완료: {len(all_results)}개 결과 (평균 {avg_docs:.1f}개 문서/쿼리)")
    return all_results

최종

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
import pandas as pd
import json
import re
import time
from datetime import timedelta

# --- 전역 설정 ---
EMBEDDING_MODEL_ID = "dragonkue/snowflake-arctic-embed-l-v2.0-ko"
LLM_MODEL_ID = "SEOKDONG/llama3.1_korean_v1.1_sft_by_aidx"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# 전역 시작 시간
START_TIME = None

def format_elapsed_time(start_time):
    """
    경과 시간을 시:분:초 형식으로 포맷팅합니다.
    """
    elapsed = time.time() - start_time
    return str(timedelta(seconds=int(elapsed)))

def print_progress_header(batch_num, total_batches, batch_size):
    """
    진행 상황 헤더를 출력합니다.
    """
    elapsed_str = format_elapsed_time(START_TIME)
    print(f"\n====={elapsed_str} 진행중=====")
    print(f"=== 배치 {batch_num}/{total_batches} 처리 중 (크기: {batch_size}) ===")

def load_embedding_model():
    """
    SentenceTransformer를 사용하여 임베딩 모델을 로드하고 GPU에 올립니다.
    """
    print(f"임베딩 모델({EMBEDDING_MODEL_ID})을 로드합니다...")
    model = SentenceTransformer(EMBEDDING_MODEL_ID, device=DEVICE)
    print("임베딩 모델 로드 완료.")
    return model

def load_llm():
    """
    4비트 양자화를 적용하여 대규모 언어 모델(LLM)과 토크나이저를 로드합니다.
    """
    print(f"LLM({LLM_MODEL_ID})을 로드합니다...")
    
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    model = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL_ID,
        quantization_config=quantization_config,
        device_map="auto",
        torch_dtype=torch.bfloat16,
        use_cache=True  # KV 캐시 활성화
    )
    
    tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_ID)
    
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        model.config.pad_token_id = tokenizer.pad_token_id
    
    # 패딩 방향을 left로 설정 (decoder-only 모델용)
    tokenizer.padding_side = 'left'
    
    # 모델을 평가 모드로 설정
    model.eval()
    
    print("LLM 로드 완료.")
    return model, tokenizer

import chromadb

# ChromaDB 클라이언트 설정
CHROMA_PATH = "chroma_db"
client = chromadb.PersistentClient(path=CHROMA_PATH)
COLLECTION_NAME = "work_collection"
collection = client.get_or_create_collection(name=COLLECTION_NAME)

# 전역 변수로 최적 배치 크기 저장
OPTIMAL_EMBEDDING_BATCH_SIZE = None

def get_optimal_embedding_batch_size(embedding_model, sample_texts):
    """
    전역 변수에 최적 임베딩 배치 크기를 캐시합니다.
    """
    global OPTIMAL_EMBEDDING_BATCH_SIZE
    
    if OPTIMAL_EMBEDDING_BATCH_SIZE is not None:
        return OPTIMAL_EMBEDDING_BATCH_SIZE
    
    print("임베딩 최적 배치 크기를 찾는 중...")
    
    for batch_size in [16, 32, 64, 96, 128, 160, 192, 224, 256]:
        try:
            print(f"  임베딩 배치 크기 {batch_size} 테스트 중...")
            torch.cuda.empty_cache()
            
            test_texts = sample_texts[:min(batch_size, len(sample_texts))]
            
            embeddings = embedding_model.encode(
                test_texts,
                show_progress_bar=False,
                batch_size=batch_size,
                convert_to_tensor=False,
                normalize_embeddings=True
            )
            
            OPTIMAL_EMBEDDING_BATCH_SIZE = batch_size
            print(f"    ✅ 성공!")
            
        except torch.cuda.OutOfMemoryError:
            print(f"    ❌ 임베딩 배치 크기 {batch_size}: GPU 메모리 부족")
            break
        except Exception as e:
            print(f"    ❌ 임베딩 배치 크기 {batch_size}: 오류")
            break
    
    # 안전 마진 적용 (최대 성공 크기의 80%)
    if OPTIMAL_EMBEDDING_BATCH_SIZE:
        OPTIMAL_EMBEDDING_BATCH_SIZE = max(16, int(OPTIMAL_EMBEDDING_BATCH_SIZE * 0.8))
    else:
        OPTIMAL_EMBEDDING_BATCH_SIZE = 32  # 기본값
    
    print(f"  최적 임베딩 배치 크기: {OPTIMAL_EMBEDDING_BATCH_SIZE}")
    return OPTIMAL_EMBEDDING_BATCH_SIZE

def retrieve_context_batch_optimized(query_texts, embedding_model, n_results=3):
    """
    벡터 검색 최적화: 자동으로 최적 임베딩 배치 크기를 사용합니다.
    """
    print(f"배치 크기 {len(query_texts)}에 대한 최적화된 벡터 검색을 시작합니다...")
    
    # 1. 배치 단위로 모든 쿼리를 한번에 임베딩 생성 (가장 큰 성능 향상)
    query_embeddings = embedding_model.encode(
        query_texts, 
        show_progress_bar=False, 
        batch_size=64,
        convert_to_tensor=False,
        normalize_embeddings=True
    )
    
    # 2. 배치 단위로 ChromaDB 검색 (한번에 모든 쿼리 처리)
    print("  배치 벡터 검색 실행 중...")
    try:
        # ChromaDB는 배치 검색을 지원하므로 모든 임베딩을 한번에 검색
        results = collection.query(
            query_embeddings=query_embeddings.tolist(),
            n_results=n_results
        )
        
        # 3. 결과 파싱 최적화
        all_results = []
        
        if results and results['documents']:
            # 각 쿼리별 결과 처리
            for i in range(len(query_texts)):
                retrieved_docs = []
                
                if i < len(results['documents']) and results['documents'][i]:
                    docs_list = results['documents'][i]
                    meta_list = results['metadatas'][i] if results['metadatas'] else [{}] * len(docs_list)
                    
                    # 결과를 딕셔너리로 변환
                    for doc, meta in zip(docs_list, meta_list):
                        retrieved_docs.append({
                            "content": doc,
                            "metadata": meta or {}
                        })
                
                all_results.append(retrieved_docs)
        else:
            # 검색 결과가 없는 경우 빈 리스트로 채우기
            all_results = [[] for _ in range(len(query_texts))]
    
    except Exception as e:
        print(f"  ⚠️ 배치 검색 실패, 개별 검색으로 대체: {str(e)}")
        # 배치 검색 실패시 개별 검색으로 폴백
        all_results = []
        for i, query_embedding in enumerate(query_embeddings):
            try:
                results = collection.query(
                    query_embeddings=[query_embedding.tolist()],
                    n_results=n_results
                )
                
                retrieved_docs = []
                if results and results['documents'] and results['documents'][0]:
                    docs_list = results['documents'][0]
                    meta_list = results['metadatas'][0] if results['metadatas'] else [{}] * len(docs_list)
                    
                    for doc, meta in zip(docs_list, meta_list):
                        retrieved_docs.append({
                            "content": doc,
                            "metadata": meta or {}
                        })
                
                all_results.append(retrieved_docs)
            except Exception as inner_e:
                print(f"    개별 검색 {i+1} 실패: {str(inner_e)}")
                all_results.append([])
    
    avg_docs = sum(len(r) for r in all_results) / len(all_results) if all_results else 0
    print(f"  배치 벡터 검색 완료: {len(all_results)}개 결과 (평균 {avg_docs:.1f}개 문서/쿼리)")
    return all_results

def create_prompt_compact(work_data, context_docs):
    """
    태그 생성 품질을 높인 프롬프트
    """
    context_str = ""
    for i, doc in enumerate(context_docs[:3], 1):
        content = doc['content'][:150] + "..." if len(doc['content']) > 150 else doc['content']
        context_str += f"작품{i}: {content}\n"
        if doc.get('metadata') and doc['metadata'].get('tags'):
            tags = doc['metadata']['tags'][:80] + "..." if len(doc['metadata']['tags']) > 80 else doc['metadata']['tags']
            context_str += f"태그예시: {tags}\n"
    
    messages = [
        {
            'role': 'system',
            'content': """당신은 웹소설 태그 전문가입니다. 작품의 핵심 특징을 나타내는 정확한 태그를 생성하세요.

규칙:
1. 정확히 3-8개의 태그만 생성
2. 쉼표로 구분하여 한 줄로 출력
3. 각 태그는 2-10글자 이내
4. 설명문이나 부가 설명 금지
5. 작품의 핵심 요소만 포함 (장르, 소재, 분위기, 캐릭터 특성 등)

예시 출력: 현대판타지,학원물,능력자,성장,액션"""
        },
        {
            'role': 'user',
            'content': f"""작품 정보:
제목: {work_data['title']}
작가: {work_data['author']} 
줄거리: {work_data['summary'][:400]}

참고 유사작품:
{context_str}

위 정보를 바탕으로 '{work_data['title']}'에 적합한 태그를 생성하세요."""
        }
    ]
    
    return messages

def generate_tags_batch(work_data_list, llm, tokenizer, embedding_model, n_context=3, batch_size=8):
    """
    배치 단위로 태그를 생성하여 처리 속도를 향상시킵니다.
    """
    all_tags = []
    total_batches = (len(work_data_list) + batch_size - 1) // batch_size
    
    for batch_idx in range(0, len(work_data_list), batch_size):
        batch = work_data_list[batch_idx:batch_idx+batch_size]
        current_batch_num = batch_idx // batch_size + 1
        
        # 진행 상황 헤더 출력
        print_progress_header(current_batch_num, total_batches, len(batch))
        
        # 1. 배치 단위로 검색 쿼리 생성
        query_texts = []
        for work_data in batch:
            query_text = f"제목: {work_data['title']}\n작가: {work_data['author']}\n소개글: {work_data['summary']}\n장르: {work_data['genre']}"
            query_texts.append(query_text)
        
        # 2. 최적화된 배치 벡터 검색
        batch_contexts = retrieve_context_batch_optimized(query_texts, embedding_model, n_results=n_context)
        
        # 3. 배치 단위로 프롬프트 생성
        batch_prompts = []
        for work_data, context_docs in zip(batch, batch_contexts):
            messages = create_prompt_compact(work_data, context_docs)
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            batch_prompts.append(prompt)
        
        # 4. 배치 단위로 토크나이징
        inputs = tokenizer(
            batch_prompts, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=1024  # 토큰 길이 제한
        ).to(DEVICE)
        
        print(f"  배치 토크나이징 완료: {inputs['input_ids'].shape}")
        
        # 5. 배치 단위로 생성
        with torch.no_grad():
            outputs = llm.generate(
                **inputs,
                max_new_tokens=30,
                temperature=0.3,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                num_beams=1,  # 빔 서치 비활성화로 속도 향상
                use_cache=True,
                repetition_penalty=1.1,   # 반복 방지
                early_stopping=True       # 조기 종료
            )
        
        print(f"  배치 생성 완료: {outputs.shape}")
        
        # 6. 배치 결과 파싱
        for j, output in enumerate(outputs):
            # 입력 길이 계산 (각 프롬프트마다 다를 수 있음)
            input_length = len(tokenizer.encode(batch_prompts[j]))
            generated_tokens = output[input_length:]
            raw_tags = tokenizer.decode(generated_tokens, skip_special_tokens=True)
            
            # 태그 파싱
            cleaned_tags = re.findall(r'[\w가-힣]+', raw_tags)
            tags = sorted(list(set(tag.strip() for tag in cleaned_tags if tag.strip())))
            
            all_tags.append(tags)
            print(f"    작품 {j+1}: {batch[j]['title'][:20]}... -> {tags}")
        
        # GPU 메모리 정리
        torch.cuda.empty_cache()
        
        print(f"  배치 {current_batch_num} 완료: {len(batch)}개 작품 처리")
    
    return all_tags

def find_optimal_batch_size(datas, llm, tokenizer, embedding_model):
    """
    GPU 메모리에 맞는 최적 배치 크기를 찾아서 반환합니다.
    """
    print("최적 배치 크기를 찾는 중...")
    
    # 테스트용 샘플 데이터 (처음 20개)
    test_data = datas[:20]
    max_successful_batch = 0
    best_throughput = 0
    performance_results = []
    
    for batch_size in [4, 8, 12, 16, 20, 24]:
        try:
            print(f"\n배치 크기 {batch_size} 테스트 중...")
            torch.cuda.empty_cache()  # 메모리 정리
            
            start_time = time.time()
            
            # 작은 배치로 테스트
            test_batch = test_data[:min(batch_size, len(test_data))]
            
            # 검색 쿼리 생성
            query_texts = []
            for work_data in test_batch:
                query_text = f"제목: {work_data['title']}\n작가: {work_data['author']}\n소개글: {work_data['summary']}\n장르: {work_data['genre']}"
                query_texts.append(query_text)
            
            # 최적화된 컨텍스트 검색
            batch_contexts = retrieve_context_batch_optimized(query_texts, embedding_model, n_results=3)
            
            # 프롬프트 생성
            batch_prompts = []
            for work_data, context_docs in zip(test_batch, batch_contexts):
                messages = create_prompt_compact(work_data, context_docs)
                prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
                batch_prompts.append(prompt)
            
            # 토크나이징
            inputs = tokenizer(
                batch_prompts, 
                return_tensors="pt", 
                padding=True, 
                truncation=True,
                max_length=1024
            ).to(DEVICE)
            
            # GPU 메모리 사용량 체크
            if torch.cuda.is_available():
                memory_used = torch.cuda.memory_allocated() / 1024**3  # GB 단위
                memory_cached = torch.cuda.memory_reserved() / 1024**3
                print(f"  GPU 메모리 사용량: {memory_used:.2f}GB (캐시: {memory_cached:.2f}GB)")
            
            # 생성 테스트
            with torch.no_grad():
                outputs = llm.generate(
                    **inputs,
                    max_new_tokens=50,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=tokenizer.eos_token_id,
                    num_beams=1,
                    use_cache=True
                )
            
            end_time = time.time()
            processing_time = end_time - start_time
            throughput = len(test_batch) / processing_time  # 초당 처리량
            
            print(f"  ✅ 성공! 처리 시간: {processing_time:.2f}초, 처리량: {throughput:.2f} 작품/초")
            
            # 성공한 배치 크기 기록
            max_successful_batch = batch_size
            performance_results.append({
                'batch_size': batch_size,
                'throughput': throughput,
                'processing_time': processing_time
            })
            
            if throughput > best_throughput:
                best_throughput = throughput
            
        except torch.cuda.OutOfMemoryError:
            print(f"  ❌ 배치 크기 {batch_size}: GPU 메모리 부족 (OOM)")
            break
        except Exception as e:
            print(f"  ❌ 배치 크기 {batch_size}: 오류 발생 - {str(e)}")
            break
    
    # 최적 배치 크기 계산 (최대 성공 배치 크기의 80%)
    if max_successful_batch > 0:
        optimal_batch_size = max(4, int(max_successful_batch * 0.8))
        
        print(f"\n=== 테스트 결과 ===")
        print(f"최대 성공 배치 크기: {max_successful_batch}")
        print(f"권장 배치 크기: {optimal_batch_size} (안전 마진 20% 적용)")
        
        # 성능 요약 출력
        if performance_results:
            print("\n성능 요약:")
            for result in performance_results:
                print(f"  배치 {result['batch_size']}: {result['throughput']:.2f} 작품/초")
        
        return optimal_batch_size
    else:
        print("\n❌ 모든 배치 크기에서 실패했습니다. 기본값 4를 사용합니다.")
        return 4

# 메인 코드에서 사용
if __name__ == "__main__":
    # 전역 시작 시간 설정
    START_TIME = time.time()
    
    print("🚀 RAG 기반 태그 생성 시스템을 시작합니다...")
    print(f"시작 시간: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    
    # 모델 로딩
    embedding_model = load_embedding_model()
    llm, tokenizer = load_llm()
    
    df = pd.read_csv('datas/inference_data.csv', encoding='utf-8', low_memory=False)
    df = df[['title', 'author', 'summary', 'genre', 'adult']]
    df = df.head(100)
    datas = json.loads(df.to_json(orient='records'))
    
    print(f'처리할 데이터 목록: {len(datas)}개')
    
    # 임베딩 최적 배치 크기 미리 찾기
    sample_texts = [f"제목: {d['title']}\n작가: {d['author']}\n소개글: {d['summary']}\n장르: {d['genre']}" for d in datas[:20]]
    
    # LLM 최적 배치 크기 찾기
    optimal_llm_batch = find_optimal_batch_size(datas, llm, tokenizer, embedding_model)
    
    print(f"\n🚀 최적화 완료!")
    print(f"  임베딩 배치 크기: 64")
    print(f"  LLM 배치 크기: {optimal_llm_batch}")
    print(f"  실제 처리를 시작합니다...")
    
    generated_tags = generate_tags_batch(datas, llm, tokenizer, embedding_model, batch_size=optimal_llm_batch)
    
    df['tags'] = generated_tags
    df.to_csv('datas/final_inference_data.csv', encoding='utf-8', index=False)
    
    total_elapsed = format_elapsed_time(START_TIME)
    print(f"\n✅ 태그 생성 완료! 총 {len(generated_tags)}개 작품 처리됨.")
    print(f"총 소요 시간: {total_elapsed}")
    print("결과가 'datas/final_inference_data.csv'에 저장되었습니다.")

결과

100개의 작품을 처리하는데 약 4분 정도 걸렸다.
계산 결과 작품당 약 2.5초가 소모되는 것을 알았다.
단순히 약 3배 더 빨라진 것이다...

  • 약 1만개 하는데 4시간 걸림
profile
코딩 공부하는 사람

0개의 댓글