공유 자원을 다루는 병렬 처리

Pt J·2026년 3월 22일
post-thumbnail

공유 자원을 다루는 병렬 처리

지난 예제에는 읽기 전용 데이터를 공유하여
데이터에 동시에 접근해도 문제 없는 상황에서의 병렬 처리를 살펴 보았다.

이번에는 하나의 공유 자원에 동시에 데이터를 작성하려고 할 때
순서가 꼬이지 않게 하는 방법을 살펴보겠다.

작업공간 생성 및 구조 확인

~/workspace$ mkdir parallel-sync && cd parallel-sync
~/workspace/parallel-sync$ python3 -m venv venv
~/workspace/parallel-sync$ source venv/bin/activate
~/workspace/parallel-sync$ pip install maturin fastapi uvicorn
~/workspace/parallel-sync$ maturin init
~/workspace/parallel-sync$ # 선택지 중 기본값인 PyO3 선택
~/workspace/parallel-sync$ # Cargo.toml과 src/lib.rs가 자동 생성된다
~/workspace/parallel-sync$ # Python 코드는 직접 생성해 주어야 한다
~/workspace/parallel-sync$ mkdir app && touch app/main.py
~/workspace/parallel-sync$ tree -I venv
.
├── app
│   └── main.py
├── Cargo.toml
├── pyproject.toml
└── src
    └── lib.rs

Cargo.toml 파일을 열어 라이브러리 이름을 수정해 주겠다.
병렬 처리를 위한 Rayon 크레이트도 추가해 준다.

Cargo.toml

[package]
name = "parallel-sync"
version = "0.1.0"
edition = "2024"

[lib]
name = "rust_engine"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.28.0"
rayon = "1.11"

코드 작성

Rust 코드

Rayon 크레이트를 사용하기 위해 use 를 통해 그것을 불러와야 한다.

그리고 여기서 중요한 건 ArcMutex 다.

  • Arc (Atomic Reference Counter)
    데이터의 주인이 여럿이라고 선언하는 안전한 복사본 생성기.
    스레드 사이를 자유롭게 넘나들 수 있게 해준다.
  • Mutex (Mutual Exclusion)
    혼자만 들어갈 수 있는 영역의 문을 잠그는 자물쇠.
    열려 있을 때 잠그고 들어가야만 그 영역의 연산을 수행할 수 있으며
    연산을 모두 마친 후에는 다른 스레드를 위해 다시 열고 나와야 한다.

src/lib.rs

use pyo3::prelude::*;
use rayon::prelude::*;
use std::sync::{Arc, Mutex};

#[pyfunction]
fn compute_with_shared_log(py: Python<'_>, data: Vec<i32>) -> PyResult<(i64, Vec<String>)> {
    let shared_logs = Arc::new(Mutex::new(Vec::new()));

    let total_sum: i64 = py.detach(|| {
        data.par_iter().map(|&x| {
            let val = x as i64;
            if val % 1_000_000 == 0 {
                let logs = Arc::clone(&shared_logs);
                let mut logs_lock = logs.lock().unwrap();
                logs_lock.push(format!("Thread processing value: {}", val));
            }
            val.pow(2)
        }).sum()
    });

    let final_logs = shared_logs.lock().unwrap().clone();
    Ok((total_sum, final_logs))
}

#[pymodule]
fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(compute_with_shared_log, m)?)?;

    Ok(())
}

Python 코드

이번에도 정수 값을 담는 List 를 속성으로 가진 DataInput 클래스를 만들어 사용할 것이다.
이것은 데이터 검증 라이브러리 pydanticBaseModel 클래스를 상속받아 생성한다.

병렬 처리에 대한 연산은 Rust가 알아서 다 해주기 때문에
Python에서는 추가적으로 해줘야 할 건 없다.

app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
import rust_engine

app = FastAPI()

class DataInput(BaseModel):
    numbers: list[int]

@app.post("/shared-log-test")
def shared_log_test(data: DataInput):
    result, logs = rust_engine.compute_with_shared_log(data.numbers)

    return {
        "result": result,
        "worker_logs": logs,
        "log_count": len(logs)
    }

빌드 및 실행

Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.

uvicorn 라이브러리를 통해 FastAPI를 실행한다.

~/workspace/parallel-sync$ maturin develop --release
~/workspace/parallel-sync$ uvicorn app.main:app --reload

curl 명령어 또는 브라우저를 통해 테스트를 해볼 수 있다.

이 예제의 경우 GET 메서드가 아닌 POST 메서드로 통신하므로
브라우저를 통한 테스트 시 Swagger UI를 사용해야 한다.
FastAPI의 경우 다음과 같은 주소로 Swagger UI가 내장되어 있다.

  • http://127.0.0.1:8000/docs

브라우저상에서는 대용량 데이터를 전달하기 어려우므로
테스트를 위해 클라이언트 파일을 사용한다.

sample-client.py

import requests
import time

# 1. 테스트 데이터 생성 (0부터 9,999,999까지의 리스트)
print("데이터 생성 중... (10,000,000개)")
large_data = list(range(10_000_000))

# 2. 서버 주소 및 데이터 설정
url = "http://127.0.0.1:8000/shared-log-test"
payload = {"numbers": large_data}

# 3. 시간 측정 시작
print("서버에 요청을 보냅니다...")
start_time = time.time()

# 4. POST 요청 전송
response = requests.post(url, json=payload)

# 5. 결과 출력
end_time = time.time()

if response.status_code == 200:
    print(f"✅ 성공! 결과값: {response.json()['result']}")
    print("📄로그:");
    for log in response.json()['worker_logs']:
        print(f"> {log}")
    print(f"⏱️ 총 소요 시간: {end_time - start_time:.4f} 초")
else:
    print(f"❌ 실패: {response.status_code}, {response.text}")
profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글