여러 웹서비스에 접근해야하는데, 응답을 기다리는 동안 연산이 블록되거나 CPU 자원을 낭비하지 않고, 비동기적으로 작업을 처리하고 싶다.
CompletableFuture: 비동기 작업 조합의 핵심 도구 → thenCombine, thenCompose 활용
앞 장들에서는 포크/조인 프레임워크와 병렬 스트림으로 간단하게 병렬 실행 가능
동시성 기술의 필요성
병렬성이 아니라 동시성을 필요로 하는 상황 = 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는 애플리케이션을 생산성을 극대화할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면, 원격 서비스나 데이터베이스 결과를 기다리는 스레드를 볼록함으로 연산 자원을 낭비하는 일은 피해야 한다.
=> Future 인터페이스로 CompletableFuture 구현, 플로 API
동시성 제어 기법들은 단일 스레드가 여러 작업을 번갈아 처리하는 상황뿐 아니라, 실제로 여러 스레드가 동시에 작업을 처리하는 멀티스레드 환경(즉, 병렬 처리)에서도 똑같이 적용되어 데이터의 일관성과 프로그램의 안정성을 보장
ex) 데이터 불일치, 경쟁 조건(race 컨디션), 데드락 같은 문제를 해결하는 기법
자바의 동시성 프로그래밍 지원은 하드웨어 발전(멀티코어 CPU 등)과 프로그래밍 패러다임 변화에 맞춰 단계적으로 발전해왔습니다. 주요 단계를 시대별로 정리하면 다음과 같습니다:
new Thread(() -> System.out.println("실행")).start();
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<Integer> future = executor.submit(() -> 42);
Integer result = future.get(); // 블로킹 호출
executor.submit(task)
get()
호출 시 블로킹class SumTask extends RecursiveTask<Long> {
protected Long compute() {
// 작업 분할 및 병합 로직
}
}
list.parallelStream().map(...).collect(...); // 간단한 병렬 처리
CompletableFuture.supplyAsync(() -> "데이터")
.thenApplyAsync(s -> s + " 처리")
.thenAccept(System.out::println); // 논블로킹[3]
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new Subscriber<>() { ... });
✅ 1. 추상화 수준
도구 | 추상화 수준 | 관리 방식 |
---|---|---|
Thread/Runnable | Low-Level | 직접 스레드 생성/관리 |
ExecutorService | Mid-Level | 스레드 풀 자동 관리 |
Fork/Join | High-Level | 작업 분할 자동화 |
CompletableFuture | High-Level | 스레드 풀 + 비동기 파이프라인 |
Flow API (Reactive Streams) | Very High-Level | 비동기 데이터 흐름 자동 관리 (Backpressure 포함) |
✅ 2. 주요 사용 사례
도구 | 적합한 작업 유형 | 최적화 포인트 |
---|---|---|
Thread/Runnable | 간단한 단일 작업 | - |
ExecutorService | 독립적이고 균일한 다수 작업 | 스레드 재사용 |
Fork/Join | 재귀적 분할 가능한 대규모 작업 | 작업 도둑질 (Work-Stealing) |
CompletableFuture | 의존 관계가 있는 비동기 작업 | 체이닝(thenX), 예외 처리 |
Flow API (Reactive Streams) | 데이터 스트림 처리 (무한/이벤트 기반) | Backpressure, 연산자 기반 조합 |
✅ 3. 선택 가이드
상황 | 추천 도구 | 이유 |
---|---|---|
간단한 비동기 작업 | ExecutorService | 사용 편의성 |
균등한 작업 + 성능 중요 | ThreadPoolExecutor | 객체 생성 오버헤드 최소화 |
재귀적 분할 작업 | Fork/Join | Work-Stealing 최적화 |
I/O 집중 작업 | ExecutorService | 블로킹 대응에 유리 |
의존 관계 있는 비동기 연쇄 작업 | CompletableFuture | 직관적 체이닝 + 예외 처리 지원 |
데이터 스트림 + 반응형 UI/서버 | Flow API (Reactor, RxJava 등) | 이벤트 기반 설계 최적화, backpressure 대응 |
"동시성 처리는 이제 라이브러리 수준에서 추상화되어, 개발자는 비즈니스 로직에 집중할 수 있게 되었습니다."
이러한 변화들은 클라우드 환경과 대규모 분산 시스템 요구사항에 대응하기 위한 자바의 지속적인 진화를 보여준다.
1. 스레드의 문제점
2. 스레드 풀의 장점
3. 스레드 풀의 단점
스레드 풀 동작 원리
// 이 구조 덕분에, 과도한 스레드 생성 없이 효율적인 작업 처리가 가능
1. 작업 제출
2. 코어 스레드가 비어 있다면 → 즉시 실행
3. 코어 스레드가 모두 사용 중이라면
→ 작업 큐에 저장 (큐가 안 찼다면)
→ 큐가 가득 찼다면
→ 최대 스레드 수 초과 여부 판단
→ 초과 안 했으면 스레드 추가 생성
→ 초과했다면 거부 정책 발동 (예: 예외 발생)
Future.get()
으로 결과를 구조화된 방식으로 수신Future
를 반환하도록 설계return 문
으로 결과를 반환하는 것이 아니라 결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것CompletableFuture
조합 → java.util.concurrent.Flow
(발행-구독 모델)고려사항
병렬성을 극대화하기 위해 모든 함수를 Future 로 감싸려고 한다면, 시스템이 커지고 각자의 박스, 채널이 등장한다면 많은 task가 get() 호출해서 Future 끝나기를 기다리는 상황 => 하드웨어의 병렬성 저조 혹은 데드락 가능성
=> CompletableFuture와 콤비네이터로 문제 해결
목적: 동시성 프로그램을 추상화하여 구조화
핵심 요소
위험: 많은 task의 과도한 Future.get() 사용 → 병렬성 저하/데드락
해결책: CompletableFuture와 콤비네이터로 문제 해결
CompletableFuture와 thenCombine
을 이용한 비동기 작업 조합
기본 개념
Future
의 확장판으로, 작업 체이닝과 조합 기능을 제공문제 상황
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> a.complete(f(x)));
int b = g(x); System.out.println(a.get() + b); // ❌ a.get()에서 블로킹 발생
g(x)
는 동기 실행 → f(x)
와 병렬 처리되지 않음해결책: thenCombine
사용
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> f(x), executor);
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> g(x), executor);
// 결과를 추가하는 세 번째 연산c는 다른 두 작업이 끝날 때까지 실행되지 않는다.
CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z);
System.out.println(c.get()); // ✅ 최종 결과만 블로킹
Java에서 Future
는 한 번 실행해서 결과를 한 번 반환하는 비동기 방식, 하지만 리액티브 프로그래밍은 시간이 지나면서 계속해서 데이터를 발행하고, 그 결과에 반응하는 프로그래밍이다.
즉, Future는 "한 방에 끝", 리액티브는 "계속 반응하면서 처리"
Java 9부터는 java.util.concurrent.Flow
API를 통해 이 개념을 지원하며, 발행-구독(Publisher-Subscriber) 모델을 기반
📈 발행-구독(Pub-Sub) 모델의 데이터 흐름 방향:
Publisher → Subscription → Subscriber
데이터 흐름 방향: 업스트림과 다운스트림
onNext(newValue)
notifyAllSubscribers()
예제
// 이를 통해 SimpleCell은 구독자 혹은 발행자가 될 수 있다.
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {}
subscribe : 구독자 추가
onNext : 구독자들에게 전파
// Flow.Subscriber의 주요 메서드
void onSubscribe(Subscription subscription); // Subscription은 Publisher와 Subscriber 사이의 커넥션을 나타냅니다
@Override
public void onNext(Integer item) {
// 처리...
subscription.request(1); // request(n)을 호출해 몇 개를 받을지 명시
}
역압력 설계 시 고민할 점 ex) 실시간 처리 vs 중요한 로그 저장 등
리액티브 시스템은 애플리케이션 전체 구조나 아키텍처 차원의 개념
시스템이 외부 환경의 변화, 오류, 부하 증가에 잘 반응하고 견디도록 설계되어야 한다.
리액티브 프로그래밍은 데이터의 변화나 이벤트 흐름에 따라 자동으로 반응하는 코딩 방식
예를 들어, 버튼을 클릭하면 자동으로 이벤트 핸들러가 실행되거나, 실시간 데이터가 들어오면 UI가 즉시 업데이트되는 경우
✅ 코드 단위에서 비동기 흐름과 이벤트 기반 반응을 구현하는 기술입니다.
- 예:
RxJava
,Project Reactor
,Flow API (Java 9+)
- 기술 관점:
onNext
,subscribe
,stream
,event
등으로 표현됨- 역할: 데이터 흐름에 대한 반응형 처리
즉, 리액티브 프로그래밍은 구현 기술이고, 리액티브 시스템은 전체 시스템의 설계 철학이다.
하지만 단지 리액티브 프로그래밍을 썼다고 해서 전체 시스템이 리액티브 시스템이 되는 것은 아니다. 장애 대응, 부하 분산, 서비스 간 독립성, 모니터링까지 포함돼야 리액티브 시스템
Future란?
ExecutorService executor = Executors.newCachedThreadPool();
// 1. 비동기 작업 제출 (오래 걸리는 계산)
Future<Double> future = executor.submit(() -> doSomeLongComputation());
// 2. 다른 작업 수행
doSomethingElse();
try {
// 3. 최대 1초 동안 결과 기다림
Double result = future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 타임아웃 처리
} catch (InterruptedException | ExecutionException e) {
// 예외 처리
}
💡 get() 메서드는 결과가 나올 때까지 현재 스레드를 블록시킴. 오래 걸리는 작업이 영원히 끝나지 않을 수 있는 문제가 있으므로 get 메서드를 오버로드해서 스레드가 대기할 최대 타임아웃 시간을 설정하는 것이 좋다.
Future
를 반환해 다른 작업을 병행할 수 있게 한다기본 패턴
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price); // 계산 완료 시 결과 전달
}).start();
return futurePrice;
}
사용 예시
// 비동기로 처리되므로 즉시 Future 반환하고, 그 사이에 다른 작업 처리
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //제품 가격 요청
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
//제품의 가격을 계산하는 동안
doSomethingElse();
//다른 상점 검색 등 작업 수행
try {
double price = futurePrice.get(); //가격정보를 받을때까지 블록
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
CompletableFuture
에서는 completeExceptionally()
로 비동기 내부 예외를 외부에 전달 가능public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch {
futurePrice.completeExceptionally(ex); //에러를 포함시켜 Future를 종료
}
}).start();
return futurePrice;
}
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> calculatePrice("product"));
stream.map(...)
을 쓴다고 바로 실행되지 않고, 최종 연산(collect
, forEach
)이 있어야 실행됨.map
, filter
등)은 데이터를 바로 처리하지 않고, 파이프라인만 만든다.CompletableFuture.join()
은 결과를 기다리는 동기 메서드이다.CompletableFuture
의 작업이 완료될 때까지 기다림.예제
shops.stream()
.map(shop -> shop.getPrice(product))
.map(Discount::applyDiscount)
.collect(toList()); // 동기, 순차 처리
해결
병렬 스트림으로 요청 병렬화하기 : parallelStream()
CompletableFuture로 비동기 호출 구현하기 : CompletableFuture.supplyAsync
개선 : CompletableFuture를 사용한 결과는 순차 방식보단 빠르지만 병렬 스트림보단 느림
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
중요
map(...).map(join)
형태로 사용하면 순차 처리됨CompletableFuture
로 비동기 작업 생성 → 나중에 join()
으로 결과 모음// 잘못된 접근
shop1.getPrice() → 기다림(join)
shop2.getPrice() → 기다림(join)
shop3.getPrice() → 기다림(join)
// 올바른(병렬) 처리
shop1.getPrice()
shop2.getPrice()
shop3.getPrice()
↓
기다림(join)
↓
결과 모음
CompletableFuture을 이용한 예제
하나의 주제(가격 정보 조회 및 처리) 안에서 점점 복잡한 기능을 단계적으로 확장
1단계: 기본적인 가격 조회 (동기, 순차적 처리)
n * (가격 + 할인)
시간)2단계: CompletableFuture
를 이용한 비동기 처리 (병렬화)
Quote
로 파싱join()
으로 한꺼번에 모음3단계: 독립적인 두 작업 합치기 (thenCombine
)
getPrice
와 getRate
는 서로 독립적이므로 동시에 실행 가능thenCombine
으로 서로 독립적인 비동기 작업을 동시에 실행한 두 결과를 결합하여 새로운 결과 생성4단계: 타임아웃 처리 (orTimeout
, completeOnTimeout
)
orTimeout
: 지정 시간 초과 시 예외 발생completeOnTimeout
: 시간 초과 시 기본값으로 자동 완료// # 1단계
shops.stream()
.map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(toList());
// # 2단계
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) // ①
.map(future -> future.thenApply(Quote::parse)) // ②
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))) // ③
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join) // 결과 모으기
.collect(toList());
// # 3단계
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product)) // 가격 요청
.thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(EUR, USD)), // 환율 요청
(price, rate) -> price * rate); // 결과 조합
// # 4단계
// 실패할 수 있는 네트워크 호출에 타임아웃 설정
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(EUR, USD))
.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS), // 환율 기본값 사용
(price, rate) -> price * rate)
.orTimeout(3, TimeUnit.SECONDS); // 전체 작업 제한 시간
CompletableFuture
로 비동기 파이프라인을 구성하여 병렬 처리 성능을 향상.모든 결과 vs 하나의 결과
CompletableFuture.allOf()
: 모든 작업 완료 후 실행CompletableFuture.anyOf()
: 하나라도 완료되면 즉시 실행 → 반응형 UX, 알림 처리 등CompletableFuture.anyOf(futures).thenAccept(System.out::println);
요약
구성요소 | 설명 |
---|---|
Publisher | 데이터를 발행하는 주체 (뉴스 발행사) |
Subscriber | 데이터를 소비하는 주체 (뉴스 구독자) |
Subscription | 데이터 흐름 조절 (얼마나 받을지 설정) |
Processor | 데이터 가공 (중간 필터 역할) |
변화한 환경
➡️ 이처럼 빠르고 안정적인 처리를 위해 비동기적으로 데이터 스트림을 다루는 리액티브 프로그래밍이 중요해짐.
리액티브 시스템의 핵심 원칙 4가지
원칙 | 설명 |
---|---|
반응성 (Responsive) | 항상 일정하고 예측 가능한 속도로 반응 |
회복성 (Resilient) | 장애가 발생해도 반응성을 유지하며 복구 가능 |
탄력성 (Elastic) | 트래픽이 몰리면 시스템을 자동으로 확장 |
메시지 주도 (Message-driven) | 비동기 메시지를 사용하여 컴포넌트 간의 느슨한 결합 유지 |
구분 | 애플리케이션 수준 | 시스템 수준 |
---|---|---|
초점 | 단일 앱의 비동기 처리 | 분산 시스템 전체 아키텍처 |
주요 기술 | Reactor, RxJava | 카프카, Akka, 쿠버네티스 |
예시 | 웹 서버의 논블로킹 IO | 마이크로서비스 간 이벤트 드리븐 통신 |
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다. 리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술이다.
java.util.concurrent.Flow
API 구성인터페이스 | 설명 |
---|---|
Publisher<T> | 구독자에게 데이터를 발행 |
Subscriber<T> | 발행자로부터 데이터를 수신 |
Subscription | 데이터 요청/취소를 관리 |
Processor<T, R> | 데이터 변환 중개자 (Publisher + Subscriber 역할 모두 수행) |
흐름 요약
Subscriber
는 Publisher
에 구독(subscribe
)을 요청함Publisher
는 onSubscribe()
로 Subscription
을 전달함Subscriber
는 request(n)
으로 몇 개의 데이터를 받을지 요청함Publisher
는 최대 n
개의 데이터를 onNext()
로 전달onComplete()
, 에러 나면 onError()
//Publisher가 발행한 리스너로 Subscriber에 등록할 수 있다.
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
//Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메서드 네 개를 정의
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
// Subscription은 Publisher와 Subscriber 사이의 제어 흐름, 역압력을 관리
public interface Subscription {
void request(long n); //publisher에게 이벤트를 처리할 준비가 되었음을 알림
void cancel(); //publisher에게 이벤트를 받지 않음을 통지
}
//리액티브 스트림에서 처리하는 이벤트의 변환단계를 나타냄
//에러나 Subscription 취소 신호 등을 전파
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Processor
는 중간 처리자로써, 예를 들어 온도를 섭씨로 바꾸는 등의 변환 작업 수행Subscriber
이자 Publisher
이므로 중간에서 가공 가능public class SimpleReactiveExample {
public static void main(String[] args) throws InterruptedException {
// 1. Publisher 생성
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 2. Subscriber 생성
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("✔ 구독 시작!");
subscription.request(1); // 먼저 1개 요청
}
@Override
public void onNext(String item) {
System.out.println("📦 받은 데이터: " + item);
subscription.request(1); // 처리 후 다음 1개 요청
}
@Override
public void onError(Throwable throwable) {
System.err.println("❌ 에러 발생: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("✅ 모든 데이터 수신 완료!");
}
};
// 3. Subscriber를 Publisher에 등록
publisher.subscribe(subscriber);
// 4. 데이터 발행
publisher.submit("Hello");
publisher.submit("Reactive");
publisher.submit("Streams");
// 5. 마무리
publisher.close(); // onComplete() 호출됨
Thread.sleep(1000); // 비동기이므로 대기
}
}
// result
✔ 구독 시작!
📦 받은 데이터: Hello
📦 받은 데이터: Reactive
📦 받은 데이터: Streams
✅ 모든 데이터 수신 완료!
- `onSubscribe()`에서 구독을 시작하고 첫 데이터를 요청
- `onNext()`에서 데이터 수신 후 다시 1개 요청 → 역압력(backpressure)
- `close()`를 호출했기 때문에 모든 데이터 전송 후 `onComplete()` 호출됨
자바는 왜 Flow API 구현체를 직접 제공하지 않을까?
클래스 | 설명 |
---|---|
Flowable | 역압력 지원 |
Observable | 역압력 미지원 (GUI 이벤트 등 가벼운 스트림에 적합) |
Flowable
: 데이터가 많고(대량 데이터 처리), 처리 속도가 불균형할 때Flowable.range(1, 1000000)
.onBackpressureDrop()
.subscribe(System.out::println);
Observable
: 클릭, 마우스 이동 등 빠른 응답이 필요한 GUI 이벤트 처리Observable.just("Hello", "World")
.subscribe(System.out::println);
map()
, flatMap()
, subscribe()
등의 체이닝 구조는 코드를 추적하고 디버깅하기 어려움onError
, onComplete
등 다양한 상태 처리를 명확히 이해해야 함Flowable.fromPublisher(publisher)
.map(data -> transform(data))
.flatMap(result -> process(result))
.subscribe(System.out::println);
// → 이런 코드는 처음 접하는 사람에게는 “무슨 일이 일어나는지” 직관적이지 않음.
NullPointerException
, TimeoutException
등이 체이닝 중간에서 발생하면 로그만 봐서는 원인 파악이 힘듦.doOnError()
, .onErrorResume()
등을 잘 다뤄야 함상황 | 리액티브 유리 |
---|---|
실시간 데이터 스트리밍 | 예: IoT 센서, 주식 데이터 |
수천 개의 동시 연결 | 예: 채팅 앱, SSE, WebSocket |
외부 API 호출이 많고 비동기 처리 가능 | 예: API 게이트웨이, API 집계 서버 |
마이크로서비스 간 호출 | 메시지 기반 처리와 궁합이 좋음 |
리액티브 프로그래밍의 복잡성을 줄이면서도 높은 처리량(throughput)을 달성할 수 있는 대안
예전 자바 스레드는 "실제 운영체제 스레드"와 1:1로 매칭됬으나, 가상 스레드는 JVM이 직접 관리하는 초경량 스레드로서 1만 개의 가상 스레드를 만들어도 실제 운영체제는 10~20개 정도의 진짜 스레드만 관리
기존 리액티브 프로그래밍의 문제점
가상 스레드의 동작 원리
가상 스레드는 경량 스레드로, JVM이 직접 관리하며 OS 스레드(캐리어 스레드)에 매핑된다.
리액티브 프로그래밍 대체 가능성
성능 비교
남은 리액티브 프로그래밍의 활용처