Java 재활 훈련 11일차 - Thread

0

java

목록 보기
11/18

Thread

java에서는 Thread를 생성 할 때도 class를 만들어 생성해야한다. 이때 thread class는 Runnable이라는 interface를 구현해야하는데, Runnalbe interface의 run 메서드가 thread가 실행될 때 호출되는 메서드이다.

  • Task.java
public class Task implements Runnable {
    @Override
    public void run() {
        System.out.println("Run Task thread");
    }
}

Task class는 Runnable interface를 구현하여 run을 오버라이드 했으므로 thread로 실행이 가능하다. 다음은 Task를 thread로 실행하는 방법이다.

  • Main.java
public class Main {
    public static void main(String[] args) {
        System.out.println("hello main");
        Task task = new Task();
        Thread taskThread = new Thread(task);
        taskThread.start();
    }
}

결과로 다음과 같이 나온다.

hello main
Run Task thread

만약 main thread가 더 빨리 끝나버린다면 Run Task thread이 안찍힐 수도 있다.

thread를 생성하는 방법으로 Thread 클래스를 상속하여 child class를 생성하는 방법이 있다.

  • Worker.java
public class Worker extends Thread{
    @Override
    public void run() {
        System.out.println("Worker starts");
    }
}

Thread class를 상속하여 run을 오버라이드하면 된다. 실행하는 방법은 이전과 동일하지만, Thread 객체를 굳이 또 만들 필요가 없다.

  • Main.java
public class Main {
    public static void main(String[] args) {
        System.out.println("hello main");
        Worker worker = new Worker();
        worker.start();
    }
}

결과는 다음과 같다.

hello main
Worker starts

Thread 이름

thread들은 자신의 이름을 가지고 있다. main thread는 main이라는 이름을 가지고 있고, 이외의 작업 thread들은 Thread-n이라는 이름이 기본으로 할당된다. 만약 다른 이름을 설정하고 싶다면 다음과 같이 할 수 있다.

thread.setName("thread-name");

thread이름을 잘 설정해야 java 디버깅에 있어서 이점이 크다. 특히 현재 code가 어느 thread에서 실행되고 있는 지 확인하기 위해서는 currentThread()로 확인이 가능하다.

  • Task.java
public class Task implements Runnable {
    @Override
    public void run() {
        Thread thread = Thread.currentThread();
        System.out.println("Current Thread: " + thread.getName());
    }
}

Thread.currentThread()으로 현재 코드가 실행되는 thread를 가져올 수 있다.

  • Main.java
public class Main {
    public static void main(String[] args) {
        System.out.println("hello main");
        Task task = new Task();
        Thread taskThread = new Thread(task);
        taskThread.setName("task-thread");
        taskThread.start();
    }
}

setName으로 task thread의 이름을 task-thread로 선정해준 것을 볼 수 있다. 실행 결과를 확인하면 다음과 같다.

hello main
Current Thread: task-thread

task-thread라는 thread이름이 나온 것을 볼 수 있다.

Thread 상태

thread 객체를 생성하여 start 메서드를 실행하게 되면 RUNNABLE 상태가 되어 '실행 대기'를 하는 것이지 바로 실행되지 않는다. CPU core가 할당되기를 기다리면서 CPU 스케줄링에 따라 할당되는 것이다.

thread 생성 ---> RUNNABLE(실행 대기)
                      ^        | ^
                     /         v |
         BLOCK(일시 정지)   CPU 할당
                       ^       | ^
                        \      v | 
                        RUNNING(실행) --> TERMINATED(종료)

RUNNING에 있는 thread들은 run method를 실행하되, 한 번에 모두 실행되는 것이 아니라, CPU 스케줄링에 따라 일부만 실행되고 CPU 점유를 뺏길 수 있다. 이렇게 뺏기게된 thread는 다시 RUNNALBE 상태가 되어 대기를 하게 되고, 이러한 과정을 반복하게 된다. run method가 모두 실행되었다면 종료되는 것이다.

실행 상태에서 일시 정지 상태로도 가는데, 일시 정지 상태는 thread를 실행할 수 없는 상태를 말한다. 이는 sleep이나 join, wait 처럼 강제로 일시 정지 상태에 빠지게 하는 경우이다.

thread가 다시 실행 상태로 가기 위해서는 일시 정지 상태에서 실행 대기 상태로 가야한다. 이를 정리한 것이 아래의 표이다.

