executorService
는 Task를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 수집newFixedThreadPool
과 같은 팩토리 메소드 중 하나를 이용하여, Thread Pool을 만들어서 사용nTreads
를 포함하는 ExecutorService
를 만들고, 이를 ThreadPool에 저장Sleep 상태나 I/O를 기다리거나 네트워크 연결을 기다리는 태스크가 있을 경우 주의해야한다.
해당 작업이 발생하는 경우 태스크가 워커 스레드에 할당 된 상태를 유지하지만 아무 작업도 하지 않는다.
(Waste한 상태가 되고 최악의 경우 Deadlock(두 개 이상의 작업이 서로가 끝나기만을 기다리는 교착상태)발생)
버전 | 사용방법 |
---|---|
java5이전 | Runnable과 Thread를 이용하여 구현 |
Java5 | ExecutorService, Callable, Future |
java7 | Fork/Join , RecursiveTask |
java8 | Steam, CompletableFuture |
java9 | 분산 비동기 프로그래밍은 명시적으로 지원(발행 구독 프로토콜 지원 Flow API) |
동기식 처리 (Synchronous)
직렬적으로 task를 수행한다. 즉, 태스크는 순차적으로 실행되며 어떤 작업이 수행중이면 다음 작업은 대기한다.
비동기식 처리 (Asynchronous)
병렬적으로 task를 수행한다. 즉, 태스크가 종료되지 않은 상태라 하더라도 대기하지 않고 다음 태스크를 실행한다.
Future 형식의 API는 일회성의 값을 처리하는 데 적합하고,
리액티브 형식의 비동기 API는 자연스럽게 일련의 값을 처리하는데 적합하다.동기 API와 비동기 API
전통적인 동기 API 에서는 메소드를 호출한 다음에 메소드가 계산을 완료할 때까지 기다렸다가 메소드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행한다. 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도, 호출자는 피호출자의 동작 완료를 기다렸을 것이다. 이처럼 동기 API를 사용하는 상황을 블록 호출 이라고 한다.
반면 비동기 API 에서는 메소드가 즉시 반환되며, 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이와같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다. 다른 스레드에 할당된 나머지 계산 결과는 콜백 메소드를 호출해서 전달하거나 호출자가 계산결과를 끝날때까지 기다림 메소드를 추가로 호출하면서 전달된다.
자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<double>() {
public Double call() {
return doSomeLongComputation();// 시간이 오래걸리는 Task
}
});
doSomethingElse(); // 비동기 작업을 수행하는 동안 다른 작업을 수행.
try {
Double result = future.get(1,TimeUnit.SECONDS);
} catch(ExecutionException ee){
// 계산 중 예외 발생
} catch (InterruptedException ie){
// 현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te){
// Future가 완료되기 전에 타임아웃 발생
}
ExecutorService
가 제공하는 스레드가 시간이 오래걸리는 작업을 처리하는 동안, 메인 스레드로 다른 작업을 동시에 실행할 수 있다. 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점에 get()을 사용하여 결과를 가져온다. get 메소드를 호출했을 때, 이미 계산이 완료되어 결과가 준비 되었다면, 즉시 결과를 반환하지만 결과가 준비되지 않았따면 작업이 완료될 때 까지 스레드를 블록시킨다.
⚠ 오래 걸리는 작업이 영원히 끝나지 않으면 전체 작업이 영원히 Block될 수 있다.
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
});
future.get();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
});
System.out.println(future.get());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(future.get());
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
future.get();
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = hello.thenCompose(App::getWorld);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World: " + Thread.currentThread().getName());
return message + " World";
});
}
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World: " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(future.get());
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World: " + Thread.currentThread().getName());
return "World";
});
List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> result = CompletableFuture.allOf(futuresArray)
.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
result.get().forEach(System.out::println);
CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
future.get();
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
}).exceptionally(ex -> {
System.out.println(ex);
return "Error!";
});
System.out.println(hello.get());
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello: " + Thread.currentThread().getName());
return "Hello";
}).handle((result, ex) -> {
if (ex != null){
System.out.println(ex);
return "Error!";
}
return result;
});
System.out.println(hello.get());