[Java] CompletableFuture

nathan·2022년 1월 31일
0

JAVA

목록 보기
39/45

CompletableFuture

CompletableFuture란?

  • 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하는 인터페이스이다.
    • 기존의 Future를 사용해서도 어느정도 가능했으나, 제약이 많았다.

Future로 어떤 제약이 있었을까?

  • Future를 외부에서 완료시킬 수 없다. (단, 취소하거나, get()에 타임아웃을 설정할 수는 있다.)
  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • Future에서는 return되는 결과값을 가지고 무언가를 하려면 get()이후에만 가능하다.
  • 여러 Future 조합을 사용할 수 없다. (ex. Event 정보를 가져온 다음 Event에 참석하는 회원 목록을 가져오기)
  • 예외 처리용 API를 제공하지 않는다.

CompletableFuture의 장점

  • Future와 달리 외부에서 명시적으로 Complete를 시켜버릴 수 있다.
    • (ex. 몇 초 이내에 응답이 안오면, 특정 값을 리턴)
  • CompletableFuture를 사용하면 명시적으로 Executors를 사용할 필요가 없어진다.
    • 그냥 CompletableFuture만 가지고도 비동기적으로 어떤 작업들을 실행할 수가 있다.
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture<String> completableFuture2 = CompletableFuture.completedFuture("nathan2"); // static factory method
completableFuture.complete("nathan");

System.out.println(completableFuture.get()); // nathan
System.out.println(completableFuture2.get()); // nathan2

비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()
  • 리턴값이 있는 경우: supplyAsync()
  • 원하는 Executor(스레드 풀)를 사용해서 실행할 수도 있다.
  • 기본은 ForkJoinPool.commonPool()

return type이 없는 작업을 하고 싶다면? - runAsync()

  • return이 없기 때문에 get() 또는 join()을 해야 원하는 작업을 얻을 수 있다.
CompletableFuture<Void> tmp1 = CompletableFuture.runAsync(() -> {
     System.out.println("runAsync "+Thread.currentThread().getName());
}); // return이 없는 작업
tmp1.get();

>>>
runAsync ForkJoinPool.commonPool-worker-9

return type이 있는 작업을 하고 싶다면? - supplyAsync()

CompletableFuture<String> tmp2 = CompletableFuture.supplyAsync(()->{
    System.out.println("supplyAsync " + Thread.currentThread().getName());
    return "supplyAsync";
}); // return이 있는 작업
System.out.println(tmp2.get( ));

>>>
supplyAsync ForkJoinPool.commonPool-worker-9
supplyAsync

callback을 이용한 작업들

  • 지금까지는 코드만 달라졌을 뿐 Future를 사용했을 때와 거의 같다.
  • 비동기적(Asynchronous)으로 callback을 주고 작업을 해보자.
    • 여전히 get()은 호출해야 함에 유의하자.
    • 호출하지 않으면, 아무일도 일어나지 않는다.
  • 콜백 자체를 또 다른 스레드에서 실행할 수 있다.

callback에 return을 붙이는 경우 - thenApply()

  • Future일 때는 이런 작업이 불가능 했음.
    • callback을 get() 호출하기 전에 작업하는 것이 불가능.
  • Function이 들어감
// Future를 쓸 때와는 달리 callback을 줄 수 있다. (단, 이 때에도 get은 호출해줘야 한다. - 호출 안하면 아무일도 일어나지 않음)
CompletableFuture<String> tmp3 = CompletableFuture.supplyAsync(()->{
    System.out.println("callback " + Thread.currentThread().getName());
    return "CallBack";
}).thenApply((s) -> {  
    System.out.println(Thread.currentThread().getName());
    return s.toUpperCase();
}); // 받은 결과값을 다른 타입의 값으로 변환 (Function)

System.out.println(tmp3.get());

>>>
callback ForkJoinPool.commonPool-worker-9
main
CALLBACK

callback에 return을 붙이지 않는 경우 - thenAccept()

  • Consumer가 들어감
  • return 값이 없으므로 type은 Void (최종적으로 실행되는 것에 focus를 맞춘다.)
CompletableFuture<Void> tmp4 = CompletableFuture.supplyAsync(() -> {
    System.out.println("callback2 " + Thread.currentThread().getName());
    return "callback2";
}).thenAccept((s) -> {
    System.out.println(Thread.currentThread().getName());
    System.out.println(s.toUpperCase());
}); // 받은 결과값을 그냥 받기만 할 때

tmp4.get();

>>>
callback2 ForkJoinPool.commonPool-worker-9
main
CALLBACK2

return 없이 callback으로 넘어오는 경우 - thenRun()

  • return 없이 넘어온 callback에 return이 붙지 않는 경우에 쓰인다.
CompletableFuture<Void> tmp5 = CompletableFuture.supplyAsync(() -> {
    System.out.println("callback3 " + Thread.currentThread().getName());
    return "callback3";
}).thenRun(()->{
    System.out.println("thenRun");
}); // 결과값을 참조하지 않음. Runnable이 온다.

>>>
callback3 ForkJoinPool.commonPool-worker-9
thenRun

ForkJoinPool

  • 어떻게 스레드 풀을 만들지 않고 별도의 스레드들로 동작을 하는걸까?
    • ForkJoinPool 때문에 가능한 것
  • ForkJoinPool이란 Executor 구현체 중 하나인데, Deque를 통하여 자기 스레드가 할 일이 없으면, Deque에서 가져와서 처리하는 방식의 프레임 워크이다.
  • 자기가 파생시킨 서브 태스크들을 다른 스레드들에 분산시켜 처리하고 모아서 Join하는 식으로 작업단위를 구성한다.

