[Java] Thread - 4 Thread Pool

Hyeonsu Bang·2021년 12월 18일
0

Java Basic

목록 보기
11/11

스레드풀


병렬 작업 처리가 늘어나게 되면 스레드 개수가 증가되고, 그에 따른 스레드 생성과 스레드 스케줄링으로 인해 CPU의 메모리 처리량이 늘어나게 된다. 이는 애플리케이션 성능의 저하와 직결된다. 이렇게 갑작스레 늘어날 수 있는 병렬 작업 처리량을 관리하기 위해서 스레드풀을 사용한다.

스레드풀은 스레드가 처리해야할 작업량을 조절할 수 있도록 한다. 스레드풀은 작업 큐 (Queue)에 작업을 저장해놓고 병렬로 처리할 수 있는 스레드의 개수를 제한한다. 각 스레드는 큐에서 작업을 얻어와 처리하고, 작업이 끝나면 다음 작업을 다시 큐에서 얻어온다. 결과적으로 작업 요청이 폭증하여도 애플리케이션 기능이 급갑하지 않게 된다.


자바에서는 스레드풀을 생성하고 사용할 수 있도록 java.util.concurrent.ExecutorService 인터페이스와 Executors 클래스를 제공한다. Executors의 정적 메서드를 통해 ExecutorService를 구현할 수 있는데, 이 구현 객체가 스레드풀을 의미한다.

[스레드풀의 작업 방식]


1. 작업 요청이 들어온다.
2. 스레드풀이 작업을 큐에 적재해놓는다.
3. 실행 대기 상태인 스레드가 순서대로 큐를 처리한다.
4. 결과를 리턴한다.


그림에서 보는 것처럼 ThreadPool은 병렬 처리를 할 스레드 개수를 제한하고, 작업 요청이 있을 때 요청을 작업장에 모은다. 해당 작업들은 들어온 순서대로 작업이 가능한 스레드에 의해 차례로 처리된다. (FIFO, Queue)

스레드풀 생명주기 관리(생성, 종료)

스레드풀을 생성하기 위해서는 ExecutorService를 구현해야 한다. 기본적으로는 ExecutorService 를 구현하고 있는 ThreadPoolExecutor 클래스를 생성해서 쓰면 되지만, Executor 클래스에서 간편하게 스레드풀을 생성할 수 있는 static 메서드를 제공한다.

  • newCachedThreadPool()

초기 스레드 수와 코어 스레드 수가 0개, 최대 스레드가 Integer.MAX_VALUE인 스레드풀을 생성한다. 스레드 개수보다 작업 수가 많으면 스레드를 하나 생성시켜 작업을 처리하게 된다. 스레드의 idle 상태가 60초 이상 지속되면 스레드를 종료하고 풀에서 제거한다.

  • newFixedThreadPool(int nThreads)
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    초기 스레드 수는 0개이고, 코어 스레드와 최고 스레드 수는 생성 시 입력한 int 값, 고정된 값으로 스레드풀을 생성하는 메서드이다. idle한 스레드를 따로 처리하지 않고 그냥 둔다.

위처럼 설정되어 있는 스레드풀 이외에 작업자가 원하는 상태의 스레드를 얻고 싶다면 ThreadPoolExecutor 객체를 생성하면 된다.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize, 
                          long keepAliveTime, 
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(),defaultHandler);
}

스레드풀 종료

스레드풀은 데몬 스레드가 아니기 때문에 main 스레드가 종료되더라도 계속 실행 상태로 남아있다. 그래서 main() 호출이 종료되어도 애플리케이션 프로세스는 종료되지 않는다. 애플리케이션을 종료하려면 스레드풀을 종료시켜서 스레드들이 종료상태가 되도록 처리해주어야 한다. ExecutorService는 종료와 관련하여 세 개의 메서드를 제공한다.


  • **void shutdown()** 현재 처리 중인 작업 뿐 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 스레드풀을 종료 시킨다.
  • **List<Runnable> shutdownNow()** 현재 작업 처리 중인 스레드를 interrupt 해서 작업 중지를 시도하고 스레드풀을 종료 시킨다. 작업을 완료하지 못한 스레드에 대한 정보가 List로 리턴된다.
  • **boolean awaitTermination(long timeout, TimeUnit unit)** shutdown() 호출 이후, 모든 작업 처리를 timeout 시간 내 완료하면 true, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴한다.

