CompletableFuture 에 가지고 있던 오해....

Mando·2025년 3월 23일
2

JAVA

목록 보기
12/12

마찬가지로 새싹 스터디 발표 자료2탄!

오해 1: CompletableFuture에서 발생한 예외는 자동으로 메인 스레드로 전파된다?

CompletableFuture.runAsync(() -> {
    throw new RuntimeException("오류 발생!");
});

// 이 예외는 메인 스레드로 전파되지 않는다.

CompletableFuture 내부에서 발생한 예외가 자동으로 메인 스레드로 전파❌

실제로는

  • CompletableFuture에서 발생한 예외는 CompletableFuture 객체 내부에 저장만 될 뿐, 메인 스레드로 자동 전파되지 않는다.
  • 예외 정보를 얻으려면 반드시 get() 또는 join() 메서드를 호출해야 한다.

왜 예외가 자동으로 전파되지 않는가?

Java에서 예외는 호출 스택(call stack)을 따라 상위 메서드로 전파된다.
그러나 CompletableFuture는 별도의 스레드에서 실행되므로 메인 스레드와 호출 스택이 완전히 분리되어 있다.

따라서
1. 별도 스레드에서 발생한 예외는 해당 스레드의 호출 스택으로만 전파된다.
2. 메인 스레드와 작업 스레드는 서로 다른 호출 스택을 가지기 때문에 예외가 자동으로 건너갈 수 없다.
3. CompletableFuture는 대신 발생한 예외를 내부에 저장하고, 사용자가 get() 또는 join()을 호출할 때만 해당 예외를 전달한다.

따라서 예외 처리를 제대로 하려면 반드시 get(), join(), 또는 exceptionally(), handle() 등의 예외 처리 메서드를 사용해야 한다.

오해 2: get()과 join()은 동일하다?

public String getDataWithGet() throws InterruptedException, ExecutionException {
    return CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("오류 발생!");
    }).get(); // checked Exception 발생
}

public String getDataWithJoin() {
    return CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("오류 발생!");
    }).join(); // unChecked Exception 발생
}

get()join() 메서드는 둘 다 CompletableFuture의 결과를 가져오는 역할을 하지만, 중요한 차이점이 있다:

  • get(): ExecutionException이라는 checked exception을 던진다. 따라서 호출하는 메서드에서 예외 처리가 필요하다.
  • join(): CompletionException이라는 unchecked exception을 던진다. 명시적인 예외 처리가 필요하지 않다.

이 차이는 특히 트랜잭션 관리에 중요하다. Checked Exception은 일반적으로 롤백을 트리거하지 않지만, Unchecked Exception은 트리거한다.

( 이걸 몰라서.. 회사 업무 할 때 트랜젝션 원자성이 보장되지 않는 상황이 발생했었다.. 🥲 )

오해 3: cancel()이 실행 중인 작업을 중단시킨다?

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // 오래 걸리는 작업
    }
    return "완료";
});

future.cancel(true); // 작업이 실제로 중단되지 않을 수 있음

cancel() 메서드에 대한 흔한 오해는 이 메서드가 실행 중인 작업을 즉시 중단시킨다❌
실제로는

  1. cancel() 메서드는 CompletableFuture를 CancellationException으로 완료시킨다.
  2. 하지만 실제로 실행 중인 작업은 자동으로 중단되지 않는다.

이는 CompletableFuture가 기본적으로 ForkJoinPool의 작업 스레드에서 실행되는데, 이 작업들은 단순히 Future를 취소한다고 해서 중단되지 않기 때문이다.

작업을 실제로 중단하려면 다음과 같은 접근 방식이 필요하다.

ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = new CompletableFuture<>();

Future<?> future = executor.submit(() -> {
    try {
        // 작업 수행
        completableFuture.complete("완료");
    } catch (InterruptedException e) { // 이거처럼 해당 task가 취소가 되었는지 파악할 수 있는 곳이 있어야 한다.
        completableFuture.complete("취소됨");
    }
});

// 취소 핸들러
completableFuture.exceptionally(ex -> {
    if (ex instanceof CancellationException) {
        future.cancel(true);
        executor.shutdownNow();
    }
    return null;
});

오해 4: 타임아웃된 작업은 자동으로 취소된다?

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000); // 10초 동안 실행
        return "완료";
    } catch (InterruptedException e) {
        return "중단됨";
    }
}).orTimeout(3, TimeUnit.SECONDS); // 3초 후 타임아웃

// 타임아웃되어도 원래 작업은 계속 실행된다!

orTimeout 메서드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException으로 완료시킨다(내부적으로 ScheduledThreadExecutor 사용). 그러나 이것이 실행 중인 작업을 취소하는 것은 아니다.

