
이 실습은 이전 실습에서 이어서 진행한다.
따라서 작업공간 생성 및 구조 확인은 생략한다.
조건에 맞는 데이터의 인덱스를 뽑아내는 실습을 해보겠다.
Rayon으로 병렬 필터링을 진행하며
enumerate() 로 인덱스와 데이터 쌍을 만들고
filter_map() 으로 조건에 맞는 인덱스만 수집한 뒤
결과 인덱스 배열을 Python 객체로 변환하여 반환한다.
src/lib.rsuse pyo3::prelude::*; use numpy::{PyArray1, PyReadonlyArray1, PyArrayMethods}; use rayon::prelude::*; #[pyfunction] fn zero_copy_sum(array: PyReadonlyArray1<f64>) -> f64 { let slice = array.as_slice().expect("Numpy 슬라이스를 가져오는 데 실패했습니다."); slice.par_iter().sum() } #[pyfunction] fn zero_copy_multiply(array: &Bound<'_, PyArray1<f64>>, factor: f64) { unsafe { let slice = array.as_slice_mut().expect("가변 슬라이스를 가져오는 데 실패했습니다."); slice.par_iter_mut().for_each(|x| *x *= factor); } } // 이상 기존 코드 #[pyfunction] fn zero_copy_filter(py: Python<'_>, array: PyReadonlyArray1<f64>, threshold: f64) ->Py<PyArray1<u64>> { let slice = array.as_slice().expect("Numpy 슬라이스를 가져오는 데 실패했습니다."); let indices: Vec<u64> = slice .par_iter() .enumerate() .filter_map(|(idx, &val)| { if val > threshold { Some(idx as u64) } else { None } }).collect(); PyArray1::from_vec(py, indices).unbind() } #[pymodule] fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(zero_copy_sum, m)?)?; m.add_function(wrap_pyfunction!(zero_copy_multiply, m)?)?; m.add_function(wrap_pyfunction!(zero_copy_filter, m)?)?; // NEW! Ok(()) }
app/main.pyimport numpy as np from fastapi import FastAPI from fastapi.responses import ORJSONResponse import rust_engine import time class UTF8ORJSONResponse(ORJSONResponse): media_type = "application/json; charset=utf-8" app = FastAPI(default_response_class=UTF8ORJSONResponse) @app.get("/") def read_root(): return { "status": "200", "info": "서버 가동 중입니다." } @app.get("/compute/{size}") def compute(size: int): data = np.random.rand(size) start = time.perf_counter() result = rust_engine.zero_copy_sum(data) end = time.perf_counter() rust_duration = end - start return { "size": len(data), "result": result, "rust_pure_time": f"Rust 연산에 걸린 시간: {rust_duration:.4f} sec" } @app.get("/multiply/{size}/{factor}") def multiply_inplace(size: int, factor: float): data = np.random.rand(size) start = time.perf_counter() result = rust_engine.zero_copy_multiply(data, factor) end = time.perf_counter() rust_duration = end - start return { "size": len(data), "factor": factor, "result_sample": data[:3].tolist(), "rust_pure_time": f"Rust 연산에 걸린 시간: {rust_duration:.4f} sec" } # 이상 기존 코드 @app.get("/filter/{size}/{threshold}") def filter_data(size: int, threshold: float): data = np.random.rand(size) start = time.perf_counter() indices = rust_engine.zero_copy_filter(data, threshold) end = time.perf_counter() rust_duration = end - start return { "original size": len(data), "filtered count": len(indices), "threshold": threshold, "sample_indices": indices[:5].tolist(), "rust_pure_time": f"Rust 연산에 걸린 시간: {rust_duration:.4f} sec" }
Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.
uvicorn 라이브러리를 통해 FastAPI를 실행한다.
~/workspace/zero-copy$ maturin develop --release ~/workspace/zero-copy$ uvicorn app.main:app --reload
curl 명령어 또는 브라우저를 통해 다음과 같은 테스트를 해볼 수 있다.
http://127.0.0.1:8000/filter/1000000000/0.9~$ curl -i http://127.0.0.1:8000/filter/1000000000/0.9 HTTP/1.1 200 OK date: Sat, 28 Mar 2026 00:09:11 GMT server: uvicorn content-length: 163 content-type: application/json; charset=utf-8 {"original size":1000000000,"filtered count":99997758,"threshold":0.9,"sample_indices":[8,36,39,56,57],"rust_pure_time":"Rust 연산에 걸린 시간: 0.1645 sec"}%값이 0.9보다 큰 데이터만 뽑아낼 때 평균적으로 전체의 10% 정도의 데이터가 추출된다.
10억 번의 조건 분기가 0.1초대에 완료되었다.