9. 멀티 스레드

어제는 멀티 스레드의 의의와 필요성 등을 개념적으로 숙지하기 위해서 배경이 되는 내용까지 샅샅이 뒤져보면서 기본적인 스레드의 상태 제어를 확인했다. 그리고 오늘은 그 스레드를 바탕으로 확장(?)된 기능들까지 숙지할 예정😀

4) 스레드 동기화

동기화(synchronization)의 의미는 시스템을 동시에, 병렬적으로 처리하기 위해서 여러 작업 및 사건들을 조화시키는 것을 의미한다. 처음 스레드를 접할 때 순서를 조정한다는 느낌을 받은 것은 얘 때문이 아닐까 싶었는데, 사실 스레드 자체를 상태 메소드를 활용해서 순서를 조정하는 것이 아닌 필요 시에 공유 여부 조정을 통해서 동기화를 구현한다.

(1) 스레드 동기화의 필요성

Thread threadA;

class Example;

Thread threadB;

위처럼 두 개의 스레드와 한 개의 클래스가 존재한다. 만약 각각의 스레드가 Example 클래스의 특정 필드값을 조회해서 저장해야 되는 경우를 생각하면, 보통은 각각의 스레드에게 해당 필드값을 할당하는 식을 생각한다.

그런데, 두 스레드가 시간적인 차이에 따라서 조회 및 할당받아야 되는 필드값이 다를 경우도 있다. 처음에는 threadA 스레드에서 초기값을 Example 클래스의 필드값에서 얻어오고, 이후 threadB 스레드에서 업데이트된 값을 Example 클래스의 필드값에서 얻어와야 하는 경우를 생각할 수 있다.

이때, Example 클래스의 필드값을 업데이트하게 되면, 이것은 threadAthreadB 모두에게 영향을 끼쳐서 초기값을 보존할 수 없게 된다.

이런 사태를 방지하기 위해서 사용하는 것이 동기화 메소드다. 간단하게 말해서 특정 스레드가 사용 중인 객체를 다른 스레드가 변경할 수 없도록 작업이 끝날 때까지 객체에 잠금을 거는 것이다.

(2) synchronized : 스레드 동기화 키워드

동기화 메소드를 선언하는 방법은 synchronized 키워드를 인스턴스 메소드 혹은 정적 메소드에 붙이면 된다.

public synchronized void method() {
	// 단 하나의 스레드에서만 실행하는 코드
}

스레드가 동기화 메소드를 실행하게 되면 객체는 잠금이 일어나고, 메소드 실행이 끝나면 잠금이 풀리게 되는데, 만일 메소드 내부에서조차도 일부 영역만 동기화하고 싶으면 동기화 블록을 지정하면 된다.

public void method() {
	// 여러 스레드가 실행 가능한 코드
	
	synchronized(공유 객체명) {
   	// 단 하나의 스레드에서만 실행하는 코드
   }
   
   // 여러 스레드가 실행 가능한 코드
}

아래의 예제로 다시 이해해보자. 우선 공유 객체(예시 설명에서의 Example 클래스)의 역할을 맡게 될 Calculator 클래스를 작성한다.

// 공유 객체 역할인 Calculator
public class Calculator {
    private int memory;

    public int getMemory() {
        return memory;
    }

    // 동기화 메소드 setMemory1
    public synchronized void setMemory1(int memory) {
        this.memory = memory;
        try {
            Thread.sleep(2000);
        } catch(InterruptedException e) {}
        System.out.println(Thread.currentThread().getName() + " : " + this.memory);
    }

    public void setMemory2(int memory) {
        // 동기화 블록
        synchronized (this) { // 여기서 this는 공유객체인 Calculator 클래스의 인스턴스
            this.memory = memory;
            try {
                Thread.sleep(2000);
            } catch(InterruptedException e) {}
            System.out.println(Thread.currentThread().getName() + " : " + this.memory);
        }
    }
}

클래스 내부에는 동기화 메소드인 setMemory1(int memory) 메소드와, 동기화 블록을 내부에 갖춘 메소드인 setMemory2(int memory) 메소드가 존재한다. 사실 setMemory2() 메소드는 메소드 블록의 전체 영역이 동기화 블록으로 감싸져서 동기화 메소드와 차이는 없다.

다음으로, 해당 클래스를 사용할 스레드를 작성한다.

public class User1Thread extends Thread {
    private Calculator calculator;

    public User1Thread() {
        setName("첫 번째 스레드");
    }

