CompletableFuture 정리

Bruce Han·2023년 2월 14일
1

Java8-정리

목록 보기
16/20
post-thumbnail

이 포스팅의 코드 및 정보들은 강의를 들으며 정리한 내용을 토대로 작성한 것입니다.

Future에는 어떤 불편함이 있었는가

이전 Future를 쓰던 방식의 가장 큰 문제점들이 여럿 있었다.

  • 예외 처리용 API를 제공하지 않는다던가
  • 여러 Future를 조합하는 것도 어렵고
  • 가장 큰 단점이 Future에서 반환하는 결과값을 가지고 무언가를 하는 작업들은 get() 뒤에 와야 한다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    Future<String> future = executorService.submit(() -> "Hello World");

    // get을 하기 전까지 어떤 것도 할 수 없다.
    future.get(); 
}

그런데 get()이 블로킹 콜이기 때문에 get() 전에 많은 작업을 하면 되지만, 보통 프로그래밍을 할 때 "Hello World"작업이 완료된 뒤의 작업도 같이 구현시킨다. get()을 통해서 "Hello World"가 완료되면 그때 뒤에 써준 작업이 호출되도록 하는 것이 일반적으로 봐왔던 비동기적인 프로그래밍의 코딩 패턴이다.

하지만, 이런 여러 작업들은 Java 5의 Future 만으로는 어려운 작업이 된다.

CompletableFuture

CompletableFuture란

자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하는 인터페이스

CompletableFuture가 구현하는 것들

  • Future
  • CompletionStage
    • Completable은 외부에서 명시적으로 Complete을 시킬 수 있다.
    • 몇 초 이내에 응답이 안 오면 그냥 기본값으로 미리 캐시해둔 값이나 특정한 값을 리턴하도록 만들 수 있다.

CompletableFuture를 쓰면 더이상 명시적으로 Executor를 만들어서 쓸 필요가 없다. CompletableFuture만 가지고 비동기적으로 어떤 작업들을 실행할 수 있다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("Foo");
}

complete()를 통해 future의 기본 값을 Foo라고 정해주면서 Future의 작업 자체를 끝내게 되는 것이다.

단, get()이 없어지지는 않는다.

비동기로 작업 실행하기 이전에

future.complete("Foo") 이거는 명시적으로 값을 준 거고,

static factory method를 활용해서 값을 받을 수도 있다.

비동기로 작업 실행하기

만약, 실제로 어떤 작업을 실행하고 싶으면 반환이 있는/없는 작업으로 나눌 수 있다.

반환 없이 비동기로 작업하기 - runAsync()

반환이 없는 작업은 runAsync()를 쓰면 된다.

runAsync()가 반환 타입이 없기 때문에, 제네릭 타입도 Void로 나온다. 이거는 Future만 정의한 것이기 때문에 아무 일이 벌어지지 않으므로, get()또는 join()을 해야 어떤 일이 벌어진다.

join()은 안에서 Exception이 벌어지거든, UncatchedException으로 던져지므로 굳이 에러 처리를 명시적으로 할 필요는 없다.

반환값을 가지고 비동기로 작업하기 - supplyAsync()

위 실습에서 future.get()만 하면 [Thread-Example] ForkJoinPool.commonPool-worker-3만 출력된다. 반환 값이 있으므로 콘솔 출력을 통해 반환값까지 받아 출력하면 되는 것에 유의할 것

Future를 사용하던 때랑 비슷한 것 같은데요

그냥 ExecutorService에다가 Callable 넘겨준 후 get()호출해서 blocking한 다음에 결과 가져와서 출력하는 거랑 같다.

이 포스팅 맨 위에서 서술한 바와 같이 뭔가 결과가 왔을 때 비동기적으로 이에 해당하는 callback을 실행해야 한다. Future로는 한계가 있고 get()도 여전히 호출해야 하지만, 적합한 방법들이 있다.

콜백 제공하기

thenApply(Function)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("[Thread-Example] " + Thread.currentThread().getName());
    return "The sentence for example";
}).thenApply();

callback을 주는 방법은 thenApply()를 뒤에 붙이는 것이다. 우리가 받은 그 결과값을 다른 타입으로 변경할 수 있으며, thenApply() 자리에 function이 들어가는 것이다.

Java5의 Future를 썼을 때와는 달리, callback을 get() 호출하기 전에 정의하는 게 가능한 걸 확인할 수 있다.

thenAccept(Consumer)

thenAccept(Consumer) callback은 반환이 없고, 받아서 쓰기만 하면 될 때 활용할 수 있다.

thenAccept()에는 Consumer가 들어오며, 반환이 없기 때문에 타입이 달라지는 건 자명하다.