타임아웃이 발생해도:

  • CompletableFuture는 예외 상태로 완료되지만
  • 실행 중인 작업 자체는 백그라운드에서 계속 실행된다.
  • 따라서 이는 리소스 낭비로 이어질 수 있다.
  • 심각한 경우 메모리 누수(memory leak)나 리소스 고갈로 이어질 수 있다.

오해 5: 여러 스레드 간의 작업에서 트랜잭션 원자성이 자동으로 보장된다?

CompletableFuture를 사용할 때 여러 스레드 간에 트랜잭션 원자성 보장을 신경써야 한다.

@Transactional
public void tx() {
    // 메인 스레드에서 Food 저장
    foodRepository.save(new Food("피자"));
    
    // 별도 스레드에서 Drink 저장
    CompletableFuture.runAsync(() -> {
        drinkRepository.save(new Drink("콜라"));
        throw new RuntimeException("오류 발생!");
    });
}

위 코드에서 발생하는 문제점:
1. 별도의 스레드는 자체적인 트랜잭션 컨텍스트를 가진다.
2. 따라서 Drink 저장 시 발생한 예외는 Food 저장에 영향을 주지 않는다.
3. Food는 정상적으로 저장되고, Drink만 롤백된다.

이는 CompletableFuture가 별도의 스레드에서 실행되면서 새로운 트랜잭션이 시작되기 때문이다. 마치 ThreadLocal 변수가 각 스레드마다 독립적인 복사본을 가지는 것과 유사한 원리다.

따라서 이 경우에는 위에서 언급한 것처럼
UnCheckedException을 발생시키는 join()을 통해 별도의 스레드에서 발생한 예외를 메인 스레드로 전파하면 원자성 보장을 할 수 있다.

하지만... 어지러운 건 반대의 상황이다.
이번에는 메인스레드에서 예외가 발생하고 별도의 스레드에서 진행한 작업은 성공했다.

@Transactional
public void txMain() {
    // 별도 스레드에서 Drink 저장
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        drinkRepository.save(new Drink("콜라"));
    });
    
    // 메인 스레드에서 Food 저장 후 예외 발생
    foodRepository.save(new Food("피자"));
    throw new RuntimeException("메인 스레드 오류 발생!");
}

이 경우 메인스레드의 트랜젝션은 롤백되지만, 별도 스레드의 작업은 롤백되지 않는다.
메인 스레드에서 예외가 발생했다는 것을 별도의 스레드에서 캐치할 방법이 없기 때문이다.

트랜젝션은 스레드에 경계에만 적용된다

  1. 트랜잭션 컨텍스트의 분리:
  • 본래 생각 : 하위 트랜젝션은 상위 트랜젝션이 존재한다면 포함되기에 이 경우에도 하위 트랜젝션에 해당하는 별도의 스레드에서 수행한 작업이 같이 롤백될 것이라고 생각했다.
  • 실제 : 스프링의 트랜잭션 관리는 ThreadLocal을 사용하여 현재 스레드에 트랜잭션 컨텍스트를 바인딩한다. 따라서 새로운 스레드가 시작되면 별도의 트랜잭션 컨텍스트가 생성된다.
  1. 트랜잭션 전파 설정의 한계: 스프링의 @Transactional(propagation = ...) 설정도 스레드 경계를 넘어 적용되지 않는다.
    따라서 default 설정인 상위 트랜젝션에 포함되는게 먹히지 않는다.

여러 스레드에 걸친 작업에서 원자성을 보장하려면?

이러한 문제가 존재할 수 있다는 것을 파악하지 못 하고
지금까지 CompletableFuture를 사용하고 있었다.

따라서... 다음 스터디 때 분산 트랜젝션에 대해서 공부해보기로 했다~ (3탄에서)

추가 참고 사항

ForkJoinPool.commonPool() 이해하기

CompletableFuture가 기본적으로 사용하는 ForkJoinPool.commonPool()은 CPU 바운드 작업에 적합하다. 이 풀은 다음과 같이 초기화된다:

// ForkJoinPool static 블록에서 common을 초기화
static {
    // ...
    common = createCommonPool();
    // ...
}

// 여기서 코어 수 만큼의 스레드를 생성
private static ForkJoinPool createCommonPool() {
    // ...
    int parallelism = Runtime.getRuntime().availableProcessors() - 1;
    // ...
    return new ForkJoinPool(parallelism, ...);
}

이는 시스템의 사용 가능한 프로세서 수에서 1을 뺀 수만큼의 스레드를 생성한다.
따라서 I/O 바운드 작업을 하려면 별도의 Executor를 필수적으로 생성해야할 것 같다.

0개의 댓글