따라서 모든 작업을 마무리하고 스레드풀을 종료할 때에는 shutdown(), 바로 작업을 종료하려면 shutdownNow()을 호출한다.



작업 생성과 처리 요청

작업 생성

작업은 Runnable 또는 Callable 을 구현하는 것으로 표현한다. 두 인터페이스의 차이점은 리턴값의 유무이다. Callable은 파라미터 T 타입을 call()의 리턴값으로 반환한다.

Callable<T> task = new Callable<T>(){

	@Override
	public T call() throws Exception{
		// code
		return T;
	}
}

스레드풀의 스레드는 작업 큐에서 Runnable 또는 Callable 객체를 가져와 run(), call()을 실행한다.


작업 처리 요청

작업 처리 요청이란 ExecutorService의 작업 큐에 생성한 작업 (Runnable or Callable) 객체를 넣는 행위를 말한다. 이 처리를 위해 두 가지 메서드를 제공한다.


  • void execute(Runnable command): Runnable을 작업 큐에 저장.
  • Future<?> submit(Runnable task)
  • Future<V> submit(Runnable task, V result)
  • Future<V> submit(Callable<V> task) Runnable 또는 Callable을 작업 큐에 저장하며, Future 라는 객체가 리턴되어 작업 결과를 확인 가능하다.

리턴값의 유무와 별개로, 두 메서드의 또 다른 차이점은 execute()의 경우 스레드에서 예외가 발생했을 때 스레드가 종료되고 해당 스레드는 스레드풀에서 제거된다. 따라서 스레드풀은 다른 작업을 처리하기 위해서 새로 스레드를 생성하게 된다. 반면 submit()의 경우 스레드가 남아있기 때문에 스레드를 재사용할 수 있다. 따라서 생성 ovehead를 줄이기 위해 submit()를 사용하는것이 좋다.


다음 예제는 의도적으로 NumberFormatException을 발생시켜서 위 두 메서드의 차이를 확인하는 예제이다.

public class ThreadPoolA {
    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        for(int i=0;i<10;i++){

            Runnable runnable = new Runnable() {

                @Override
                public void run() {
                    ThreadPoolExecutor threadPoolExecutor = 
                    		(ThreadPoolExecutor) executorService;
                    int poolSize = threadPoolExecutor.getPoolSize();
                    String threadName = Thread.currentThread().getName();
                    System.out.println("총 스레드 개수 :" + poolSize + " / 작업 스레드 이름 : "+ threadName);

                    int value = Integer.parseInt("삼");
                }
            };
            executorService.execute(runnable);
//            executorService.submit(runnable);
            Thread.sleep(1000);
        }
        executorService.shutdown();
    }

}

총 스레드 개수 :1 / 작업 스레드 이름 : pool-1-thread-1
Exception in thread "pool-1-thread-1" java.lang.NumberFormatException: For input string: "삼"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.java.chap12.thread_pool.ThreadPoolA$1.run(ThreadPoolA.java:21)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-3
Exception in thread "pool-1-thread-3" java.lang.NumberFormatException: For input string: "삼"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.java.chap12.thread_pool.ThreadPoolA$1.run(ThreadPoolA.java:21)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2
Exception in thread "pool-1-thread-2" java.lang.NumberFormatException: For input string: "삼"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.java.chap12.thread_pool.ThreadPoolA$1.run(ThreadPoolA.java:21)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-4
Exception in thread "pool-1-thread-4" java.lang.NumberFormatException: For input string: "삼"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.java.chap12.thread_pool.ThreadPoolA$1.run(ThreadPoolA.java:21)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-5
Exception in thread "pool-1-thread-5" java.lang.NumberFormatException: For input string: "삼"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.java.chap12.thread_pool.ThreadPoolA$1.run(ThreadPoolA.java:21)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-6

execute()로 스레드풀에 작업을 수행했을 때에는 작업 중 예외가 발생했을 때 스레드를 종료하고 새 스레드를 만든다. 스레드가 1부터 10까지 계속 생성되어서 작업을 처리하고 있는 것을 볼 수 있다.

반면 submit()으로 처리했을 때를 보자.

총 스레드 개수 :1 / 작업 스레드 이름 : pool-1-thread-1
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-1
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-1
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-1
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-1
총 스레드 개수 :2 / 작업 스레드 이름 : pool-1-thread-2

