[Java8]CompletableFuture

Wintering·2022년 5월 24일
0

이펙티브 자바

목록 보기
10/18

Java8 Concurrent 프로그래밍

Executor와 ThreadPool

  • Executor 프레임워크와 쓰레드풀을 통해서 쓰레드의 힘을 높은 수준으로 끌어 올리는 태스크 제출과 실행을 분리할 수 있는 기능을 제공

Thread의 문제

  • Java 스레드는 직점 운영체제 스레드에 접근
  • 운영체제 스레드는 만들고 종료하는데에 있어 비용이 비쌈
  • 또한 스레드 숫자는 제한되어 있으므로, 운영체제가 지원하는 스레드 초과 사용시 충돌 발생 가능

ThreadPool 의 장점

  • Java의 executorService 는 Task를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 수집
  • 프로그램은 newFixedThreadPool과 같은 팩토리 메소드 중 하나를 이용하여, Thread Pool을 만들어서 사용
    nTreads를 포함하는 ExecutorService를 만들고, 이를 ThreadPool에 저장
  • 하드웨어에 맞는 태스크를 유지함과 동시에 수 천개의 Task를 아무 오버헤드 없이 제출 가능

ThreadPool의 문제점

  1. K개의 스레드를 가진 풀은 오직 k만큼의 스레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 Queue에 저장되며, 이전에 태스크 중 하나가 종료되기 전까지는 스레드에 할당하지 않는다.

    Sleep 상태나 I/O를 기다리거나 네트워크 연결을 기다리는 태스크가 있을 경우 주의해야한다.
    해당 작업이 발생하는 경우 태스크가 워커 스레드에 할당 된 상태를 유지하지만 아무 작업도 하지 않는다.
    (Waste한 상태가 되고 최악의 경우 Deadlock(두 개 이상의 작업이 서로가 끝나기만을 기다리는 교착상태)발생)

  2. 중요한 코드를 실행하는 스레드가 죽지 않도록, Java는 Main을 반환하기 전 모든 스레드의 작업이 끝나길 기다린다. -> 프로그램을 종료하기 전 모든 쓰레드풀을 종료하는 습관이 필요

동시성을 구현하는 자바의 진화

버전사용방법
java5이전Runnable과 Thread를 이용하여 구현
Java5ExecutorService, Callable, Future
java7Fork/Join , RecursiveTask
java8Steam, CompletableFuture
java9분산 비동기 프로그래밍은 명시적으로 지원(발행 구독 프로토콜 지원 Flow API)

동기식 처리 (Synchronous)

직렬적으로 task를 수행한다. 즉, 태스크는 순차적으로 실행되며 어떤 작업이 수행중이면 다음 작업은 대기한다.

비동기식 처리 (Asynchronous)

병렬적으로 task를 수행한다. 즉, 태스크가 종료되지 않은 상태라 하더라도 대기하지 않고 다음 태스크를 실행한다.

Future 형식의 API는 일회성의 값을 처리하는 데 적합하고,
리액티브 형식의 비동기 API는 자연스럽게 일련의 값을 처리하는데 적합하다.

동기 API와 비동기 API

전통적인 동기 API 에서는 메소드를 호출한 다음에 메소드가 계산을 완료할 때까지 기다렸다가 메소드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행한다. 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도, 호출자는 피호출자의 동작 완료를 기다렸을 것이다. 이처럼 동기 API를 사용하는 상황을 블록 호출 이라고 한다.

반면 비동기 API 에서는 메소드가 즉시 반환되며, 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이와같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다. 다른 스레드에 할당된 나머지 계산 결과는 콜백 메소드를 호출해서 전달하거나 호출자가 계산결과를 끝날때까지 기다림 메소드를 추가로 호출하면서 전달된다.


CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스

  • Future로는 하기 어려운 작업들
    - Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
    - 블로킹 코드를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    - 여러 Future를 조합할 수 없다. ex) Event 정보를 가져온 다음, Event에 참석하는 회원 가져오기
    - 예외 처리용 API를 제공하지 않는다.

    ex) 🔻Future를 단순 활용한 비동기
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<double>() {
  public Double call() {
    return doSomeLongComputation();// 시간이 오래걸리는 Task
  }
});
doSomethingElse(); // 비동기 작업을 수행하는 동안 다른 작업을 수행.
try {
  Double result = future.get(1,TimeUnit.SECONDS);
} catch(ExecutionException ee){
  // 계산 중 예외 발생
} catch (InterruptedException ie){
  // 현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te){
  // Future가 완료되기 전에 타임아웃 발생
}  

ExecutorService가 제공하는 스레드가 시간이 오래걸리는 작업을 처리하는 동안, 메인 스레드로 다른 작업을 동시에 실행할 수 있다. 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점에 get()을 사용하여 결과를 가져온다. get 메소드를 호출했을 때, 이미 계산이 완료되어 결과가 준비 되었다면, 즉시 결과를 반환하지만 결과가 준비되지 않았따면 작업이 완료될 때 까지 스레드를 블록시킨다.

⚠ 오래 걸리는 작업이 영원히 끝나지 않으면 전체 작업이 영원히 Block될 수 있다.


비동기로 작업 실행하기

  • runAsync() - 리턴값이 없는경우

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
    });

    future.get(); 
  • supplyAsync() - 리턴값이 있는 경우

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    });

    System.out.println(future.get());

콜백 제공하기

  • thenApply(Function) - 리턴 값이 있을 때

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    }).thenApply((s) -> {
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    });

    System.out.println(future.get());  
  • thenAccept(Consumer) - 리턴 값이 없을 때

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    }).thenAccept((s) -> {
        System.out.println(Thread.currentThread().getName());
        System.out.println(s.toUpperCase());
    });

    future.get();  
  • thenRun(Runnable) - 리턴값을 받지 않고, 다른 작업을 처리

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    }).thenRun(() -> {
        System.out.println(Thread.currentThread().getName());
    });

    future.get();  

조합하기

thenCompose() - 두 작업이 서로 이어서 실행하도록 조합

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello: " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> future = hello.thenCompose(App::getWorld);

        System.out.println(future.get());
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World: " + Thread.currentThread().getName());
            return message + " World";
        });
    }

themCombine() - 두 작업을 독립적으로 실행하고 둘다 종료 했을 때 콜백 실행

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World: " + Thread.currentThread().getName());
        return "World";
    });

    CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
    System.out.println(future.get());

allOf() - 여러 작업을 모두 실행하고, 모든 작업 결과에 콜백 실행

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World: " + Thread.currentThread().getName());
        return "World";
    });
    List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
    CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

    CompletableFuture<List<String>> result = CompletableFuture.allOf(futuresArray)
            .thenApply(v -> {
                return futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());
            });

    result.get().forEach(System.out::println);

anyOf() - 가장 빨리 끝난 하나의 결과에 콜백 실행

    CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
    future.get();

예외처리

exceptionally(Function)

    boolean throwError = true;

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError){
            throw new IllegalArgumentException();
        }

        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    }).exceptionally(ex -> {
        System.out.println(ex);
        return "Error!";
    });

    System.out.println(hello.get());

handle(BiFunction)

    boolean throwError = true;

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError){
            throw new IllegalArgumentException();
        }

        System.out.println("Hello: " + Thread.currentThread().getName());
        return "Hello";
    }).handle((result, ex) -> {
        if (ex != null){
            System.out.println(ex);
            return "Error!";
        }
        return result;
    });

    System.out.println(hello.get());

0개의 댓글