    public void setCalculator(Calculator calculator) {
        this.calculator = calculator;
    }

    @Override
    public void run() {
        calculator.setMemory1(100);
    }
}

public class User2Thread extends Thread {
    private Calculator calculator;

    public User2Thread() {
        setName("두 번째 스레드");
    }

    public void setCalculator(Calculator calculator) {
        this.calculator = calculator;
    }

    @Override
    public void run() {
        calculator.setMemory2(50);
    }
}

각각의 스레드는 Calculator 클래스의 동기화 메소드 setMemory1()와 동기화 블록을 사용한 메소드 setMemory2()를 각각 오버라이딩한 run() 메소드에서 호출한다.

이렇게 작성함으로써 각각의 스레드가 호출하는 공유객체(Calculator)의 메소드들은 해당 클래스의 private 필드(memory)를 업데이트하고, 그 업데이트 값을 조회해서 출력시킨다.

public class SynchronizedExample {
    public static void main(String[] args) {
        Calculator calculator = new Calculator();

        User1Thread user1Thread = new User1Thread();
        user1Thread.setCalculator(calculator);
        user1Thread.start(); // 100 출력

        User2Thread user2Thread = new User2Thread();
        user2Thread.setCalculator(calculator);
        user2Thread.start(); // 50 출력
    }
}

User1ThreadCalculator의 동기화 메소드인 setMemory1()을 실행하는 순간 Calculator 참조 객체를 잠근다. 따라서 User2Thread는 객체가 잠금 해제될 때까지 CalculatorsetMemory2()에 작성된 동기화 블록을 실행하지 못 한다. 이 과정에서 memory 필드를 100으로 업데이트하고 출력 작업까지 이뤄진다.

그 다음에 잠금이 해제되면 그제야 User2ThreadsetMemory2()에 작성된 동기화 블록을 실행할 수 있게 된다. 그 과정에서 memory 필드를 50으로 업데이트하고 출력 작업을 수행한다.

(3) wait(), notify() : 스레드 제어 메소드

스레드 두 개를 번갈아가면서 실행할 필요가 있을 수 있다. 나는 여기서 yield() 메소드를 공부 열심히 했네하면서 자뻑하며 생각하면서 실행 대기 상태로 돌아가면서 차후 스레드에게 양보하는 것으로 구현할 수 있지 않을까... 생각했지만, CPU 스케줄링에 의한 양보 요청의 방식은 교대 스레드를 직접적으로 지정할 수 없으며 고차원의 제어가 불가능하다고 한다.

그래서 고차원의 교대 제어를 위해서는 wait() 메소드와 notify() 메소드를 잘 활용해야 할 필요가 있다. 정확히는 공유 객체 내부의 동기화 메소드(혹은 동기화 블록 내부)에서 notify() 메소드를 기입해서 다른 스레드를 실행 대기 상태로 만듦과 동시에, wait() 메소드를 기입해서 자신은 일시 정지 상태로 진입시킨다.

public class WorkObject {
    public synchronized void methodA() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName() + " : methodA 작업 실행");
        notify(); // 다른 스레드를 실행 대기 상태로
        try {
            wait(); // 자신의 스레드는 일시 정지 상태로 넘어감(절대 대기 중이 아님, yield()랑 다름)
        } catch (InterruptedException e) {}
    }

    public synchronized void methodB() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName() + " : methodB 작업 실행");
        notify(); // 다른 스레드를 실행 대기 상태로
        try {
            wait(); // 자신의 스레드는 일시 정지 상태로 넘어감(절대 대기 중이 아님, yield()랑 다름)
        } catch (InterruptedException e) {}
    }
}

작성은 이런 식으로 이뤄진다. 공유 객체인 WorkObject 클래스의 두 개의 동기화 메소드에 기입된 notify() 메소드를 통해서 다른 스레드를 실행 대기 상태로 진입시키고, wait() 메소드를 통해서 자신은 일시 정지 상태로 넘어가게 한다.

두 개의 스레드에 각각 오버라이딩시킨 run() 메소드 내부에 공유 객체의 동기화 메소드를 각각 할당시켜서 작업을 수행토록 한다. 해당 동기화 메소드를 지닌 스레드는 notify() 메소드를 호출할 때마다 다른 스레드를 실행 대기 상태로 돌리며, 그와 동시에 wait() 메소드를 호출할 때마다 자신의 스레드는 일시 정지 상태로 넘어간다.

