
작업 성능 향상을 위해 병렬 처리를 사용하게 될 때가 있다.
Python은 병렬 처리에 관여하지 않고 Rust에서 시스템상의 모든 코어를 사용한다.
~/workspace$ mkdir parallel-compute && cd parallel-compute ~/workspace/parallel-compute$ python3 -m venv venv ~/workspace/parallel-compute$ source venv/bin/activate ~/workspace/parallel-compute$ pip install maturin fastapi uvicorn ~/workspace/parallel-compute$ maturin init ~/workspace/parallel-compute$ # 선택지 중 기본값인 PyO3 선택 ~/workspace/parallel-compute$ # Cargo.toml과 src/lib.rs가 자동 생성된다 ~/workspace/parallel-compute$ # Python 코드는 직접 생성해 주어야 한다 ~/workspace/parallel-compute$ mkdir app && touch app/main.py ~/workspace/parallel-compute$ tree -I venv . ├── app │ └── main.py ├── Cargo.toml ├── pyproject.toml └── src └── lib.rs
Cargo.toml 파일을 열어 라이브러리 이름을 수정해 주겠다.
병렬 처리를 위한 Rayon 크레이트도 추가해 준다.
Rayon이 사실상 기능적으로 완성되었을 때의 안정적인 버전인
Rayon 1.8 을 쓰는 프로젝트도 많다고 하는데
버전이 올라갈수록 효율적인 처리가 더 정교해지니
여기선 현 시점 가장 최신 버전을 사용하겠다.
Rayon은 하위 호환성을 매우 중요하게 생각하는 라이브러리라
버전에 따른 코드 상의 차이는 발생하지 않는다.
Cargo.toml[package] name = "parallel-compute" version = "0.1.0" edition = "2024" [lib] name = "rust_engine" crate-type = ["cdylib"] [dependencies] pyo3 = "0.28.0" rayon = "1.11"
Rayon 라이브러릴 사용하기 위해 use 를 통해 그것을 불러와야 한다.
여기선 읽기 전용 데이터를 병렬 처리 하므로 Mutex가 필요하지 않아
iter() 를 par_iter() 로 바꾸는 것만으로 손쉽게 병렬 처리를 할 수 있다.
data race 발생 가능성은 Rust에서 내부적으로 처리해준다.
Rayon은 파이썬 스레드가 아닌 별도의 OS 스레드를 사용하므로
GIL(Global Interpreter Lock)을 해제하여
Python으로 하여금 다른 작업을 수행할 수 있도록 하는 게 좋다.
해제하지 않아도 작동은 하지만 해제하는 편이 성능 효율성을 높일 수 있다.
GIL을 해제한 클로저 내부에서는 Python 객체에 접근할 수 없게 된다는 것만 알아두자.
과거에는 py.allow_threads 를 사용한 GIL 해제가 많이 쓰였지만
최신 PyO3에서는 py.detach() 를 사용한다.
src/lib.rsuse pyo3::prelude::*; use rayon::prelude::*; #[pyfunction] fn heavy_parallel_compute(py: Python<'_>, data: Vec<i32>) -> PyResult<i64> { let sum = py.detach(|| { data.par_iter() .map(|&x| (x as i64).pow(2)) .sum() }); Ok(sum) } #[pymodule] fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(heavy_parallel_compute, m)?)?; Ok(()) }
이번에도 정수 값을 담는 List 를 속성으로 가진 DataInput 클래스를 만들어 사용할 것이다.
이것은 데이터 검증 라이브러리 pydantic 의 BaseModel 클래스를 상속받아 생성한다.
병렬 처리에 대한 연산은 Rust가 알아서 다 해주기 때문에
Python에서는 추가적으로 해줘야 할 건 없다.
app/main.pyfrom fastapi import FastAPI from pydantic import BaseModel import rust_engine app = FastAPI() class DataInput(BaseModel): numbers: list[int] @app.post("/parallel-compute") def parallel_compute(input_data: DataInput): result = rust_engine.heavy_parallel_compute(input_data.numbers) return { "result": result, "mode": "multi-core (Rayon)" }
Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.
uvicorn 라이브러리를 통해 FastAPI를 실행한다.
~/workspace/parallel-compute$ maturin develop --release ~/workspace/parallel-compute$ uvicorn app.main:app --reload
curl 명령어 또는 브라우저를 통해 테스트를 해볼 수 있다.
이 예제의 경우 GET 메서드가 아닌 POST 메서드로 통신하므로
브라우저를 통한 테스트 시 Swagger UI를 사용해야 한다.
FastAPI의 경우 다음과 같은 주소로 Swagger UI가 내장되어 있다.
http://127.0.0.1:8000/docs브라우저상에서는 대용량 데이터를 전달하기 어려우므로
테스트를 위해 클라이언트 파일을 사용한다.
sample-client.pyimport requests import time # 1. 테스트 데이터 생성 (0부터 9,999,999까지의 리스트) print("데이터 생성 중... (10,000,000개)") large_data = list(range(10_000_000)) # 2. 서버 주소 및 데이터 설정 url = "http://127.0.0.1:8000/parallel-compute" payload = {"numbers": large_data} # 3. 시간 측정 시작 print("서버에 요청을 보냅니다...") start_time = time.time() # 4. POST 요청 전송 response = requests.post(url, json=payload) # 5. 결과 출력 end_time = time.time() if response.status_code == 200: print(f"✅ 성공! 결과값: {response.json()['result']}") print(f"⏱️ 총 소요 시간: {end_time - start_time:.4f} 초") else: print(f"❌ 실패: {response.status_code}, {response.text}")