supplyAsync(() -> {
            System.out.println("[Thread-Example] " + Thread.currentThread().getName());
            return "The sentence for example";
})

여기서 콘솔 출력하는 작업까지 끝나면(종료되면)

thenAccept((s) -> {
	System.out.println(Thread.currentThread().getName());
    System.out.println(s.toUpperCase());
});

supplyAsync()에서 작업하고 나서의 결과값을 받아서 부가적인 처리를 한다.

thenRun(Runnable)

반환 받을 필요 없이 뭔가 동작을 하기만 하면 될 때 활용할 수 있는 callback이다. thenApply()thenAccept()와는 달리 결과값을 참고하지 않고 안에는 Runnable이 온다.

생각해보니 스레드 풀 없이 콜백을 쓰고 있었다

3개의 callback을 스레드 풀을 만들지 않고 사용하고 있었는데, 이는 ForkJoinPool 덕분에 그런 것이다.

ForkJoinPool은 Java7에 들어온 건데, 이것도 역시 Executor를 구현한 구현체 중 하나이다.

조금 다른 점은 작업을 Work Stealing 알고리즘을 사용한다는 것이며, 이는 다른 Deque(덱, Dequeue)을 쓴다는 것이다.
Queue는 먼저 들어온 게 먼저 나가지만 Deque(Dequeue)는 맨 마지막에 들어온 게 먼저 나간다.
그래서 Deque를 써서 자기 스레드가 할 일이 없으면 스레드가 직접 Deque에서 자기가 할 일을 가져와서 처리하는 방식의 Framework이다.

작업 단위를 자기가 파생시키는 세부적인 sub task가 있다면, 그 sub task를 잘게 쪼개서 다른 스레드에 분산시켜서 작업을 처리하고 모아서(joining) 그 결과값을 도출해내는 ForkJoin Framework이다.

그러므로, 우리가 별다른 Executor를 사용하지 않아도 내부적으로 ForkJoin Pool에 있는 commonPool()이라는 걸 쓰게 된다.

다만, 원한다면 얼마든지 직접 스레드 풀을 만들어서 스레드를 이용할 수 있다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
        
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("[Thread-Example] " + Thread.currentThread().getName());
        return "The sentence for example";
    }).thenRun(() -> {
        System.out.println(Thread.currentThread().getName());
    });
    System.out.println(future.get());
}

이렇게 만든 거를 supplyAsync()를 호출할 때 두 번째 인자로 줄 수 있다. supplyAsync()runAsync(), 이 두 개의 function을 호출하는 작업을 여기서 (직접 제공하려는) 스레드 풀을 사용해서 처리한다.

이렇게 하면 이름이 다른 Pool이 출력될 것이다.

thenApply(), thenAccept(), thenRun() 이 셋도 마찬가지로 callback을 실행할 어떤 풀을 다른 곳에서 실행하고 싶다면 thenRunAsync()를 쓰면 된다.

thenRunAsync()를 쓰니 스레드의 이름이 다르게 출력되는 것을 확인할 수 있다.

ExecutorService를 쓸 때는 shutdown()을 해줘야 한다.

이것도 마찬가지로 스레드의 이름이 다르게 출력된다.

정리

  • 자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스

    • Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다
  • Future로는 하기 어려웠던 작업들

    • Future를 외부에서 완료시킬 수 없다. 취소하거나 get()에 타임아웃을 설정할 수는 있다.
    • blocking code(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • 여러 Future를 조합할 수 없다.
      • 예) Study 정보를 가져온 다음 Study에 참석하는 동아리원 목록 가져오기
    • 예외 처리용 API를 제공하지 않는다.
  • CompletableFuture

    • Implements Future
    • Implements Completion Stage
  • 비동기로 작업 실행하기

    • 반환값이 없는 경우 : runAsync()
    • 반환값이 있는 경우 : supplyAsync()
    • 원하는 Executor(스레드풀)를 사용해서 실행할 수도 있다.
      • 기본은 ForkJoinPool.commonPool()
  • 콜백 제공하기

    • thenApply(Function) : 반환값(결과)을 받아서 다른 값으로 바꾼 후 그걸 반환하는 callback
    • thenAccept(Consumer) : 결과를 받아 다른 작업을 반환없이 처리하는 callback
    • thenRun(Runnable) : 반환값을 받지 않고 다른 작업(동작)을 마찬가지로 반환 없이 처리만하는 callback
    • callback 자체를 또 다른 스레드에서 실행할 수 있다.

Reference

profile
만 가지 발차기를 한 번씩 연습하는 사람은 두렵지 않다. 내가 두려워 하는 사람은 한 가지 발차기를 만 번씩 연습하는 사람이다. - Bruce Lee

0개의 댓글