Thread_A : methodA 작업 실행
Thread_B : methodB 작업 실행
Thread_A : methodA 작업 실행
Thread_B : methodB 작업 실행
Thread_A : methodA 작업 실행
Thread_B : methodB 작업 실행
...

5) 스레드 종료

스레드는 기본적으로 자신의 run() 메소드가 모두 실행되면 자동으로 종료되지만 종료 시점도 제어할 필요가 있다. 그렇다고 강제로 종료시키면(물론 현 시점의 자바 버전에서 강제 종료 메소드는 사용 불능) 리소스가 남은 불안정한 상태로 종료되므로 위험하다.

스레드를 안전하게 종료하는 전제 조건은 잔여 리소스를 정리하고 run() 메소드를 빠르게 종료하는 것이다. 그 방법은 조건 설정interrupt() 메소드 호출이 있다.

(1) 조건 설정

꽤 간단한데, 스레드 클래스에 boolean 타입의 필드를 세팅하고, 해당 필드의 참, 거짓 여부에 따른 작업 실행을 while문을 통해 제어한다.

@Override
public void run() {
    while(!stop) {
        System.out.println("실행 중...");
    }
    System.out.println("리소스 정리");
    System.out.println("실행 종료");
}

boolean 타입으로 지정된 stop 필드를 false로 지정하면 while문 안에서 작업이 계속 수행된다. 이후, 외부에서 stop 필드를 true로 지정하면 반복문은 종료 되고 다음 코드 라인으로 넘어간다.

이 넘어가는 코드 라인에 리소스를 정리하는 로직과 실행을 종료하는 로직을 작성하면 된다.

(2) interrupt() 메소드

interrupt() 메소드는 Thread 클래스의 인스턴스 메소드이며 스레드가 일시 정지 상태에 진입하면 InterruptedException 예외를 발생시킨다. 밑의 코드 예제를 통해 간단하게 파악해보자.

// 사전에 작성한 XThread 스레드 클래스를 외부에서 써먹기 과정...
XThread thread = new XThread();
thread.start(); // 스레드의 run() 메소드 작동

// ...

thread.interrupt(); 
// 스레드가 일시 정지 상태에 진입하면
// InterruptedException 예외 발생

// -------------------------------------------...

// XThread 클래스 내부에 오버라이딩 된 run() 메소드
public void run() {
	try {
    	while(true) {
        	// ...
            
            Thread.sleep(1); // 일시정지
            // 일시 정지 상태에 진입시킴으로써
            // interrupt() 메소드가 예외를 일으킴
            // 그러면 바로 캐치 블록으로 넘어감
        }
    } catch(InterruptedException e) {
    	// 스레드 사용 리소스 정리 코드
    }
}

interrupt() 메소드의 활용을 통해 while문에서 빠져나와 예외 처리 단계로 넘어가고 그곳에서 리소스를 전부 정리한 다음 스레드를 종료시키는 효과를 기대할 수 있다.

interrupt() 메소드는 스레드가 실행 대기 상태이거나 실행 상태일 때에는 호출해도 InterruptedException 예외가 발생하지 않는다.

만약 sleep() 메소드를 통해 일시 정지를 일으켜서 while문을 빠져나가는 것이 아닌, 다른 방법을 사용하고 싶으면 interrupt() 메소드의 호출 여부를 조건으로 두면 된다. intterupt() 메소드의 호출 여부를 확인하는 메소드는 두 가지가 있다.

boolean status = Thread.interrupted(); // 정적 메소드

boolean status = objThread.isInterrupted(); // 인스턴스 메소드

두 메소드 둘 다 interrupt() 메소드의 호출 여부를 조사한다. 이것을 사용하면 굳이 try-catch 블록을 사용하지 않고도, 만약 호출되었다면 while문을 break하고 리소스 정리 로직을 작성할 수 있다.

// 사전에 작성한 XThread 스레드 클래스를 외부에서 써먹기 과정...
XThread thread = new XThread();
thread.start(); // 스레드의 run() 메소드 작동

// ...

thread.interrupt(); 
// 스레드가 일시 정지 상태에 진입하면
// InterruptedException 예외 발생

// -------------------------------------------...

// XThread 클래스 내부에 오버라이딩 된 run() 메소드
public void run() {
    while(true) {
        // ...
            
        if(Thread.interrupted()) {
        	break;
        }
        // interrupt() 메소드가 호출됨을 확인하면
        // 반복문을 브레이크하고 빠져나감
    }

    // 스레드 사용 리소스 정리 코드
}

