- 반드시 해당 지식들이 있어야 WebFlux를 사용할 수 있는 것은 아닙니다.
- 하지만, 결국 Java의 thread 관련 기술들을 알아야 깊은 WebFlux의 필요성과 이해를 할 수 있다고 생각해서 추가하게되었습니다.
Java의 Concurrent ?
- Java에서 지원하는 Concurrent 프로그래밍
- 멀티 프로세싱(
process) => ProcessBuilder
- 멀티 쓰레딩(
thread) => Thread, Runnable, Callable, Executor 등
- 본 글에서는 Java의
멀티 쓰레딩 관련 개념들을 살펴본다
Executor와 구현체들
[ Executor ]
[ 개념 ]
- Java 5부터 Java Concurrency api를 뒷받침하기 위해 제공된 인터페이스(interface)
- 비동기적으로 호출을 실행하는 메서드를 제공
- 하위 구현체와 인터페이스들을 통해 실제 사용
[ 구조 ]
- 수행할 작업인 Runnable 을 받아서 실행
package java.util.concurrent.Executor;
public interface Executor {
void excute(Runnable command);
}
[ Executor 인터페이스와 하위 관계 ]

[ ExecutorService ]
[ 개념 ]
- Executor 인터페이스를 확장해서 라이프 사이클(시작 / 중지 / 종료)을 관리할 수 있는 기능을 정의한 인터페이스(interface)
- Thread의 라이프사이클이나 발생할 수 있는 low level의 고려사항을 개발자가 신경쓰지 않게 해준다
- ExecutorService에 Task(작업)을 지정해주면, 가진 ThreadPool을 이용해서 Task를 실행
- 만약, Thread Poool의 Thread 수 보다 Task가 많다면, Task는 큐(Queue)에서 대기하게 된다
[ 구성 ]
< T > Future< T >submit(Callable< T > task)
Future<?> submit(Runnable task)
< T > Future< T > submit(Runnable task, T result)
< T > List<Future< T > > invokeAll(Collections<? extends Callable< T > > tasks)
< T > List<Future< T > > invokeAll(Collections<? extends Callable< T > > tasks , long timeout, TimeUnit unit)
< T > invokeAny(Collection<? extends Callable< T > > tasks)
T invokeAny(Collections<? extends Callable< T > > tasks, long timeout, TimeUnit unit)
void shutdown()
List< Runnable > shutdownNow()
boolean isShutdown()
boolean isTerminated()
boolean awaitTermination(long timeout, TimeUnit unit)
[ ThreadPoolExecutor ]
[ 개념 ]
- Spring에서는
ThreadPoolExecutor를 상속받은 ThreadPoolTaskExecutor를 제공해서 더 편리하게 사용 가능
[ 핵심 파라미터 ]
- corePoolSize
최초 생성되는 쓰레드 사이즈
corePoolSize가 될 때 까지는 계속 쓰레드를 유지
- 계속 유지가 되기 때문에, 적절한 값을 찾아서 적용하는 것이 중요
- maximumPoolSize
- 해당 풀에
최대로 유지할 수 있는 개수
corePoolSize = maximumPoolsize 이면 fixed-size thread pool과 동일
- keepAliveTime
corePoolSize를 넘어 maximumPoolSize까지 증가하는 과정에서 KeepAliveTime동안 idle 상태에 있으면 다시 corePoolSize로 개수를 줄인다
- unit
- workQueue
corePoolSize보다 많아졌을 경우, Task가 쌓이며 기다리는 BlockingQueue의 work 방식을 설정
- 종류
SynchronousQueue : task를 큐에 유지하지 않고, 바로 스레드로 넘기는 방식의 큐
LinkedBlockingQueue : 크기 제한이 없는 큐
ArrayBlockingQueue : 크기 제한이 있는 큐
[ 생성 ]
final int corePoolSize = 3;
final int maximumPoolSize = 5;
final int queueCapacity = 3;
final ThreadPoolExecutor executor
= new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
1L,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(queueCapacity));
ExecutorService executorService = Executors.newSingleThreadExecutor()
ExecutorService executorService = Executors.newFixedThreadPool(4)
ExecutorService executorService = Executors.newCachedThreadPool()
ExecutorService executorService = Executors.newWorkStealingPool(10)
...
[ Executors ]
Executors.newSingleThreadExecutor()
Executors.newFixedThreadPool(int Threads)
Threads 갯수 만큼 고정된 쓰레드 풀을 생성
Executors.newCachedThreadPool()
- 필요할 때, 필요한 만큼의 쓰레드 풀을 생성 (계속 증가 가능 => 위험성 존재)
- 이미 생성된 쓰레드를 재활용할 수 있기 때문에 성능상의 이점이 있을 수 있음
Executors.newWorkStealingPool(int)
- 시스템에 가용 가능한 만큼 쓰레드를 활용하는 풀을 생성
Executors.newWorkStealingPool(int parallelism)
- Java8 에서 도입된 Thread Pool
- 지정된
parallelism을 지원할 만큼 충분한 Thread를 유지하고, 여러 Queue를 사용하여 경합을 줄임
- Thread를 동적으로 늘리고 줄인다
- 작업이 실행되는 순서를 보장하지 않음
[ 그 외 ]
ScheduledExecutorService
- 지정한 스케쥴에 따라 작업을 수행할 수 있는 기능이 추가된
ExecutorService 인터페이스
ScheduledThreadPoolExecutor
Runnable / Callable
[ Runnable ]
[ 개념 ]
- 다중 스레드 작업을 나타내기 위해 제공되는 핵심 인터페이스
void 반환 타입을 가진다
=> 실행 후 결과를 받지 X
[ 구성 ]
public interface Runnable {
public void run();
}
[ Callable ]
[ 개념 ]
- Runnable과 동일하게, 다중 스레드 작업을 나타내기 위해 제공되는 핵심 인터페이스
<V> 반환 타입 (특정 객체 리턴)
=> 실행 후 결과를 객체로 받음
=> 기존 Runnable에서는 실행 결과를 받기 위해 공용 메모리나 파이프와 같은 것들이 필요했다
[ 구성 ]
public Interface Callable<V> {
V call() throws Exception
}
[ 특징 ]
- 최상단 인터페이스인 Executor에는 Runnable만 매개변수로 가진다
- 하지만, Executor를 상속받은 ExecutorService에 Callable을 변수로 가지는 메소드들이 정의
=> submit() / invokeAll() / invokeAny() 등
Future 타입
[ Future ]
[ 설명 ]
- 비동기적인 Task의 현재 상태를 조회하거나 결과를 가져오기 위한 객체
- Runnable / Callable의 상태를 조회하거나 결과를 확인하기 위해 사용
- FutureTask라는 구현체를 가진다
- 시간이 걸릴 수 있는 작업을 Future 내부에 작성하고,
호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 할 수 있음
=> 실행을 맞기고 미래 시점에 결과를 얻는 것으로 이해 가능
[ 구성 ]
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
boolean isCancelled();
boolean isDone();
boolean cancel(boolean mayInterruptIfRunning)
}
[ 한계 ]
- 복잡한 로직 구현 불가
- isDone() / isCanceled처럼 기본 사항들만 체크할 수 있기 때문에 복잡한 로직 구현에 한계가 존재
- get() 뒤에만 콜백을 지정할 수 있었다
- get()을 통한 강제 Blocking 발생
- 결과가 보장되지 않는 상태라서 콜백을 Future를 정의하며 등록할 수 없었다
- 여러 Future 조합 불가능
- 예외 처리용 API를 제공하지 않음
[ CompletableFuture ]
[ 설명 ]
[ 비동기 작업 실행 ]
- 특징
- 직접 쓰레드를 생성하지 않는다
--> 내부적으로 ForkJoinPool.commonPool()의 쓰레드를 수행해서 비동기 작업 수행
[ 콜백 제공 ]
thenApply(Function)
비동기 로직이 수행된 후 결과값을 받고, 값을 반환(return) 한다
Function 이니까 입력값 1개 / 출력값 1개
thenAccept(Consumer)
비동기 로직이 수행된 후 결과값을 받고, 값을 반환(return)하지 않는다
Consumer 니까 입력값 1개 / 출력값 0개
thenRun(Runnable)
비동기 로직이 수행된 후 결과값을 받지 않고, 값을 반환(return)하지도 않는다
Runnable 이니까 입력값 0개 / 출력값 0개
[ 조합하기 ]
thenCompose()
연관성이 있는 2개의 작업을 처리할 때 사용
해당 작업을 처리한 후 결과를 받아서 다음 비동기 작업을 처리할 때 사용
thenCombine()
연관성이 없는 2개의 작업을 처리할 때 사용
allOf()
연관성이 없는 다수의 작업을 수행할 때 사용
각 작업의 반환(return)값이 같다고 보장할 수 없기 때문에 반환값을 가지지 않는다
만약 모든 작업의 결과를 ArrayList로 저장하려면 아래처럼 결과 값에 접근해서 만들어야 한다
(에러처리의 복잡함 때문에 get()대신 join() 사용)
anyOf()
연관성이 없는 다수의 작업들 중 먼저 끝나는 하나가 있으면 종료하는 방식

[ 예외처리 ]
exceptionally(Function)
비동기 작업을 하는 중에 에러가 발생했을 때, 에러를 처리하기 위한 용도
handle(BiFunction)
비동기 작업을 하는 중에 에러가 발생했을 때, 결과 값과 에러 모두를 처리하기 위한 용도
[ ListenableFuture ]
[ 설명 ]
- Spring에서 제공하는 클래스로 콜백 메소드를 추가할 수 있도록 Future를 확장한 인터페이스
- Future가 이미 완료된 상태에서 콜백이 추가된다면 콜백은 즉시 호출
[ 구성 ]
public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> callback);
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture(this);
this.addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
ref