총 스레드 개수는 두 메서드 모두 2개로 같지만, 작업 스레드 개수가 submit()의 경우 2개인 것을 확인할 수 있다. 스레드가 종료되지 않고 계속해서 다른 작업을 수행하기 때문이다.

블로킹 방식의 작업 완료 통보

ExecutorService.submit()Runnable 또는 Callable의 작업을 작업 큐에 저장하고 Future<V> 를 리턴한다. Future 객체는 하나의 작업 결과가 아닌 최종 결과를 얻는데 사용된다. 즉 하나의 작업 결과를 블로킹(지연)하여 모든 결과를 기다린다는 의미이다. 이 떄문에 Future을 지연 완료 (pending completion) 객체라고도 한다. Furture.get()를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다.


  • **V get():** 작업이 완료될 때까지 블로킹되었다가 처리 결과 리턴
  • V get(long timeout, TimeUnit unit): timeout 내에 리턴하지 않으면 TimeOutException 발생

리턴 타입인 V는 submit()의 매개변수에 따라 Callable 또는 임이의 타입 파라미터 V이다.

Future을 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 전까지는 get() 호출이 블로킹(지연) 되므로 다른 코드를 실행할 수가 없다는 점이다. 따라서 새로운 스레드 또는 스레드풀의 또 다른 스레드가 get() 호출을 담당해야 한다.

new Thread(new Runnable(){

	@Override
	public void run(){
		try{
		future.get();
		} catch (Exception e){}
	}

}).start();

executorService.submit(new Runnable(){
	@Override
	public void run(){
		try{
		future.get();
		} catch (Exception e){}
	}
});

Future에는 get() 말고도 작업이 종료되었는지 확인하는 isDone(), 작업 취소 여부를 확인하는 isCancled() 등 추가 메서드가 있다.


리턴값이 없는 작업 완료 통보

작업 완료에 대해서 리턴값이 필요 없다면 submit(Runnable task)를 이용하면 된다. submit()의 매개변수로 리턴값이 있는 Callable이나 Runnable과 더불어 타입 파라미터를 넣어주면 거기에 맞는 타입을 리턴하지만, Runnable만 매개했을 경우 null이 리턴된다.


정상적으로는 null이 리턴되지만, 스레드가 작업 처리 도중 interrupt 되면 InterruptedException을 발생시키고, 예외가 있을 경우 ExecutionException을 발생시킨다.

try{
  executorService.submit(runnable).get();
} catch (InterruptedException e){
  System.out.println("interrupted"); // interrupted 되었을 때 코드
}   catch (ExecutionException e){
  System.out.println("error occurred"); // 예외 발생 시 코드
  e.printStackTrace();
}

아래 예시를 보자.

package org.java.chap12.thread_pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolB {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        System.out.println("task is being processed");

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i = 1;i<=10;i++){
                    sum +=i;
                    System.out.println("output : "+ sum);
                }
            }
        };

        Future future = executorService.submit(runnable);

        try{
            future.get();
            System.out.println("task done");
        } catch (Exception e){
            System.out.println(e.getMessage());
        }

        executorService.shutdown();

    }
}

코드를 수행해보면 아래와 같다.

task is being processed
output : 1
output : 3
output : 6
output : 10
output : 15
output : 21
output : 28
output : 36
output : 45
output : 55
task done

반면 future.get()을 쓰지 않으면 코드가 아래처럼 수행이 된다.

task is being processed
task done
output : 1
output : 3
output : 6
output : 10
output : 15
output : 21
output : 28
output : 36
output : 45
output : 55

future.get()이 블로킹하는 부분을 여기서 확인할 수 있다. 스레드의 작업의 종료를 기다리지 않고 main 스레드가 바로 Syso("task done") 코드를 수행하는 것을 확인할 수 있다.

리턴값이 있는 작업 완료

스레드풀의 스레드가 작업 완료 후 작업 값을 얻어야 한다면 작업 객체를 Callable로 생성하면 된다. 이 경우 리턴 타입은 Callable의 리턴 타입인 제네릭 타입 파라미터 T이다.

다음 예제를 보자.

package org.java.chap12.thread_pool;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolWithResult {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        System.out.println("processing...");

        Callable<Integer> callable = new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for(int i=1;i<=90;i++){
                    sum += i;
                }

                return sum;
            }
        };

        Future<Integer> result = executorService.submit(callable);

        try{
            int sum = result.get();
            System.out.println("result returned : "+sum);
            System.out.println("process done");

        } catch (Exception e){
            System.out.println(e.getMessage());
        }
        executorService.shutdown();
    }
}