6) 데몬 스레드

데몬 스레드(daemon thread)는 주 스레드의 작업을 돕는 보조 역할을 수행하는 스레드다. 데몬 스레드의 큰 특징은 주 스레드가 종료하면 데몬 스레드도 따라서 자동으로 종료된다는 점이다.

대표적인 예시가 JVM이 종료하면 따라서 같이 종료되는 가비지 컬렉터.

스레드를 데몬으로 만들려면 주 스레드가 데몬이 될 스레드의 setDeamon(true)를 호출하면 된다.

public static void main(String[] args) {
	AutoSaveThread thread = new AutoSaveThread();
    
    thread.setDaemon(true);
    // 주 스레드(메인 스레드)가 데몬이 될 다른 스레드(thread)의
    // setDaemon(true)를 호출하면서 다른 스레드(thread)의 데몬 스레드화
    
    thread.start();
}

아래의 예제를 확인해보자. AutoSaveThread 스레드는 메인 스레드의 데몬 스레드로 설정된다.

public class AutoSaveThread extends Thread{
    public void save() {
        System.out.println("작업 내용 저장");
    }

    @Override
    public void run() {
        while(true) {
            try {
                Thread.sleep(100); // 주기 작업 및 인터럽트 처리를 위함
            } catch(InterruptedException e) {
                break;
            }
            save();
        }
    }
}
public class DaemonExample {
    public static void main(String[] args) {
        AutoSaveThread autoSaveThread = new AutoSaveThread();
        autoSaveThread.setDaemon(true); // AutoSaveThread를 메인 스레드(주 스레드)의 데몬 스레드화
        autoSaveThread.start(); // 데몬 스레드 호출하면서.....

        try {
            Thread.sleep(3000); // 메인 스레드 3초간 일시정지
        } catch (InterruptedException e) {}

        System.out.println("메인 스레드 종료"); // 이 시점에서 데몬 스레드도 같이 종료
    }
}

7) 스레드풀

작업의 병렬적 처리가 급증하면 당연히 스레드의 개수도 급증하게 된다. 다만 스레드가 폭증할 경우 그만큼 CPU에 걸리는 부담이 상당해지면서 메모리 사용량 역시 폭증하게 된다. 그렇게 되면 애플리케이션의 성능이 급격히 저하될 우려가 있다.

이렇게 병렬 작업 증가로 인한 스레드 폭증을 막으려면 스레드풀 기능을 사용하면 된다. 스레드풀은 작업 처리용 스레드를 제한된 개수만큼 정해두고, 작업용 큐(queue)에 들어오는 작업들을 스레드가 하나씩 맡아서 처리하는 방식이다. 작업 처리가 끝난 스레드는 다시 작업 큐에 새롭게 들어온 작업을 가져와 처리하면, 작업량이 증가해도 스레드 개수가 늘어나지 않으므로 애플리케이션 성능이 떨어지지 않는다.

물론 작업 속도가 느리다고 속 터지는 사용자의 마음은 별개로

(1) 스레드풀 생성

자바에서는 스레드풀 생성을 위해 ExecutorService 인터페이스Executors 클래스를 제공한다. Executors 클래스의 정적 메소드를 이용하면 스레드풀인 ExecutorService 구현 객체를 만들 수 있다.

스레드풀 생성 시, 기본 생성되는 스레드의 개수를 초기 수라고 하고, 스레드 증가 후 사용되지 않는 스레드 제거할 때 최소한 풀에서 유지하는 스레드의 개수를 코어 수라고 한다. 증가되는 스레드의 한도 수는 최대 수라고 한다.

ExecutorService executorService = Executors.newCachedThreadPool();
// 정적 메소드
// 초기 수 : 0
// 코어 수 : 0
// 최대 수 : Integer.MAX_VALUE
// 60초 동안 스레드 작업 안 할 시, 풀에서 제거

newCachedThreadPool() 메소드를 통해 생성한 스레드풀은 작업 개수가 많아지면 새 스레드를 생성시켜 작업을 처리하고, 60초 동안 스레드가 아무 작업을 하지 않으면 스레드를 풀에서 제거한다.

ExecutorService executorService = Executors.newFixedThreadPool(int nThreads);
// 정적 메소드
// 초기 수 : 0
// 코어 수 : 생성된 스레드 개수
// 최대 수 : nThreads
// 생성된 스레드 풀에서 별도 제거 안 함

