기본 병렬 처리

작업 성능 향상을 위해 병렬 처리를 사용하게 될 때가 있다.
Python은 병렬 처리에 관여하지 않고 Rust에서 시스템상의 모든 코어를 사용한다.

작업공간 생성 및 구조 확인

~/workspace$ mkdir parallel-compute && cd parallel-compute
~/workspace/parallel-compute$ python3 -m venv venv
~/workspace/parallel-compute$ source venv/bin/activate
~/workspace/parallel-compute$ pip install maturin fastapi uvicorn
~/workspace/parallel-compute$ maturin init
~/workspace/parallel-compute$ # 선택지 중 기본값인 PyO3 선택
~/workspace/parallel-compute$ # Cargo.toml과 src/lib.rs가 자동 생성된다
~/workspace/parallel-compute$ # Python 코드는 직접 생성해 주어야 한다
~/workspace/parallel-compute$ mkdir app && touch app/main.py
~/workspace/parallel-compute$ tree -I venv
.
├── app
│   └── main.py
├── Cargo.toml
├── pyproject.toml
└── src
    └── lib.rs

Cargo.toml 파일을 열어 라이브러리 이름을 수정해 주겠다.
병렬 처리를 위한 Rayon 크레이트도 추가해 준다.

Rayon이 사실상 기능적으로 완성되었을 때의 안정적인 버전인
Rayon 1.8 을 쓰는 프로젝트도 많다고 하는데
버전이 올라갈수록 효율적인 처리가 더 정교해지니
여기선 현 시점 가장 최신 버전을 사용하겠다.

Rayon은 하위 호환성을 매우 중요하게 생각하는 라이브러리라
버전에 따른 코드 상의 차이는 발생하지 않는다.

Cargo.toml

[package]
name = "parallel-compute"
version = "0.1.0"
edition = "2024"

[lib]
name = "rust_engine"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.28.0"
rayon = "1.11"

코드 작성

Rust 코드

Rayon 라이브러릴 사용하기 위해 use 를 통해 그것을 불러와야 한다.

여기선 읽기 전용 데이터를 병렬 처리 하므로 Mutex가 필요하지 않아
iter()par_iter() 로 바꾸는 것만으로 손쉽게 병렬 처리를 할 수 있다.
data race 발생 가능성은 Rust에서 내부적으로 처리해준다.

Rayon은 파이썬 스레드가 아닌 별도의 OS 스레드를 사용하므로
GIL(Global Interpreter Lock)을 해제하여
Python으로 하여금 다른 작업을 수행할 수 있도록 하는 게 좋다.
해제하지 않아도 작동은 하지만 해제하는 편이 성능 효율성을 높일 수 있다.
GIL을 해제한 클로저 내부에서는 Python 객체에 접근할 수 없게 된다는 것만 알아두자.

과거에는 py.allow_threads 를 사용한 GIL 해제가 많이 쓰였지만
최신 PyO3에서는 py.detach() 를 사용한다.

src/lib.rs

use pyo3::prelude::*;
use rayon::prelude::*;

#[pyfunction]
fn heavy_parallel_compute(py: Python<'_>, data: Vec<i32>) -> PyResult<i64> {
    let sum = py.detach(|| {
        data.par_iter()
            .map(|&x| (x as i64).pow(2))
            .sum()
    });

    Ok(sum)
}

#[pymodule]
fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(heavy_parallel_compute, m)?)?;

    Ok(())
}

Python 코드

이번에도 정수 값을 담는 List 를 속성으로 가진 DataInput 클래스를 만들어 사용할 것이다.
이것은 데이터 검증 라이브러리 pydanticBaseModel 클래스를 상속받아 생성한다.

병렬 처리에 대한 연산은 Rust가 알아서 다 해주기 때문에
Python에서는 추가적으로 해줘야 할 건 없다.

app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
import rust_engine

app = FastAPI()

class DataInput(BaseModel):
    numbers: list[int]

@app.post("/parallel-compute")
def parallel_compute(input_data: DataInput):
    result = rust_engine.heavy_parallel_compute(input_data.numbers)

    return {
        "result": result,
        "mode": "multi-core (Rayon)"
    }

빌드 및 실행

Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.

uvicorn 라이브러리를 통해 FastAPI를 실행한다.

~/workspace/parallel-compute$ maturin develop --release
~/workspace/parallel-compute$ uvicorn app.main:app --reload

curl 명령어 또는 브라우저를 통해 테스트를 해볼 수 있다.

이 예제의 경우 GET 메서드가 아닌 POST 메서드로 통신하므로
브라우저를 통한 테스트 시 Swagger UI를 사용해야 한다.
FastAPI의 경우 다음과 같은 주소로 Swagger UI가 내장되어 있다.

  • http://127.0.0.1:8000/docs

브라우저상에서는 대용량 데이터를 전달하기 어려우므로
테스트를 위해 클라이언트 파일을 사용한다.

sample-client.py

import requests
import time

# 1. 테스트 데이터 생성 (0부터 9,999,999까지의 리스트)
print("데이터 생성 중... (10,000,000개)")
large_data = list(range(10_000_000))

# 2. 서버 주소 및 데이터 설정
url = "http://127.0.0.1:8000/parallel-compute"
payload = {"numbers": large_data}

# 3. 시간 측정 시작
print("서버에 요청을 보냅니다...")
start_time = time.time()

# 4. POST 요청 전송
response = requests.post(url, json=payload)

# 5. 결과 출력
end_time = time.time()

if response.status_code == 200:
    print(f"✅ 성공! 결과값: {response.json()['result']}")
    print(f"⏱️ 총 소요 시간: {end_time - start_time:.4f} 초")
else:
    print(f"❌ 실패: {response.status_code}, {response.text}")
profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글