마찬가지로 새싹 스터디 발표 자료2탄!
CompletableFuture.runAsync(() -> {
throw new RuntimeException("오류 발생!");
});
// 이 예외는 메인 스레드로 전파되지 않는다.
CompletableFuture 내부에서 발생한 예외가 자동으로 메인 스레드로 전파❌
실제로는
get()
또는 join()
메서드를 호출해야 한다.Java에서 예외는 호출 스택(call stack)을 따라 상위 메서드로 전파된다.
그러나 CompletableFuture는 별도의 스레드에서 실행되므로 메인 스레드와 호출 스택이 완전히 분리되어 있다.
따라서
1. 별도 스레드에서 발생한 예외는 해당 스레드의 호출 스택으로만 전파된다.
2. 메인 스레드와 작업 스레드는 서로 다른 호출 스택을 가지기 때문에 예외가 자동으로 건너갈 수 없다.
3. CompletableFuture는 대신 발생한 예외를 내부에 저장하고, 사용자가 get()
또는 join()
을 호출할 때만 해당 예외를 전달한다.
따라서 예외 처리를 제대로 하려면 반드시 get()
, join()
, 또는 exceptionally()
, handle()
등의 예외 처리 메서드를 사용해야 한다.
public String getDataWithGet() throws InterruptedException, ExecutionException {
return CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("오류 발생!");
}).get(); // checked Exception 발생
}
public String getDataWithJoin() {
return CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("오류 발생!");
}).join(); // unChecked Exception 발생
}
get()
과 join()
메서드는 둘 다 CompletableFuture의 결과를 가져오는 역할을 하지만, 중요한 차이점이 있다:
get()
: ExecutionException이라는 checked exception을 던진다. 따라서 호출하는 메서드에서 예외 처리가 필요하다.join()
: CompletionException이라는 unchecked exception을 던진다. 명시적인 예외 처리가 필요하지 않다.이 차이는 특히 트랜잭션 관리에 중요하다. Checked Exception은 일반적으로 롤백을 트리거하지 않지만, Unchecked Exception은 트리거한다.
( 이걸 몰라서.. 회사 업무 할 때 트랜젝션 원자성이 보장되지 않는 상황이 발생했었다.. 🥲 )
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 오래 걸리는 작업
}
return "완료";
});
future.cancel(true); // 작업이 실제로 중단되지 않을 수 있음
cancel()
메서드에 대한 흔한 오해는 이 메서드가 실행 중인 작업을 즉시 중단시킨다❌
실제로는
cancel()
메서드는 CompletableFuture를 CancellationException
으로 완료시킨다.이는 CompletableFuture가 기본적으로 ForkJoinPool의 작업 스레드에서 실행되는데, 이 작업들은 단순히 Future를 취소한다고 해서 중단되지 않기 때문이다.
작업을 실제로 중단하려면 다음과 같은 접근 방식이 필요하다.
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Future<?> future = executor.submit(() -> {
try {
// 작업 수행
completableFuture.complete("완료");
} catch (InterruptedException e) { // 이거처럼 해당 task가 취소가 되었는지 파악할 수 있는 곳이 있어야 한다.
completableFuture.complete("취소됨");
}
});
// 취소 핸들러
completableFuture.exceptionally(ex -> {
if (ex instanceof CancellationException) {
future.cancel(true);
executor.shutdownNow();
}
return null;
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000); // 10초 동안 실행
return "완료";
} catch (InterruptedException e) {
return "중단됨";
}
}).orTimeout(3, TimeUnit.SECONDS); // 3초 후 타임아웃
// 타임아웃되어도 원래 작업은 계속 실행된다!
orTimeout
메서드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException으로 완료시킨다(내부적으로 ScheduledThreadExecutor 사용). 그러나 이것이 실행 중인 작업을 취소하는 것은 아니다.
타임아웃이 발생해도:
CompletableFuture를 사용할 때 여러 스레드 간에 트랜잭션 원자성 보장을 신경써야 한다.
@Transactional
public void tx() {
// 메인 스레드에서 Food 저장
foodRepository.save(new Food("피자"));
// 별도 스레드에서 Drink 저장
CompletableFuture.runAsync(() -> {
drinkRepository.save(new Drink("콜라"));
throw new RuntimeException("오류 발생!");
});
}
위 코드에서 발생하는 문제점:
1. 별도의 스레드는 자체적인 트랜잭션 컨텍스트를 가진다.
2. 따라서 Drink 저장 시 발생한 예외는 Food 저장에 영향을 주지 않는다.
3. Food는 정상적으로 저장되고, Drink만 롤백된다.
이는 CompletableFuture가 별도의 스레드에서 실행되면서 새로운 트랜잭션이 시작되기 때문이다. 마치 ThreadLocal 변수가 각 스레드마다 독립적인 복사본을 가지는 것과 유사한 원리다.
따라서 이 경우에는 위에서 언급한 것처럼
UnCheckedException을 발생시키는 join()을 통해 별도의 스레드에서 발생한 예외를 메인 스레드로 전파하면 원자성 보장을 할 수 있다.
하지만... 어지러운 건 반대의 상황이다.
이번에는 메인스레드에서 예외가 발생하고 별도의 스레드에서 진행한 작업은 성공했다.
@Transactional
public void txMain() {
// 별도 스레드에서 Drink 저장
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
drinkRepository.save(new Drink("콜라"));
});
// 메인 스레드에서 Food 저장 후 예외 발생
foodRepository.save(new Food("피자"));
throw new RuntimeException("메인 스레드 오류 발생!");
}
이 경우 메인스레드의 트랜젝션은 롤백되지만, 별도 스레드의 작업은 롤백되지 않는다.
메인 스레드에서 예외가 발생했다는 것을 별도의 스레드에서 캐치할 방법이 없기 때문이다.
@Transactional(propagation = ...)
설정도 스레드 경계를 넘어 적용되지 않는다.이러한 문제가 존재할 수 있다는 것을 파악하지 못 하고
지금까지 CompletableFuture를 사용하고 있었다.
따라서... 다음 스터디 때 분산 트랜젝션에 대해서 공부해보기로 했다~ (3탄에서)
CompletableFuture가 기본적으로 사용하는 ForkJoinPool.commonPool()은 CPU 바운드 작업에 적합하다. 이 풀은 다음과 같이 초기화된다:
// ForkJoinPool static 블록에서 common을 초기화
static {
// ...
common = createCommonPool();
// ...
}
// 여기서 코어 수 만큼의 스레드를 생성
private static ForkJoinPool createCommonPool() {
// ...
int parallelism = Runtime.getRuntime().availableProcessors() - 1;
// ...
return new ForkJoinPool(parallelism, ...);
}
이는 시스템의 사용 가능한 프로세서 수에서 1을 뺀 수만큼의 스레드를 생성한다.
따라서 I/O 바운드 작업을 하려면 별도의 Executor를 필수적으로 생성해야할 것 같다.