Callable의 리턴 타입을 Integer로 정했기 때문에 Future도 같은 타입으로 출력할 수 있다.

작업 처리 결과를 외부 객체에 저장하기

스레드의 작업 처리 결과를 외부 객체에 저장해야할 경우도 있다. 예컨대 한 스레드의 작업 처리를 완료하고 다른 스레드의 결과물과 취합해야 할 때처럼 말이다. 이럴 때는 값을 저장할 수 있는 임의의 객체를 만들어(Result) submit(Runnable, Result)를 이용하면 된다. Result는 대개 공유 객체가 되는데, 스레드에서 결과를 저장하기 위해서 사용되어야 하므로 생성자를 통해 Result 객체를 주입받도록 해야 한다.

class Task implements Runnable {

	Result result;

	public Task(Result result){
		this.result = result;
	}

	@Override 
	public void run(){

	//.. code
	this.result.someMethod(); // result에 처리 결과 저장
	}

}

다음은 두 개의 스레드의 작업 값을 Result 객체에 담는 예제이다.

package org.java.chap12.thread_pool;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolWithResult {

    public static void main(String[] args) {

        ExecutorService executorService = 
        	Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        System.out.println("processing");

        class Task implements Runnable{

            Result result;

            Task(Result result){
                this.result = result;
            }

            @Override
            public void run() {
                int sum = 0;
                for(int i =1;i<=10;i++){
                    sum += i;
                }
                result.addValue(sum);
            }

        }

        Result result = new ThreadPoolWithResult.Result();
        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);

        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);

        try{
            result = future1.get();
            System.out.println(result.accumulatedValue); // 110
            result = future2.get();
            System.out.println(result.accumulatedValue); // 110

        } catch(Exception e){
            e.printStackTrace();
        }

        executorService.shutdown();
    }

    static class Result {

        int accumulatedValue;
        synchronized void addValue(int value){
            accumulatedValue += value;
        }
    }
}

공유 객체인 Result를 만들어서 동기화 메서드를 통해 값을 넣어준다. task1과 2가 모두 result를 참조하고 있기 때문에 addValue가 각각 호출되면서 값이 저장된다.

작업 완료 순으로 통보

스레드는 스케줄링에 따라서 임의 순서대로 동작한다. 따라서 작업 요청 순서대로 작업 처리가 완료된다고 보장할 수는 없다. 여러 개의 작업들이 순차적으로 처리될 필요가 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 된다. 스레드풀에서 작업 처리가 완료된 것만 통보받고 싶다면 CompletionService 인터페이스를 이용하면 된다. 인터페이스에서는 poll(), take()를 제공한다.

  • Future<V> poll(): 완료된 Future을 가져옴. 완료된 작업이 없으면 즉시 null 리턴
  • **Future<V> take():** 완료된 작업의 Future 가져옴. 완료된 작업이 없다면 있을 때까지 블로킹(지연)

CompletionServiceExecutorCompletionService<V>를 통해 구현된다. 생성자 매개값으로 ExecutorService를 제공하면 된다.

ComopletionService completionService = ExecutorCompletionService<V>(executorService);

위 인터페이스의 메서드를 이용하려면 Future 가 있어야 하므로, 스레드풀에 작업을 추가할 때 submit()의 리턴값이 있는 메서드 오버로딩을 이용해야 한다.

다음은 take()를 이용해 스레드 결과값을 얻는 예제이다.

public class Test {
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        CompletionService<Result> completionService = new ExecutorCompletionService<>(executorService);

        System.out.println("submit tasks");

        for(int i =0;i<3;i++) {

            completionService.submit(new Callable<Result>() {

                @Override
                public Result call() throws Exception {

                    Result result = new Result();
                    result.threadName = Thread.currentThread().getName();

                    for(int i=1;i<=10;i++)
                        result.val += i;

                    return result;
                }

            });
        }

        System.out.println("take result from completed task");

        executorService.submit(new Runnable() {

            @Override
            public void run() {

                while(true){
                    try{
                        Result result;
                        Future<Result> future = completionService.take();

                        result = future.get();
                        System.out.println("result : "+ result.val);
                        System.out.println("threadName : "+ result.threadName);

                    } catch (Exception e){
                        break;
                    }
                }
            }
        });

        try {

            Thread.sleep(3000);

        } catch (Exception e){}

