[Reactive Programming] Future, CompletionStage

DaeHoon·2023년 6월 5일
0

CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
  • 2014년에 발표된 java 8에서 처음 도입
  • 비동기 프로그래밍 지원
  • Lambda, Method reference 등 Java 8의 새로운 기능 지원
  • Future, CompletionStage를 상속받음.

Method Reference

  • :: 연산자를 이용해서 함수에 대한 참조를 간결하게 표현
@RequiredArgsConstructor
public static class Person {
@Getter
	private final String name;
      public Boolean compareTo(Person o) {
      	return o.name.compareTo(name) > 0;
      }
}
public static void print(String name) {
	System.out.println(name);
}

public static void main(String[] args) {
	var target = new Person("f");
	Consumer<String> staticPrint = MethodReferenceExample::print;
	Stream.of("a", "b", "g", "h")
    .map(Person::new)// constructor reference
    .filter(target::compareTo) // method reference
    .map(Person::getName) // instance method reference
    .forEach(staticPrint); // static method reference    
 }

Future

  • 비동기적인 작업을 수행
  • 해당 작업이 완료되면 결과를 반환하는 인터페이스

Future Interface

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled();
  boolean isDone();
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
	  throws InterruptedException, ExecutionException, TimeoutException;
}

get()

  • 결과를 구할 때까지 thread가 계속 block
  • future에서 무한 루프나 오랜 시간이 걸린다면 thread가 blocking 상태 유지

get(long timeout,TimeUnit unit)

  • 결과를 구할 때까지 timeout동안 thread가
    block
  • timeout이 넘어가도 응답이 반환되지 않으면
    TimeoutException 발생

isDone(), isCancelled()

  • future의 상태를 반환
  • isDone: task가 완료되었다면, 원인과 상관없이 true 반환
  • isCancelled: task가 명시적으로 취소된 경우, true 반환

cancel(booleanmayInterruptIfRunning)

  • future의 작업 실행을 취소
  • 취소할 수 없는 상황이라면 false를 반환
  • mayInterruptIfRunning가 false라면 시작하지 않은 작업에 대해서만 취소

FutureHelper

public static Future<Integer> getFuture() {
    var executor = Executors.newSingleThreadExecutor();
    try {
        return executor.submit(() -> {
            return 1;
        });
    } finally {
        executor.shutdown();
    }
}

public static Future<Integer> getFutureCompleteAfter1s() {
    var executor = Executors.newSingleThreadExecutor();
    try {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return 1;
        });
    } finally {
        executor.shutdown();
    }
}
  • getFuture(): 새로운 쓰레드를 생성하여 1을 반환
  • getFutureCompleteAfter1S(): 새로운 쓰레드를 생성하고 1초 대기 후 1을 반환

Future 인터페이스의 한계

  • cancel을 제외하고 외부에서 future를 컨트롤 할 수 없다
  • 반환된 결과를 get() 해서 접근하기 때문에 비동기 처리가 어렵다
  • 완료되거나 에러가 발생했는지 구분하기 어렵다

ExecutorService

  • 쓰레드 풀을 이용하여 비동기적인 작업을 실행하고 관리
  • 별도의 쓰레드를 생성하고 관리하지 않아도 되므로, 코드를 간결하게 유지 가능
  • 쓰레드 풀을 이용하여 자원을 효율적으로 관리

ExecutorService Method

public interface ExecutorService extends Executor {
  void execute(Runnable command);
  <T> Future<T> submit(Callable<T> task);
  void shutdown();
}
  • execute(Runnable command): Runnable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행

  • <T> Future<T> submit(Callable<T> task): Callable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행하고, 해당 작업의 결과를 Future<T>로 반환

  • shutdown(): ExecutorService를 종료. 더 이상 task를 받지 않는다.