메서드설명
sleep주어진 시간 동안 thread를 일시 정지 상태로 만든다. 주어진 시간이 끝나면 자동적으로 실행 대기 상태가 된다.
joinjoin 메서드를 호출한 thread는 일시 정지되어, join한 thread가 종료될 때까지 일시 정지해있는다.
wait동기화 블록 내에서 스레드를 일시 정지 상태로 만든다.

위는 thread를 '일시 정지'로 만드는 메서드들이다. 다음은 '일시 정지'에서 다시 '실행 대기' 상태로 만드는 메서드들이다.

메서드설명
interrupt일시 정지 상태일 경우, InterruptedException을 발생시켜 실행 대기 상태 또는 종료 상태로 만든다.
notify, notifyAllwait() 메서드로 인해 일시 정지 상태인 thread를 실행 대기 상태로 만든다.

마지막으로 yield 메서드는 'RUNNING' 상태에 있는 thread를 'RUNNABLE' 상태로 보내기 위해서 사용한다.

그림으로 정리하면 다음과 같다.

thread 생성 --->    RUNNABLE(실행 대기)
                      ^        |   ^
                      |        |   | 
            interrupt, notify  |   |
                notifyAll      |   |
                     /         v   |
         BLOCK(일시 정지)     CPU 할당
                       ^       |   ^
                       |       |   |
                       |       |  yield
                sleep, join    |   |
                    wait       |   |
                       |       |   |
                        \      v   | 
                         RUNNING(실행) --> TERMINATED(종료)

thread 내에서 Thread.sleep()을 호출하면 지정한 second동안 thread가 일시정지 상태로 빠진다.

public class Task implements Runnable {
    @Override
    public void run() {
        Thread thread = Thread.currentThread();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Current Thread: " + thread.getName());
    }
}

실행 후에 3초 후 Current Thread: task-thread 로그가 나올 것이다.

다른 thread 기다리기

A thread가 B thread를 기다려서 결과를 얻어와야 하는 상황이 있다. 이러한 경우에 join을 사용하면 된다.

---------ThreadA---------      ----------ThreadB---------
|                       |      |                        |
| threadB.start();    ------------> run()               |
| threadB.join();       |      |     |                  |
|      다시 시작     <------------- return               |
|                       |      --------------------------
------------------------
public class SumThread extends Thread{
    private long sum;
    
    public long getSum() {
        return sum;
    }
    
    public void setSum(long sum) {
        this.sum = sum;
    }

    @Override
    public void run() {
        for(int i = 0; i <= 100; i++) {
            sum += i;
        }
    }
}

SumThread thread는 1~100까지의 합을 계산하는데, SumThread thread를 호출하고 그 결과가 나올 때까지 main thread를 멈추도록 하자.

public class Main {
    public static void main(String[] args) {
        SumThread sumThread = new SumThread();
        sumThread.start();

        try {
            sumThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("1~100: " + sumThread.getSum());
    }
}

main thread는 sumThread와 join하여 sumThreadrun method가 종료될 때까지 '일시 정지'해있는다. 결과를 출력하면 sumThread의 모든 합이 잘 나온 것을 확인할 수 있다.

1~100: 5050

만약 join을 안했다면 0이 나오거나, 합이 일부만 되어 결과로 출력되었을 것이다.

다른 thread에게 양보

일부 thread들은 반복적인 실행만 하는 경우가 있다. 이 경우에 CPU core를 다른 thread들에게 양보하기 위해서 yield가 존재한다.

public class Task implements Runnable {
    @Override
    public void run() {
        while (true) {
            System.out.println("task!");
            Thread.yield();
        }
    }
}

단, yield를 사용하는 것은 thread를 '일시 정지' 상태로 만드는 것이 아니라, '실행 가능' 상태로 만드는 것일 뿐이기 때문에 자원에 여유가 있다면 thread가 바로 실행된다.

Thread 동기화

thread들 간의 shared object를 사용할 때는 동기화가 필요하다. 이를 위해서 java에서는 shared object의 동기화를 위한 synchronized 메서드와 블럭을 제공한다. shared object에 대한 동기화 이므로 shared object 내부에 사용한다.

object 내부에 동기화 메서드와 동기화 블록이 여러 개 있다면 thread는 이들 중 하나를 실행할 때, 다른 thread는 해당 메서드는 물론이고, 다른 동기화 메서드 및 블록도 실행할 수 없다. 단, 일반 메서드는 실행이 가능하다.

동기화 메서드는 synchronized 키워드를 붙이면 된다. synchronized 키워드는 일반 메서드와 정적 메서드 어디든 붙일 수 있다.

public synchronized void method() {
    // 단 하나의 thread만 실행하는 영역
}

thread가 동기화 메서드를 실행하는 즉시 객체는 잠금이 일어나고, 메서드 실행이 끝나면 잠금이 풀린다. 메서드 전체가 아닌 일부 영역을 실행할 때만 객체 잠금을 걸고 싶다면 다음과 같이 동기화 블록을 만들면 된다.

public void method() {
    synchronized(공유객체) {
        // 단 하나의 thread만 실행하는 영역
    }
    // 여러 thread가 실행하는 영역
}
  • Calculator
public class Calculator {
    private int memory;