        System.out.println("shutting down");
        executorService.shutdownNow();
    }

    public static class Result{
        String threadName;
        int val;

    }
}

CompletionService 를 구현한 ExecutorCompletionServiceExecutorService를 주입받았기 때문에 ExecutorService의 메서드를 사용할 수 있다. CompletionService가 총 세 개의 작업을 생성해 큐에 추가한다. 스레드풀에 제한을 따로 걸지 않았으므로 CPU에 여유가 있다면 스레드 3개를 만들어 병렬처리할 것이다.

take()는 작업이 끝날 때까지 블로킹된다. 따라서 future.get()은 블로킹 없이 바로 수행될 것이다. 또한 이 작업은 앱이 실행되는 동안 계속 수행되어야 하므로, while 루핑으로 스레드풀에서 동작하고 있다가 shutdownNow()가 수행되어서 interruptException를 발생시켰을 때 break하면 된다. 스레드 이름을 확인하기 위해 Callable의 리턴값을 이름과 값을 담을 수 있는 객체 Result로 설정했다.

코드를 실행하면 다음과 같은 결과가 나온다.

submit tasks
take result from completed task
result : 55
threadName : pool-1-thread-3
result : 55
threadName : pool-1-thread-2
result : 55
threadName : pool-1-thread-1
shutting down

스레드 3, 2, 1 순으로 작업이 완료된 것을 확인할 수 있다. 스레드 순서는 코드를 수행할 때마다 당연히 바뀐다. 작업은 스레드 스케줄링에 의해 작동하기 때문이다.

콜백 방식의 작업 완료 통보

앞에서는 작업이 완료될 때까지 코드 수행을 블로킹하는 메서드 get()을 통해서 작업 완료를 확인하는 방법을 알아보았다. 이번에는 콜백 방식을 통해서 작업 완료를 확인해본다.

이름만 들어도 대충 두 방법의 차이가 느껴질 것이다. 콜백은 작업 요청을 호출한 스레드가 작업 중인 스레드의 결과를 기다리지 않고 다른 작업을 계속할 수 있다. 작업이 끝나면 스레드에서 콜백 메서드로 결과를 알려주기 때문이다.


스레드풀을 구현하고 있는 ExecutorService에서는 이런 콜백 메서드 기능이 없으므로 Runnable을 구현할 때 콜백 기능을 함께 구현해야 한다. 이것은 직접 개발자가 구현할 수도 있겠지만, java.nio.channels.CompletionHandler를 이용하면 좀 더 편하게 할 수 있다. 해당 인터페이스는 비동기 통신에서 콜백 객체를 만들 때 사용한다. 기본적인 사용 코드는 아래와 같다.

CompletionHandler<V, A> completionHandler = new CompletionHandler<V, A>() {

    @Override
    public void completed(V result, A attachment) {
        
    }

    @Override
    public void failed(Throwable exc, A attachment) {

    }
};

각 메서드는 작업이 정상 처리 되었을 때와 실패했을 때를 호출되는 콜백 메서드이다. V 타입 파라미터는 리턴 타입이고, A는 첨부값의 타입이다. 다음 예제를 보자.

public class CallbackExam {

    private ExecutorService executorService;

    public CallbackExam(){
        this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {

        @Override
        public void completed(Integer result, Void attachment) {
            System.out.println("task completed. the result : "+ result);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("task failed. " + exc.toString());
        }
    };

    public void doWork(final String x, final String y){
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try{
                    int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    int result = intX + intY;
                    callback.completed(result, null);
                } catch (NumberFormatException e){
                    callback.failed(e, null);
                }
            }
        };
        executorService.submit(task);
    }

    public void finish(){
        executorService.shutdown();
    }

    public static void main(String[] args) {

        CallbackExam callbackExam = new CallbackExam();
        callbackExam.doWork("3","3");
        callbackExam.doWork("3","삼");
        callbackExam.finish();

    }
}

String 두 개를 인자로 넣어 각각 파싱했을 때 성공하면 completed(), 실패하면 failed()가 호출되도록 한 코드이다. 결과는 다음과 같이 출력된다.

task completed. the result : 6
task failed. java.lang.NumberFormatException: For input string: "삼"

source: https://github.com/thom-droid/basic-java/tree/master/src/org/java/chap12/thread_pool

reference : 「이것이 자바다」, 신용권

profile
chop chop. mish mash. 재밌게 개발하고 있습니다.

0개의 댓글