Executors - ExecutorService 생성

  • newSingleThreadExecutor: 단일 쓰레드로 구성된 스레드 풀을 생성, 하 번에 하나의 작업만 실행
  • newFixedThreadPool: 고정된 크기의 쓰레드 풀을 생성. 크기는 인자로 주어진 n과 동일
  • newCachedThreadPool: 사용 가능한 쓰레드가 없다면 새로 생성해서 작업을 처리하고, 있다면
    재사용. 쓰레드가 일정 시간 사용되지 않으면 회수
  • newScheduledThreadPool: 스케줄링 기능을 갖춘 고정 크기의 쓰레드 풀을 생성. 주기적이거
    나 지연이 발생하는 작업을 실행
  • newWorkStealingPool: work steal 알고리즘을 사용하는 ForkJoinPool을 생성

CompletionStage

  • 비동기적인 작업을 수행
  • 해당 작업이 완료되면 결과를 처리하거나 CompletionStage를 연결하는 인터페이스

CompletionStage Interface

public interface CompletionStage<T> {
 public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
 public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
 
 public CompletionStage<Void> thenAccept(Consumer<? super T> action);
 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
 
 public CompletionStage<Void> thenRun(Runnable action);
 public CompletionStage<Void> thenRunAsync(Runnable action);
 
 public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U > fn);
 public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U > fn);
 
 public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}

CompletionStage 연산자 조합

Helper.completionStage()
            .thenApplyAsync(value -> {
        log.info("thenApplyAsync: {}", value);
        return value + 1;
    }).thenAccept(value -> {
        log.info("thenAccept: {}", value);
    }).thenRunAsync(() -> {
        log.info("thenRun");
    }).exceptionally(e -> {
        log.info("exceptionally: {}", e.getMessage());
        return null;
    });
Thread.sleep(100);
  • 50개에 가까운 연산자들을 활용하여 비동기 task들을 실행하고 값을 변형하는 등 chaining을 이용한 조합 가능
  • 에러를 처리하기 위한 콜백 제공

ForkJoinPool - thread pool

  • CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해 ForkJoinPool을 사용
  • ForkJoinPool의 기본 size = 할당된 cpu 코어 1
  • 데몬 쓰레드. main 쓰레드가 종료되면 즉각적으로 종료

ForkJoinPool - fork & join

  • Task를 fork를 통해서 subtask로 나누고
  • Thread pool에서 steal work 알고리즘을 이용해서 균등하게 처리해서
  • join을 통해서 결과를 생성
    자세한 내용 https://ttl-blog.tistory.com/800

CompletionStage 연산자

Functional Interface

  • 함수형 프로그래밍을 지원하기 위해 java 8부터 도입
  • 1개의 추상 메서드를 갖고 있는 인터페이스
  • 함수를 1급 객체로 사용할 수 있다
    - 함수를 변수에 할당하거나 인자로 전달하고 반환값으로 사용 가능
  • Function, Consumer, Supplier, Runnable등
  • 함수형 인터페이스를 구현한 익명 클래스를 람다식으로 변경 가능
@FunctionalInterface
public interface Function<T, R> {
 R apply(T t);
}

@FunctionalInterface
public interface Consumer<T> {
 void accept(T t);
}

@FunctionalInterface
public interface Supplier<T> {
 T get();
}

@FunctionalInterface
public interface Runnable {
 public abstract void run();
}

CompletionStage 연산자와 연결

  • Consumer - accept 메소드 -> thenAccept(Consumer action)
  • Function - apply 메소드 -> thenApply(Function fn)
  • Function - compose 메소드 (추상 메소드 x) -> thenCompose(Function fn)
  • Runnable - run 메소드 -> thenRun(Runnable action)

thenAccept(Consumer action)

  • Consumer를 파라미터로 받음.
  • 이전 Task로부터 값을 받지만 넘기지는 않음.
  • 다음 Task에게 null이 전달된다.
  • 값을 받아서 action만 수행하는 경우에 사용됨
@FunctionalInterface
public interface Consumer<T> {
	void accept(T t);
}

CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

Helper

  • finishedStage: 1을 반환하는 완료된 CompletableFuture 반환 (future는 항상 종료시킨 다음에 반환함 == 무조건 Done 상태)
  • runningStage: 1초를 sleep한 후 1을 반환하는 completableFuture (future는 무조건 진행중인 상태)
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
   var future = CompletableFuture.supplyAsync(() > {
   log.info("supplyAsync");
   return 1;
 });
 Thread.sleep(100);
 return future;
}

public static CompletionStage<Integer> runningStage() {
   return CompletableFuture.supplyAsync(() > {
   try {
     Thread.sleep(1000);
     log.info("I'm running!");
   } catch (InterruptedException e) {
   	throw new RuntimeException(e);
   }
 return 1;
 });
}

thenAccept vs thenAcceptAsync (finishedStage)

CompletionStageThenAcceptExample.java

log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAccept(i > {
 log.info("{} in thenAccept", i);
}).thenAccept(i > {
 log.info("{} in thenAccept2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - 1 in thenAccept
[main] - null in thenAccept2
[main] - after thenAccept
CompletionStageThenAcceptAsyncExample.java

log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAcceptAsync(i > {
 log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i > {
 log.info("{} in thenAcceptAsync2", i);
});
log.info("after thenAccept");
Thread.sleep(100)
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - after thenAccept
[ForkJoinPool.commonPool-worker-19] - 1 in thenAcceptAsync
[ForkJoinPool.commonPool-worker-5] - null in thenAcceptAsync2
  • Future 완료 시 thenAccept는 caller의 스레드에서 action이 실행됨
  • thenAcceptAsync는 각각의 action이 별도의 스레드에서 실행된다.

thenAccept[Async]의 실행 쓰레드

  • done 상태에서 thenAcceptcaller(main)의 쓰레드에서 실행된다.
  • done 상태의 completionStagethenAccept를 사용하는 경우, caller 쓰레드를 block 할 수 있다.
  • done 상태가 아닌 thenAcceptAsynccallee(forkJoinPool)의 쓰레드에서 실행
  • done 상태가 아닌 completionStagethenAcceptAsync를 사용하는 경우, callee를 block 할 수 있다.

thenAccept vs thenAcceptAsync (runningStage)

thenAccept

CompletionStageThenAcceptRunningExample.java 

log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAccept(i > {
 	log.info("{} in thenAccept", i);
}).thenAccept(i > {
 	log.info("{} in thenAccept2", i);
});
Thread.sleep(2000);
[main] INFO - start main
[ForkJoinPool.commonPool-worker-19] INFO - I'm running!
[ForkJoinPool.commonPool-worker-19] INFO - 1 in thenAccept
[ForkJoinPool.commonPool-worker-19] INFO - null in thenAccept2

thenAcceptAsync

log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAcceptAsync(i > {
	 log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i > {
 	log.info("{} in thenAcceptAsync", i);
});
Thread.sleep(2000);
output
[main] INFO - start main
[ForkJoinPool.commonPool-worker-19] INFO - I'm running!
[ForkJoinPool.commonPool-worker-5] INFO - 1 in thenAcceptAsync
[ForkJoinPool.commonPool-worker-5] INFO - null in thenAcceptAsync2
  • thenAccept는 19번 스레드에서 계속 실행되는 반면 thenAcceptAsync 스레드 풀에 있는 스레드에서 action이 실행된다.
  • future가 종료되지 않은 상황이라면 thenAccept를 넘긴 callback는 callee (19번 스레드)에서 실행된다.

then*[Async]의 실행 쓰레드

then*Async의 쓰레드풀 변경

CompletionStageThenAcceptAsyncExecutorExample.java

var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10);

log.info("start main");
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenAcceptAsync(i -> {
	log.info("{} in thenAcceptAsync", i);
}, fixed).thenAcceptAsync(i -> {
	log.info("{} in thenAcceptAsync2", i);
}, single);
log.info("after thenAccept");
Thread.sleep(200);

single.shutdown();
fixed.shutdown();
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - after thenAccept
[pool-3-thread-1] - 1 in thenAcceptAsync
[pool-2-thread-1] - null in thenAcceptAsync2
  • thenAcceptAsync를 사용했기 때문에 동기적으로 동작하지 않음 (return in future 다음에 after thenAccept가 출력된 이유)
  • 모든 모든 then*Async 연산자는 executor를 추가 인자로 받는다. executor를 넘겨줌으로써 통해서 다른 쓰레드풀로 task를 실행할 수 있다.

thenApply[Async]

@FunctionalInterface
public interface Function<T, R> {
	R apply(T t);
}

<U> CompletionStage<U>
	thenApply(Function<? super T,? extends U> fn);
    
<U> CompletionStage<U>
	thenApplyAsync(Function<? super T,? extends U> fn);
  • Function을 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다
    • 다음 task에게 반환했던 값이 전달된다
    값을 변형해서 전달해야 하는 경우 유용하다.

thenApplyAsync

CompletionStageThenApplyAsyncExample.java

CompletionStage<Integer> stage = Helper.completionStage();
stage.thenApplyAsync(value -> {
  var next = value + 1;
  // logging
  return next;
}).thenApplyAsync(value -> {
  var next = "result: " + value;
  // logging
  return next;
}).thenApplyAsync(value -> {
  var next = value.equals("result: 2");
  // logging
  return next;
}).thenAcceptAsync(value -> log.info("{}", value));

Thread.sleep(100);
[ForkJoinPool.commonPool-worker-19] - return in future
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync: 2
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync2: result: 2
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync3: true
[ForkJoinPool.commonPool-worker-19] - true

thenCompose[Async]

@FunctionalInterface
public interface Function<T, R> {
	default <V> Function<V, R> compose(Function<? super V, ? extends T> before) 
    	Objects.requireNonNull(before);
		return (V v) -> apply(before.apply(v));
	}
}

<U> CompletionStage<U> thenCompose(
	Function<? super T, ? extends CompletionStage<U>> fn);
<U> CompletionStage<U> thenComposeAsync(
	Function<? super T, ? extends CompletionStage<U>> fn);
  • Function을 파라미터로 받는다
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다
  • 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다
  • 다른 future를 반환해야하는 경우 유용

thenComposeAsync

CompletionStageThenComposeAsyncExample.java

CompletionStage<Integer> stage = Helper.completionStage();
stage.thenComposeAsync(value -> {
  var next = Helper.addOne(value);
  log.info("in thenComposeAsync: {}", next);
	return next;
}).thenComposeAsync(value -> {
  var next = Helper.addResultPrefix(value);
  log.info("in thenComposeAsync2: {}", next);
  return next;
}).thenAcceptAsync(value -> {
	log.info("{} in thenAcceptAsync", value);
});

Thread.sleep(1000);
public static CompletionStage<Integer> addOne(int value) {
	return CompletableFuture.supplyAsync(() -> {
      try {
	      Thread.sleep(100);
      } catch (InterruptedException e) {
    	  throw new RuntimeException(e);
      }
      return value + 1;
	});
}
19:14:48 [main] - start main
19:14:48 [ForkJoinPool.commonPool-worker-19] - I'm running!
19:14:48 [ForkJoinPool.commonPool-worker-19] - in thenComposeAsync:
java.util.concurrent.CompletableFuture@37b05857[Not completed]
19:14:48 [ForkJoinPool.commonPool-worker-19] - in thenComposeAsync2:
java.util.concurrent.CompletableFuture@6398d2c5[Not completed]
19:14:48 [ForkJoinPool.commonPool-worker-5] - result: 2 in thenAcceptAsync

thenRun[Async]

@FunctionalInterface
public interface Runnable {
	public abstract void run();
}

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
  • Runnable을 파라미터로 받는다
  • 이전 task로부터 값을 받지 않고 값을 반환하지 않는다
  • 다음 task에게 null이 전달된다
  • future가 완료되었다는 이벤트를 기록할 때 유용

thenRunAsync

CompletionStageThenRunAsyncExample.java

log.info("start main");
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenRunAsync(() -> {
	log.info("in thenRunAsync");
}).thenRunAsync(() -> {
	log.info("in thenRunAsync2");
}).thenAcceptAsync(value -> {
	log.info("{} in thenAcceptAsync", value);
});

Thread.sleep(100);
48:32 [main] - start main
48:32 [ForkJoinPool.commonPool-worker-19] - return in future
48:32 [ForkJoinPool.commonPool-worker-19] - in thenRunAsync
48:32 [ForkJoinPool.commonPool-worker-19] - in thenRunAsync2
48:32 [ForkJoinPool.commonPool-worker-19] - null in thenAcceptAsync

exceptionally

CompletionStage<T> exceptionally(
	Function<Throwable, ? extends T> fn);
  • Function을 파라미터로 받는다
  • 이전 task에서 발생한 exception을 받아서 처리하고 값을 반환한다
  • 다음 task에게 반환된 값을 전달한다
  • future 파이프에서 발생한 에러를 처리할때 유용
CompletionStageExceptionallyExample.java

Helper.completionStage()
  .thenApplyAsync(i -> {
		log.info("in thenApplyAsync");
		return i / 0;
	}).exceptionally(e -> {
        log.info("{} in exceptionally", e.getMessage());
        return 0;
	}).thenAcceptAsync(value -> {
		log.info("{} in thenAcceptAsync", value);
	});
    
Thread.sleep(1000);
55:36 [ForkJoinPool.commonPool-worker-19] - return in future
55:36 [ForkJoinPool.commonPool-worker-5] - in thenApplyAsync
55:36 [ForkJoinPool.commonPool-worker-5] -
java.lang.ArithmeticException: / by zero in exceptionally
55:36 [ForkJoinPool.commonPool-worker-5] - 0 in thenAcceptAsync

CompletableFuture 연산자

CompletableFuture 클래스

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {}
  public static CompletableFuture<Void> runAsync(Runnable runnable) {}
  
  public boolean complete(T value) {}
  public boolean isCompletedExceptionally() {}
  
  public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {}
  public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {}
}

supplyAsync

@FunctionalInterface
public interface Supplier<T> {
	T get();
}

public static <U> CompletableFuture<U> supplyAsync(
	Supplier<U> supplier) {}
  • Supplier를 제공하여 CompletableFuture를 생성 가능
  • Supplier의 반환값이 CompletableFuture의 결과

runAsync

@FunctionalInterface
public interface Runnable {
	public abstract void run();
}

public static CompletableFuture<Void> runAsync(
	Runnable runnable) {}
  • Runnable를 제공하여 CompletableFuture를 생성할 수 있다
  • 값을 반환하지 않는다
  • 다음 task에 null이 전달된다

supplyAsync vs runAsync

CompletableFutureSupplyAsyncExample.java

var future = CompletableFuture.supplyAsync(() -> {
  try {
		Thread.sleep(100);
	} catch (InterruptedException e) {
		throw new RuntimeException(e);
	}
	return 1;
});

assert !future.isDone();

Thread.sleep(1000);

assert future.isDone();
assert future.get() == 1;
CompletableFutureRunAsyncExample.java

var future = CompletableFuture.runAsync(() -> {
	try {
		Thread.sleep(100);
	} catch (InterruptedException e) {
		throw new RuntimeException(e);
	}
});
assert !future.isDone();

Thread.sleep(1000);

assert future.isDone();
assert future.get() == null;
  • future가 종료되면 supplyAsync는 콜백함수의 반환값 1을 가져오는 반면 runAsync는 null을 반환한다.

complete

CompletableFuture<Integer> future = new CompletableFuture<>();
assert !future.isDone();

var triggered = future.complete(1);
assert future.isDone();
assert triggered;
assert future.get() == 1;

triggered = future.complete(2);
assert future.isDone();
assert !triggered;
assert future.get() == 1;
  • future.complete(1)로 future가 완료가 된 상황. 이후에 이미 완료가 된 future를 2로 완료 시켜도 1을 얻어온다.
  • CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다
  • complete에 의해서 상태가 바뀌었다면 true, 아니라면 false를 반환한다

isCompletedExceptionally

var futureWithException = CompletableFuture.supplyAsync(() -> {
	return 1 / 0;
});

Thread.sleep(100);
assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();
  • Exception에 의해서 complete 되었는지 확인할 수 있다

상태값

allOf

public static CompletableFuture<Void> allOf(
	CompletableFuture<?>... cfs) {}
  • 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다
  • 모든 completableFuture가 완료되면 상태가 done으로 변경
  • void를 반환하므로 각각의 값에 get으로 접근해야 한다.

allOf Example

var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);
var secondFuture = Helper.waitAndReturn(500, 2);
var thirdFuture = Helper.waitAndReturn(1000, 3);

CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
	.thenAcceptAsync(v -> {
		log.info("after allOf");
		try {
          log.info("first: {}", firstFuture.get());
          log.info("second: {}", secondFuture.get());
          log.info("third: {}", thirdFuture.get());
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
}).join();

var endTime = System.currentTimeMillis();
11:18 [ForkJoinPool.commonPool-worker-5] - waitAndReturn: 500ms
11:18 [ForkJoinPool.commonPool-worker-23] - waitAndReturn: 1000ms
11:18 [ForkJoinPool.commonPool-worker-19] - waitAndReturn: 100ms
11:19 [ForkJoinPool.commonPool-worker-5] - after allOf
11:19 [ForkJoinPool.commonPool-worker-5] - first: 1
11:19 [ForkJoinPool.commonPool-worker-5] - second: 2
11:19 [ForkJoinPool.commonPool-worker-5] - third: 3
11:19 [main] - elapsed: 1014ms

anyOf

public static CompletableFuture<Object> anyOf(
	CompletableFuture<?>... cfs) {}
  • 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다
  • 주어진 future 중 하나라도 완료되면 상태가 done으로 변경
  • 제일 먼저 done 상태가 되는 future의 값을 반환
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);
var secondFuture = Helper.waitAndReturn(500, 2);
var thirdFuture = Helper.waitAndReturn(1000, 3);
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
	.thenAcceptAsync(v -> {
		log.info("after anyOf");
		log.info("first value: {}", v);
	}).join();
    
    
var endTime = System.currentTimeMillis();
log.info("elapsed: {}ms", endTime - startTime);
14:18 [ForkJoinPool.commonPool-worker-23] - waitAndReturn: 1000ms
14:18 [ForkJoinPool.commonPool-worker-19] - waitAndReturn: 500ms
14:18 [ForkJoinPool.commonPool-worker-5] - waitAndReturn: 100ms
14:18 [ForkJoinPool.commonPool-worker-9] - after anyOf
14:18 [ForkJoinPool.commonPool-worker-9] - first value: 1
14:18 [main] - elapsed: 114ms

CompletableFuture의 한계

  • 지연 로딩 기능을 제공하지 않는다
    - CompletableFuture를 반환하는 함수를 호출시 즉시 작업이 실행된다
  • 지속적으로 생성되는 데이터를 처리하기 어렵다
    - CompletableFuture에서 데이터를 반환하고 나면 다시 다른 값을 전달하기 어렵다

정리

  • Future와 CompletionStage 인터페이스
  • CompletableFuture 클래스
  • CompletableFuture를 이용한 비동기 코드 작성

Reference

  • Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
profile
평범한 백엔드 개발자

0개의 댓글