    public synchronized int getMemory() {
        return memory;
    }

    public synchronized void setMemory1(int memory) {
        this.memory = memory;
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + ": " + this.memory);
    }

    public void setMemory2(int memory) {
        synchronized (this) {
            this.memory = memory;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": " + this.memory);
        }
    }
}

Calculatormemory 맴버 변수와 관련된 모든 method들을 synchronized로 동기화하였다. 따라서, 해당 method들은 접근 시에 하나의 thread만 접근이 가능하다. 즉, 동시 접근이 불가능하다.

setMemory1, setMemory2 method들은 memory 값을 변경하고 2초간 멈추는 것을 볼 수 있다.

  • User1Thread
public class User1Thread extends Thread{
    private Calculator calculator;

    public User1Thread() {
        setName("User1Thread");
    }

    public void setCalculator(Calculator calculator) {
        this.calculator = calculator;
    }

    @Override
    public void run() {
        calculator.setMemory1(100);
    }
}

calculator.setMemory1()synchronized로 동기화 처리 되었으므로, User1Thread가 접근하면 다른 thread는 접근하지 못한다.

  • User2Thread
public class User2Thread extends Thread{
    private Calculator calculator;

    public User2Thread() {
        setName("User2Thread");
    }

    public void setCalculator(Calculator calculator) {
        this.calculator = calculator;
    }

    @Override
    public void run() {
        calculator.setMemory2(50);
    }
}

User1Thread와 동일한 로직이다.

  • Main
public class Main {
    public static void main(String[] args) {
        Calculator calculator = new Calculator();

        User1Thread user1Thread = new User1Thread();
        user1Thread.setCalculator(calculator);
        user1Thread.start();

        User2Thread user2Thread = new User2Thread();
        user2Thread.setCalculator(calculator);
        user2Thread.start();

        System.out.println("memory: " +calculator.getMemory());
    }
}

User1Thread에서 먼저 Calculator에 접근하여 값을 수정하고, 두번째로 User2ThreadCalculator에 접근하여 값을 수정한다. 마지막으로 Main thread에서 calculatormemory 값을 가져온다. memoryUser1Thread, UserThread2에 의해서 동기화되어있으므로 이들이 먼저 처리한 후에 Main thread에 전달된다.

따라서, 결과는 다음과 같다.

User1Thread: 100
User2Thread: 50
memory: 50

wait과 notify thread 제어

두 개의 thread가 번갈아 작업을 수행할 때가 있다. 이 방법의 핵심은 shared object에 있다. shared object는 두 thread가 작업할 내용을 각가 동기화 메서드로 정해 놓는다. 한 thread가 작업을 완료하면 notify 메서드를 호출해서 '일시 정지' 상태에 있는 다른 thread를 '실행 대기' 상태로 만들고, 자신은 두 번 작업 하지 않도록 'wait()' 메서드로 '일시 정지' 상태로 만든다.

            RUNNABLE(실행 대기)
            ^        |   ^
            |        |   | 
  notify, notifyAll  |   |
            |        v   |
BLOCK(일시 정지)     CPU 할당
            ^       |   ^
            |       |   |
           wait     |   |
            |       |   |
             \      v   | 
            RUNNING(실행)