newFixedThreadPool(int nThreads) 메소드를 통해 생성한 스레드풀은 작업 개수가 많아지면 최대 nThreads 수까지 스레드를 생성시켜 작업을 처리하고, 생성된 스레드를 제거하지 않는다.

위 두 메소드를 활용하지 않고 직접 ThreadPoolExecutorExecutorService 인터페이스를 구현하면서 스레드풀을 생성할 수 있다. 생성자에 매개값으로 여러 요소를 할당시킬 수 있다.

ExecutorService threadPool = new hreadPoolExecutor(
	코어 스레드 개수 /* int */,
    최대 스레드 개수 /* int */,
    놀고 있는 시간 /* long */,
    놀고 있는 시간 단위 /* ex) TimeUnit.SECONDS */,
    작업 큐 // new SynchronusQueue<Runnable>()
);

(2) 스레드풀 종료

스레드풀의 스레드는 기본적으로는 데몬 스레드가 아녀서 메인 스레드가 종료돼도 작업을 처리하기 위해 계속 실행 상태로 남아 있는다. 그래서 스레드풀의 모든 스레드를 종료하려면 ExecutorService 인터페이스의 두 메소드 중 하나를 선택해야 한다.

void shutdown()
// 현재 처리 중인 작업 외에도 작업 큐에 대기하는 모든 작업을 '처리'하고
// 그제야 스레드풀을 종료한다.

List<Runnable> shutdownNow()
// 현재 처리 중인 스레드를 interrupt해서 작업을 '중지'시키고
// 곧바로 스레드풀을 종료시킨다.
// 리턴 타입이 리스트인데, 리스트에 작업 큐에서 미처리된 작업(Runnable)의 목록을 담아 반환한다.

(3) 작업 생성 및 처리 요청

아까 위에서 봤지만, 그리고 이전에 Thread 클래스를 구현하기 위한 인터페이스가 뭔지에 대해서 스포(?)를 했지만, 스레드가 담당하는 하나의 작업Runnable 인터페이스 혹은 Callable 인터페이스의 구현 객체로 생성된다.

// Runnable 익명 구현 객체
new Runnable() {
	@Override
    public void run() {
    	// 스레드 처리 작업 내용
    }
}

// Callable 익명 구현 객체
new Callable<T>() {
	@Override
    public T call() throws Exception {
    	// 스레드 처리 작업 내용
        return T;
    }
}

단순히 작업을 정의 및 표현해줬다고 해서 끝이 아니다. 이제 이 작업에 대해서 작업 처리 요청을 해야 되는데, 이는 즉 ExcecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말한다. 작업 처리 요청과 관련해서 ExcecutorService 인터페이스는 execute(Runnable command) 메소드와 submit(Callable<T> task) 메소드를 제공한다.

void execute(Runnable command)
// Runnable을 작업 큐에 저장
// 작업 처리 결과를 리턴하지 않음

Futrue<T> submit(Callable<T> task)
// Callable을 작업 큐에 저장
// 작업 처리 결과를 얻을 수 있도록 Future 리턴

각각의 예제를 확인하자

Runnable과 execute() 활용

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunnableExecuteExample {
    public static void main(String[] args) {
        String[][] mails = new String[1000][3];
        for(int i = 0; i < mails.length; i++) {
            mails[i][0] = "admin@my.com";
            mails[i][1] = "member"+i+"@my.com";
            mails[i][2] = "신상품 입고";
        }

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 작업 개수가 많아지면 최대 5개까지 스레드를 생성시켜 작업을 처리하는 스레드풀

        for(int i = 0; i < mails.length; i++) {
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Thread thread = Thread.currentThread();
                    String from = mails[index][0];
                    String to = mails[index][1];
                    String content = mails[index][2];

                    System.out.println(
                            "[" + thread.getName() +"] " + from + " ==> " + to + " : " + content
                    );
                }
            });
        }

        executorService.shutdown();
    }
}

Callable과 submit() 활용

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableSubmitExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i=0; i<=100; i++) {
            final int index = i;
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for(int i=1; i<=index; i++) {
                        sum += i;
                    }

                    Thread thread = Thread.currentThread();
                    System.out.println("[" + thread.getName() + "] 1~" + index + " 합 계산");
                    return sum;
                }
            });

            try {
                int result = future.get(); // Calllable의 call() 메소드가 리턴한 값(sum) 얻기
                System.out.println("\t리턴값: " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }
}
profile
scientia est potentia / 벨로그 이사 예정...

0개의 댓글