스레드 풀을 직접 만들어 진행해보기

  • 원하면, 언제든 스레드 풀을 직접 생성하여 진행할 수 있다.
  • supplyAsync 호출 할 때 두 번째 인자로 줄 수 있음(runAsync도 마찬가지)
// 스레드 풀을 직접 만들어 진행해보기
ExecutorService executorService2 = Executors.newFixedThreadPool(4); // supplyAsync 호출 할 때 두 번째 인자로 줄 수 있음
CompletableFuture<String> tmp6 = CompletableFuture.supplyAsync(()->{
    System.out.println("ThreadPool " + Thread.currentThread().getName());
    return "ThreadPool";
}, executorService2);

System.out.println(tmp6.get());

executorService2.shutdown();

>>>
ThreadPool pool-2-thread-1
ThreadPool

작업들을 조합하기

두 작업을 서로 이어서 실행하려면? - thenCompose()

  • 두 작업이 서로 이어서 실행하도록 조합한다.
public class CompletableFutureEx2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 예전에는 Future만 가지고서는 어떤 작업들(비동기적인 작업들)을 이어서 처리하는 것이 힘들었음 - 콜백을 주기 힘들었기 때문
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(()->{
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = hello.thenCompose(CompletableFutureEx2::getWorld); // 뒤에 이어서 추가적인 비동기작업을 진행할 수 있음(연관관계 : 의존적일 때)
        System.out.println(world.get());

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(message + Thread.currentThread().getName());
            return message+ " World";
        });
    }
}
>>>
Hello ForkJoinPool.commonPool-worker-9
HelloForkJoinPool.commonPool-worker-9
Hello World

두 작업을 독립적으로 실행하고, 둘 다 종료 했을 때 콜백을 실행하려면? - thenCombine()

  • 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백을 실행한다.
  • 두 작업이 서로 연관관계(의존성)는 없지만, 동시에 따로따로 실행하는 방법 : thenCombine + biFunction
public class CompletableFutureEx2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 예전에는 Future만 가지고서는 어떤 작업들(비동기적인 작업들)을 이어서 처리하는 것이 힘들었음 - 콜백을 주기 힘들었기 때문
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(()->{
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        
        CompletableFuture<String> world = hello.thenCompose(CompletableFutureEx2::getWorld); // 뒤에 이어서 추가적인 비동기작업을 진행할 수 있음(연관관계 : 의존적일 때)    

        // 연관관계(의존성)는 없지만, 동시에 따로따로 실행하는 방법 : thenCombine + biFunction
        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> {
            return h+"  "+w;
        });

        System.out.println(future.get());

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(message + Thread.currentThread().getName());
            return message+ " World";
        });
    }
}
>>>
Hello  Hello World

여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행하려면? - allOf()

  • 여러 작업을 모두 실행하고 모든 작업 결과에 콜백을 실행한다.
  • 두 개 이상의 서브 태스크들을 모두 합쳐서 실행하는 방법
// 두 개 이상의 서브 태스크들을 모두 합쳐서 실행하는 방법 : allOf
List<CompletableFuture<String>> futures = Arrays.asList(hello, world); // 우선 task들을 List로 뭉쳐 놓기
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
        .thenApply(v -> { // thenApply 되는 시점에서는 이미 모든 task들이 종료된 상태
            return futures.stream()
                    .map(CompletableFuture::join) // get과 join의 차이 : get은 checked exception, join은 unchecked exception 발생
                    .collect(Collectors.toList()); // join을 하게되면 Future에서 return하는 최종적인 결과값이 return 됨 -> 그걸 모아서 리스트로 구
        });

results.get().forEach(System.out::println);
        
>>>
Hello
Hello World

여러 작업 중 가장 빨리 끝난 하나의 결과에 콜백을 실행하려면? - anyOf()

  • 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백을 실행한다.
// 아무거나 하나 빨리 끝나는거 결과 받아서 뭔가를 하고 싶을 때
CompletableFuture<Void> future3 = CompletableFuture.anyOf(world, hello).thenAccept(System.out::println);
future3.get();
>>>
Hello World

예외 처리

exceptionally

  • 비동기적으로 진행되는 task들에서 에러가 발생한다면, exceptionally를 통해 에러 타입을 받아서 뭔가를 return하는 Function을 넘겨줄 수 있다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureEx3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError){
                throw new IllegalArgumentException();
            }
            System.out.println("Hello "+Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {
            System.out.println(ex);
            return "Error!";
        });

        System.out.println(hello.get());
    }
}
>>>
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!

handle

  • exceptionally 보다 조금 더 일반적인 사용이 가능하다.
  • BiFunction을 넘겨준다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureEx3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = false;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError){
                throw new IllegalArgumentException();
            }
            System.out.println("Hello "+Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) ->{
            if (ex != null){
                System.out.println(ex);
                return "Error!";
            }
            return result;
        });

        System.out.println(hello.get());
    }
}
>>>
// boolean throwError = false;
Hello ForkJoinPool.commonPool-worker-9
Hello

// boolean throwError = true;
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!

함께 보면 좋을 자료들


Reference

profile
나는 날마다 모든 면에서 점점 더 나아지고 있다.

0개의 댓글