notifywait에 의해 '일시 정지'된 thread 중 한개를 실행 대기 상태로 만들고, notifyAllwait에 의해 일시 정지된 모든 thread를 실행 대기 상태로 만든다. 주의할 점은 이 두 메서드는 동기화 메서드 또는 동기화 블록 내에서만 사용이 가능하다.

condition variable을 java로 구현한 것으로 볼 수도 있다.

아래는 WorkOjbect로 두 thread가 해야할 작업을 동기화 메서드인 methodAmethodB로 각각 정의해두고, ThreadAThreadB가 교대로 호출하도록 하는 것이다.

  • WorkObject
public class WorkObject {
    public synchronized void methodA() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName());

        notify(); // wake another thread up
        try {
            wait(); // current thread to be blocked
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void methodB() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName());
        notify(); // wake another thread up
        try {
            wait(); // current thread to be blocked
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

methodAmethodB는 동기화 작업이므로 하나의 thread에서만 실행이 가능하다. ThreadAmethodA를 실행하고 ThreadBmethodB를 실행하되, 둘 다 모두 동기화가 걸려있으므로 번갈아 실행하도록 만들자.

  • ThreadA
public class ThreadA extends Thread{
    private WorkObject workObject;

    public ThreadA(WorkObject workObject) {
        setName("ThreadA");
        this.workObject = workObject;
    }

    @Override
    public void run() {
        for(int i = 0; i < 10; i++) {
            workObject.methodA();
        }
    }
}

ThreadAWorkObjectmethodA를 실행하지만, methodA에서는 notify 후에 wait 상태로 빠지므로, 바로 block 상태로 갈 것이다.

  • ThreadB
public class ThreadB extends Thread{
    private WorkObject workObject;

    public ThreadB(WorkObject workObject) {
        setName("ThreadB");
        this.workObject = workObject;
    }

    @Override
    public void run() {
        for(int i = 0; i < 10; i++) {
            workObject.methodB();
        }
    }
}

ThreadBWorkObjectmethodB를 실행하지만, methodB에서는 notify 후에 wait 상태로 빠지므로, 바로 block 상태로 갈 것이다. 이때 methodA를 실행하는 ThreadA가 실행될 것이다.

  • Main
public class Main {
    public static void main(String[] args) {
        WorkObject workObject = new WorkObject();

        ThreadA threadA = new ThreadA(workObject);
        ThreadB threadB = new ThreadB(workObject);

        threadA.start();
        threadB.start();
    }
}

threadAthreadB가 번갈아 실행되어야 한다. 결과는 아래와 같다.

ThreadA
ThreadB
ThreadA
...
ThreadA
ThreadB
ThreadA
ThreadB

정확히 번갈아서 각각 10번씩 실행된다. 이는 서로 깨우고, 잠자고, 깨우고, 잠자고를 반복해서 그렇다.

thread를 안전하게 종료

run 메서드를 모두 실행하면 thread는 종료되지만, 강제 종료가 필요할 때도 있다. 가령 동영상을 갑자기 종료할 때가 있다. stop 이라는 메서드를 제공했으나 이 메서드는 더 이상 사용하지 않는다. 그 이유는 thread를 갑자기 종료하게 되면 사용 중이던 리소스들이 불안전한 상태로 남겨지기 때문이다. 여기서 resource란 파일, 네트워크 연결 등을 말한다.

thread를 안전하게 종료하는 방법은 사용하던 resource들을 정리하고 run 메서드를 빨리 종료하도록 하는 것이다. 주로 조건 이용 방법과 interrupt 메서드를 이용한다.

조건 이용

thread가 while문으로 반복되는 경우, 조건을 이용해서 run 메서드의 종료를 유도하는 것이다.

public class ThreadC extends Thread{
    private boolean stop;

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    @Override
    public void run() {
        while (!stop) {
            // thread code
        }
        // done
    }
}

setStop(true)을 호출하여 run 메서드를 안전하게 종료시키는 것이다.

interrupt 메서드 이용

interrupt 메서드는 thread가 일시 정지 상태에 있을 때 InterruptedException을 발생시키는 역할을 한다. 이것을 이용하면 exception 처리를 이용해 run 메서드를 정상 종료시킬 수 있다.

public class ThreadC extends Thread{
    @Override
    public void run() {
        try {
            while (true) {
                // thread code
            }
            // done
        } catch (InterruptedException e) {
            // remove thread resource
        }
    }
}

ThreadC의 인스턴스를 threadC라고 할 때 threadC.interrupt()를 호출하면 된다. run에서 try안에서 실행하던 코드에 InterruptedException이 발생하게 되어 안전하게 종료된다.

  • PrintThread
public class PrintThread implements Runnable {
    @Override
    public void run() {
        try{
            while (true) {
                System.out.println("running");
                Thread.sleep(1);
            }
        } catch(InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("remove resource");
        System.out.println("Done!");
    }
}

위는 run을 반복하면서 running을 호출하는 PrintThread이다. 해당 thread를 종료시켜보도록 하자.

  • Main
public class Main {
    public static void main(String[] args) {
        Thread printThread = new Thread(new PrintThread());
        printThread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printThread.interrupt();
    }
}

1초 뒤에 printThread.interrupt();를 호출하여 printThread를 종료시키는 것이다.

print결과는 아래와 같다.

running
running
...
(1초후)
...
remove resource
Done!

그러나 interrupt 메서드는 thread가 '실행(Running)', '실행 대기(Runnable)' 상태일 때는 interrupt 메서드가 호출되어도 InterruptedException이 발생하지 않는다. 때문에 위의 PrintThreadInterruptedException을 받기 위해 반복해서 sleep을 실행하고 있던 것이다.

'일시 정지(blocked)' 상태를 만들지 않고도 interrupt 메서드 호출 여부를 알 수 있는 방법이 있다. ThreadinterruptedisInterrupted 메서드는 interrupt 메서드 호출 여부를 리턴한다. interrupted는 정적 메서드이고, isInterrupted는 인스턴스 메서드이다.

boolean status = Thread.interruped();
boolean status = objThread.isInterrupted();

아래는 PrintThread를 수정해서 Thread.sleep(1) 대신 Thread.interrupted를 사용해서 interrupt 메서드가 호출되었는지 확인한 다음 while문을 빠져나가도록 한 것이다.

public class PrintThread implements Runnable {
    @Override
    public void run() {
        while (true) {
            System.out.println("running");
            if(Thread.interrupted()) {
                break;
            }
        }
        
        System.out.println("remove resource");
        System.out.println("Done!");
    }
}

Main에서는 똑같이 interrupt()를 실행하면 된다.

데몬 스레드(daemon thread)

java는 다른 언어와 달리 main thread가 끝나도, 다른 thread들이 종료되지 않는다. 즉, process의 종료 조건이 main thread에 달려있는 것이 아니라, 모든 thread들이 종료됨에 따라 결정되는 것이다.

daemon thread는 main thread의 작업을 돕는 보조적인 역할을 수행하는 thread이다. main thread가 종료되면 daemon thread들도 자동으로 종료된다.

main thread가 특정 thread를 daemon thread로 만들 수 있도록 하는 것은 setDaemon(true)를 호출하면 된다.

public static void main(String[] args) {
    AutoSaveThread thread = new AutoSaveThread();
    thread.setDaemon(true);
    thread.start();
    ...
}

AutoSaveThread는 main thread의 daemon thread가 된다.

아래는 1초 주기로 save() 메서드를 호출하는 AutoSaveThread를 daemon thread로 실행시킨다. 그리고 main thread가 3초 후 종료되면 AutoSaveThread도 따라서 자동 종료된다.

  • AutoSaveThread
public class AutoSaveThread implements Runnable{
    public void save() {
        System.out.println("work done");
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }
            save();
        }
    }
}

1초마다 반복해서 work done을 출력하는 AutoSaveThread이다.

  • Main
public class Main {
    public static void main(String[] args) {
        Thread autoSaveThread = new Thread(new AutoSaveThread());
        autoSaveThread.setDaemon(true);
        autoSaveThread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("exit main thread");
    }
}

AutoSaveThread를 Daemon thread로 만들었으므로, main thread의 main method가 종료되면 AutoSaveThread도 종료된다.

Thread pool

병렬 작업 처리가 많아지면 thread의 개수가 폭증하여 CPU가 바빠지고 메모리 사용량이 늘어난다. 이에 따라 application의 성능 또한 급격히 저하된다. 이렇게 병렬 작업 증가로 인한 thread의 폭증을 막으려면 ThreadPool을 사용하는 것이 좋다.

ThreadPool은 작업 처리에 사용되는 thread를 제한된 개수만큼 만들어놓고 놓고, 작업 큐(Queue)에 들어오는 작업들을 thread가 하나씩 맡아 처리하는 방식이다. 작업 처리가 끝나 thread는 다시 작업 큐에서 새로운 작업을 가져와 처리한다. 이렇게하면 작업량이 증가해도 thread의 갯수가 늘어나지 않아 application의 성능이 급격히 저하되지 않는다.

Application
|
|                               --------ThreadPool(ExecutorService)---------
|                               |                                          |
| --스레드풀에 작업 처리 요청 ---                       thread1(task 처리)    |
|                              \                    /                       |
|                               ----task queue-----/                        |
|                               | task -- task .. |--- thread2(task 처리)   |
|                               -------------------\                        |
|                              /                    \        ...            |
| --스레드풀에 작업 처리 요청 ---                      \                      |
|                               |                      threadN(task 처리)   |
|                               |                                           |
|                               ---------------------------------------------
|  <---thread1 결과
|  <---thread2 결과
|
v 작업 흐름

application에서 task를 만들어 task queue에 전달하면 thread pool에서 task queue의 task를 차례대로 가져와 thread에 실행시켜서 그 결과를 application에 전달하는 것이다.

Thread poll 생성

java는 thread pool을 생성하고 사용할 수 있도록 ExecutorService interface와 Executors class를 제공하고 잇다. Executors의 다음 두 정적 메서드를 이용하면 간단하게 thread pool인 ExecutorService 구현 객체를 만들 수 있다.

메서드명default thread 수코어 수최대 수
newCachedThreadPool00Integer.MAX_VALUE
newFixedThreadPool(int nThreads)0생성된 수nThreads

default thread 수는 thread pool이 생성될 때 기본적으로 생성되는 thread 수를 말하고, code 수는 thread가 증가된 후 사용되지 않는 thread를 제거할 때 최소한 pool을 유지하는 thread 수를 말한다. 그리고 최대 수는 증가되는 thread의 한도 수이다.

다음과 같이 newCachedThreadPool 메서드로 생성된 thread pool의 초기 수와 코어 수는 0개이고 작업 개수가 많아지면 새 thread를 생성시켜 작업을 처리한다. 60초 동안 thread가 아무 작업을 하지 않으면 thread를 pool에서 제거한다.

ExecutorService executorService = Executors.newCachedThreadPool();

다음과 같이 newFixedThreadPool()로 생성된 thread pool의 초기 수는 0개이고, 작업 개수가 많아지면 최대 5개까지 thread를 만들어 작업을 처리한다. 이 thread pool의 특징은 생성된 thread를 제거하지 않는다.

ExecutorService executorService1 = Executors.newFixedThreadPool(5);

위 두 메서드를 사용하지 않고 직접 ThreadPoolExecutor로 thread pool을 생성할 수도 있다. 아래는 초기 수 0개, 코어 수 3개, 최대 수 100개인 thread pool을 생성하는 코드이다. 추가된 thread가 120초 동안 놀고 있을 경우 해당 thread를 pool에서 제거한다.

ExecutorService threadPool = new ThreadPoolExecutor(
        3, // core thread
        100, // max thread
        120L, // idle time
        TimeUnit.SECONDS, // an unit of the idle time
        new SynchronousQueue<Runnable>() // task queue
);

thread pool 종료

thread pool의 thread는 기본적으로 daemon thread가 아니기 대문에 main thread가 종료되어도 task를 처리하ㅣ 위해서 계속 남아있다. thread pool의 모든 thread를 종료시키려면, ExecutorService의 두 메서드 중 하나를 실행시켜야한다.

메서드 명설명
void shutdown()현재 처리 중인 작업뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 thread pool을 종료시킨다.
List shutdownNow()현재 작업 처리 중인 thread를 interrupt해서 작업을 중지시키고, thread pool을 종료시킨다. return값은 작업 큐에 있는 미처리 작업의 목록이다.

남아있는 작업을 마무리하고 thread pool을 종료할 때에는 shutdown()을 호출하고 남아있는 작업과는 상관없이 강제로 종료할 때는 shutdownNow()를 호출하면 된다. 아래는 최대 5개의 thread로 운영되는 thread pool을 생성하고 종료한다.

  • Main
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService1 = Executors.newFixedThreadPool(5);
        executorService1.shutdown();
    }
}

위 예제는 바로 종료될텐데, thread pool에 task들이 없기 때문이다.

task 생성과 처리 요청

task는 Runnable 또는 Callable 구현 객체로 표현한다. RunnableCallable의 차이점은 task 처리 완료 후에 리턴값이 있느냐 없느냐이다.

  • Runnable 익명 구현 객체
new Runnable() {
    @Override
    public void run() {
        // thread가 처리할 작업 내용
    }
}
  • Callable 익명 구현 객체
new Callable<T>() {
    @Override
    public T call() throws Exception {
        // thread가 처리할 작업 내용
        return T;
    }
}

Runnablerun 메서드는 리턴값이 없고, Callablecall 메서드는 리턴값이 있다. call의 리턴 타입은 Callable<T>에서 지정한 T타입 파라미터와 동일한 타입이어야 한다.

task 처리 요청이란 ExecutorService의 task 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말한다. 작업 처리 요청을 위해 ExecutorService는 다음의 두 가지 메서드를 제공한다.

메서드 명설명
void execute(Runnable command)Runnable을 작업 큐에 저장, 작업 처리 결과 반환 X
Future submit(Callable task)Callable을 작업 큐에 저장, 작업 처리 결과를 얻을 수 있도록 Future를 반환

Runnable 또는 Callable 객체가 thread pool인 ExecutorService 작업 큐에 들어가면 ExecutorSercice는 처리할 thread가 있는 지 보고, 없다면 thread를 새로 생성한다. thread는 작업 큐에서 Runnable 또는 Callable 객체를 꺼내와 run 또는 call 메서드를 실행하면서 작업을 처리한다.

  • Main
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        String[][] mails = new String[1000][3];
        for(int i =0; i < mails.length; i++) {
            mails[i][0] = "admin@my.com";
            mails[i][1] = "member" + i + "@my.com";
            mails[i][2] = "new product";
        }

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 0; i < 1000; i++) {
            final int idx = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Thread thread = Thread.currentThread();
                    String from = mails[idx][0];
                    String to = mails[idx][1];
                    String content = mails[idx][2];
                    System.out.println("[" + thread.getName() + "]" + from + " ==> " + to + ": " + content);
                }
            });
        }

        executorService.shutdown();
    }
}

위의 코드는 thread pool인 executorService에 5개의 thread를 만들어놓고, executorService.execute method를 통해 task들을 처리한다. task들은 new Runnable이라는 익명 객체로 만들어진 것이다.

결과는 아래와 같이 나온다.

[pool-1-thread-3]admin@my.com ==> member878@my.com: new product
[pool-1-thread-4]admin@my.com ==> member887@my.com: new product
...
[pool-1-thread-2]admin@my.com ==> member999@my.com: new product
[pool-1-thread-4]admin@my.com ==> member998@my.com: new product
[pool-1-thread-1]admin@my.com ==> member983@my.com: new product
[pool-1-thread-5]admin@my.com ==> member982@my.com: new product

아래는 자연수를 덧셈하는 작업으로, 100개의 Callable을 생성하고 submit 메서드로 작업 queue에 넣는다. ExecutorService는 최대 5개의 thread로 작업 큐에서 Callable을 하나씩 꺼내어 call 메서드를 실행하면서 작업을 처리한다. 반환값인 Futureget 메서드는 작업이 끝날 때까지 기다렸다가 call 메서드가 반환한 값을 준다.

  • Main
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 1; i <= 100; i++) {
            final int idx = i;
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for(int i = 1; i <= idx; i++) {
                        sum += i;
                    }
                    Thread thread = Thread.currentThread();
                    System.out.println("[" + thread.getName() + "] 1~" + idx + "SUM");
                    return sum;
                }
            });

            try {
                int res = future.get();
                System.out.println("return: " + res);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

결과는 아래와 같다.

[pool-1-thread-1] 1~1SUM
return: 1
[pool-1-thread-2] 1~2SUM
return: 3
[pool-1-thread-3] 1~3SUM
...
return: 4950
[pool-1-thread-5] 1~100SUM
return: 5050

futureget은 해당 thread에서 반환값을 반환하기 전, 즉 thread가 종료되기 전까지 해당 thread를 call시킨 thread에서는 멈추게 된다.

0개의 댓글