Thread Programming

leehyunjon·2022년 10월 7일
0

Spring

목록 보기
3/3

자바와 spring에서 스레드를 어떻게 생성해서 멀티프로그래밍, 비동기 작업을 처리하는지 정리해보려고 합니다.


Runnable과 Callable

쓰레드에 의해 실행되는 객체를 위한 인터페이스로 쓰레드는 Runnable과 Callable에 구현된 함수를 실행합니다.

Runnable

Runnable 인터페이스를 구현하는 개체가 스레드를 만드는 데 사용될 때 스레드를 시작하면 개별적으로 실행되는 스레드에서 개체의 실행 메서드가 호출됩니다.
메소드 실행의 일반적인 계약은 어떠한 조치도 취할 수 있다는 것입니다.

Runnable은 run()을 정의하고 있으며 쓰레드는 Runnable구현체를 할당했을때, 구현된 run()을 호출하여 작업을 수행합니다.

Runnable은 어떠한값을 반환하지 않는 void 타입입니다.

또한 별도로 Runnable객체를 생성해서 쓰레드에 할당해도되고 함수형인터페이스이기 때문에 람다식으로 표현도 가능합니다.

Callable

결과를 반환하고 예외를 발생시킬 수 있는 작업입니다. 구현자는 call이라는 인수 없이 단일 메서드를 정의합니다.
호출 가능 인터페이스는 다른 스레드에 의해 인스턴스가 실행될 수 있는 클래스에 대해 설계되었다는 점에서 Runnable과 유사합니다. 그러나 Runnable은 결과를 반환하지 않으며 선택된 예외를 발생시킬 수 없습니다.
실행자 클래스에는 다른 공통 양식에서 호출 가능한 클래스로 변환하는 유틸리티 메서드가 포함되어 있습니다.

Callable은 call()을 정의하고 있으며 쓰레드는 Callable구현체를 할당했을때, 구현된 call()을 호출하여 작업을 수행합니다.

Runnable과 다르게 특정 타입의 객체를 반환하고 예외가 발생할 수 있습니다.

마찬가지로 Callable객체를 생성해서 쓰레드에 할당해도되고 람다식으로 표현 가능합니다.


우리가 쓰레드를 이용한 프로그래밍을 할때, 쓰레드를 무한정 생성하고 사용하게 된다면 자원 부족은 물론 쓰레드 생성과 GC로 인한 오버헤드로 성능이 하락할것입니다. 그렇기 때문에 쓰레드를 관리해주고 효율적으로 사용하기 위해 ThreadPool을 생성하여 선언된 크기의 쓰레드만 사용하고 관리하며, 새로운 쓰레드 생성을 제어해주어야 합니다.

스레드 풀 관리 및 사용

Executor

이 패키지에서 제공되는 Executor 구현은 보다 광범위한 인터페이스인 ExecutorService를 구현합니다. ThreadPoolExecutor 클래스는 확장 가능한 스레드 풀 구현을 제공합니다. 실행자 클래스는 이러한 실행자를 위한 편리한 팩토리 메서드를 제공합니다.
메모리 일관성 효과는 다음과 같습니다. Runnable 개체를 Executor에게 제출하기 전에 다른 스레드에서 실행이 시작되기 전에 스레드의 작업입니다.

지정된 명령을 나중에 실행합니다. 명령어는 실행자 구현의 재량에 따라 새 스레드, 풀링된 스레드 또는 호출 스레드에서 실행될 수 있습니다.

제공된 작업(Runnable 구현체)을 실행하는 객체가 구현해야할 인터페이스. 재량에 따라 새 스레드에 사용하거나 쓰레드 풀에서 생성된 스레드에서 작업을 수행할 수 있습니다.

ExecutorService

쉽게 비동기로 작업 할수 있도록 도와주는 Java에서 제공하는 인터페이스.

Executor를 상속한 인터페이스로서 Future를 반환하는 submit(), shutdown() 등의 메소드를 정의합니다.

일반적으로 ExerciseService는 작업할당을 위한 스레드 풀과 API를 제공합니다.

Thread를 직접 생성하지 않아도 되고 Thread Pool을 생성하여 Task를 실행하고 Thread를 관리할 수 있습니다.

사용방법

  1. ExecutorService의 구현체인 ThreadPoolExecutor를 생성하여 사용할 수 있습니다.
	public class ExecutorServiceTest {
	public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(7);
		ThreadPoolExecutor executorService = 
        	new ThreadPoolExecutor(2,10,3, TimeUnit.SECONDS, queue);

		for(int i=0;i<10;i++){
			executorService.execute(new Task());
		}

		executorService.awaitTermination(5, TimeUnit.SECONDS);
		executorService.shutdown();
	}

	public static class Task implements Runnable{

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName());
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}
}
  1. Executors에서 제공하는 ThreadPool을 사용합니다.
	Executors.newCachedThreadPool();
	Executors.newFixedThreadPool();
	Executors.newSingleThreadExecutor();
  • CachedThreadPool
    스레드를 제한없이 생성하고 60초 동안 작업이 없다면 Pool에서 제거
  • FiexedThreadPool
    고정된 쓰레드 개수를 가진 ThreadPool
  • SingleThreadExection
    단일 스레드로 작업 처리
static ExecutorService executorService = Executors.newFixedThreadPool(5);
	public void method1(){
		executorService.submit(new Runnable() {
			@Override
			public void run() {
				//쓰레드가 수행할 작업
			}
		});
	}

작업할당

  • execut()
    • 반환타입 void
    • 작업처리중 예외 발생시 해당 스레드 종료 후, 다음 작업을 위한 스레드 생성
  • submit()
    • Future객체에 특정타입을 담에서 반환
    • 작업처리중 예외 발생시 스레드를 삭제하지 않고 예외를 발생

작업종료

  • shutdown()
    • 새로운 Task가 실행되는 것을 막습니다.
  • shoutdownNow()
    • 현재 실행중인 Task를 종료 (바로 종료되지 않는 경우가 있음)
  • awaitTermination()
    • 새로운 Task가 실행되는 것을 막고 일정시간동안 실행중인 Task가 완료될때까지 기다립니다. 만일 일정시간동안 처리완료되지 않은 Task에 대해서는 강제 종료

Thread의 작업 종료

종료되지 않는 쓰레드에 대해 외부에서 강제로 종료해줘야하는 경우가 있다. 이럴 경우 stop이나 destroy메소드로 쓰레드를 강제종료 시킬수 있었습니다. 하지만, 리소스를 정리하지 못해 데드락이 발생하는 이유때문에 deprecated되었습니다.
이에 안정적으로 쓰레드를 종료할수 있는 방법으로 Thread.interrupt()가 있습니다.
interrupt()는 스레드를 즉시 종료시키는 방법이 아닌 쓰레드를 일시적으로 중지시키고 isInterrupted()를 통해 쓰레드가 중지되었을 경우 작업을 종료시키고 외부 자원을 반납하도록 구현하여 안전하게 쓰레드를 종료시킬 수 있습니다.

public static void main(String[] args) throws InterruptedException{
	Thread thread = new Thread(()->{
			try{
				while(!Thread.currentThread().isInterrupted()){	//쓰레드가 중지 되지 않는 동안 실행
					Thread.sleep(1000);
					System.out.println(Thread.currentThread().getName());
				}
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			finally{
				//공유자원 처리
				//메모리 자원 정리
			}
		});

		thread.start();
		log.info(Thread.currentThread().getName());
		Thread.sleep(2000);
		thread.interrupt();
}

위 코드를 확인해보면 쓰레드가 종료되지 않는 동안 1초 마다 해당 쓰레드의 이름을 출력하는 작업을 수행하고있습니다.
그리고 실행후 2초뒤에 해당 쓰레드를 interrupt()를 통해 중지시키고, 쓰레드는 isInterrupted()를 통해 해당 쓰레드의 중지 사실을 확인하고 작업을 중지하게 되어 안전하게 쓰레드를 종료할 수 있게 됩니다.

추가적으로 데드락을 방지하기 위해서 쓰레드가 종료되게 되면 공유자원에 대한 반납을 통해 데드락을 방지하도록 구현하는것이 좋습니다.


ThreadPoolTaskExecutor

Spring에서 ThreadPoolExecutor를 쉽게 만들어 사용할 수 있게 해주는 Wrapper로서, 내부적으로 ThreadPoolExecutor를 통해 비동기 작업 설정 및 작업 실행을 하며, 모니터링과 같은 추가적인 기능을 지원해줍니다.

TaskExecutor

Executor를 상속받은 인터페이스로서 void execute(Runnable task)를 오버라이드 합니다.

AsyncTaskExecutor

TaskExecutor를 상속받은 인터페이스로서 submit(Runnalbe task or Callback task)가 추가되었습니다.

AsyncListenableTaskExecutor

Future인터페이스를 확장한 ListenableFuture를 반환하는 submitListenalbe()이 추가되었습니다.

동작

ThreadPoolTaskExecutor으로 coreSize, queueCapacity, masSize등을 직접 설정하여 ThreadPool을 생성할 수 있습니다.
ThreadPoolTaskExecutor를 생성하여 Bean으로 등록한 후 @Async어노테이션에 선언한 Bean의 이름을 작성한다면 사용자가 생성한 ThreadPoolTaskExecutor에서 쓰레드를 가져와 작업을 비동기적으로 수행해주게 됩니다.

@EnableAsync
@Configuration
public class AsyncConfig {
	protected Logger logger = LoggerFactory.getLogger(getClass());
	protected Logger errorLogger = LoggerFactory.getLogger("error");

	@Bean
	public Executor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		taskExecutor.setCorePoolSize(3);
		taskExecutor.setMaxPoolSize(20);
		taskExecutor.setQueueCapacity(10);
		taskExecutor.setThreadNamePrefix("thread-pool-hyunjong-");
		taskExecutor.initialize();
		return taskExecutor;
	}
}

ThreadPoolTaskExecutor 종료

JVM에서는 자식 스레드가 작업을 수행중이라면 JVM을 종료할수 없다고합니다.
그렇기 때문에 TaskExecutorService를 직접 사용한다면 shutdown()을 통해 더이상 작업이 수행되지 않도록 종료시켜야합니다.

ThreadPoolTaskExecutor를 사용하여 비동기화 작업을 수행중이라면 Config클래스에서 설정을 해서 사용할텐데, 이때 keepAliveSeconds와 allowCoreThreadTimeOut을 설정해줌으로서 task를 받지 않은 스레드를 정리해주게 됩니다.

  • keepAliveSeconds : 설정된 시간만큼 태스크가 할당되지 않은 스레드를 유지한다. (default 60s)
  • allowCoreThreadTimeOut : keepAliveSeconds만큼 시간이 지나고 core thread가 task를 할당받지 않는다면, pool에서 정리가된다.

corePoolSize가 3으로 설정해두었고 keepAliveSeconds를 1초로 생성해두었고 비동기작업을 2초마다 수행하게 설정했습니다.
원래대로라면 1,2,3의 스레드만 수행되어야할텐데 새로운 스레드가 생성되어지는 것을 확인할 수 있습니다. 즉, 작업이 들어오지 않는 core thread도 제거해버리게 됩니다.

하지만 애플리케이션 자체를 종료해버리게 되면 문제없이 동작해서 의하했는데 스레드를 자동으로 삭제해주는 기능을 수행해주고 있었습니다.

Bean 초기화, 삭제 메소드를 정의하고 있는 DisposableBean 인터페이스를 구현하고 있는 ExecutorConfigurationSupport클래스를 상속하고 있어 애플리케이션 종료시 ExecutorConfigurationSupport.destroy()가 자동 호출되면서 종료되기 때문에 개발자가 직접 종료 처리를 해주지 않아도 됩니다.

ExecutorConfigurationSupport.destroy()코드를 보게되면 ThreadPool에서 진행중이던 작업을 안전하게 삭제하기 위해 shutdown()을 통해 새롭게 수행되는 작업을 중지하고, 이미 실행중인 작업이 있다면 Future.close()를 통해 작업 중지 요청을 보내게 됩니다.
그리고 awaitTermination()을 통해 일정시간동안 작업 완료를 대기 후 강제 종료시켜주는 과정을 거치게 됩니다.


비동기 작업

자바의 비동기 기술

ExecutorService

쉽게 비동기 작업을 실행할 수 있도록 제공하는 인터페이스로서 작업 할당을 위한 스레드 풀과 API를 제공합니다.

Future

비동기 작업이란, 현재 작업을 수행중인 스레드가 아닌 별도의 스레드에서 작업을 독립적으로 수행하는 것을 말합니다.
같은 스레드에서 메소드를 호출할때 결과값을 리턴받지만, 비동기적으로 작업을 수행할때는 결과값을 받을 수 있는 무엇인가가 필요합니다. 이를 Future가 수행합니다.

비동기 작업에서 결과를 반환하고 싶을때는 Runnable이 아닌 Callable인터페이스로 구현한 작업을 스레드에 할당하면 결과를 반환받을 수 있습니다. 또한 예외가 발생했을 때 예외를 비동기 스레드에서 처리하지 않고 밖으로 던질 수 있습니다.

Future를 통해 결과를 가져올때는 get()을 통해 가져옵니다. 이때 get()을 이용해 결과를 가져올때 비동기 작업이 완료될때까지 해당 스레드가 blocking됩니다.
그렇기 때문에 비동기 작업이 완료되었는지 확인하는 isDone()을 제공합니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(5);

		Future<String> r = executorService.submit(()->{
			Thread.sleep(2000);
			log.info("Async");
			return "Hello";
		});

		log.info(String.valueOf(r.isDone()));
		// System.out.println(r.get());
		Thread.sleep(2000);
		log.info("Exit");
		log.info(String.valueOf(r.isDone()));
		log.info(r.get());

		executorService.shutdown();
	}

---
15:02:43.102 [main] INFO com.example.unittest.test.Async.FutureTest - false
15:02:45.103 [pool-1-thread-1] INFO com.example.unittest.test.Async.FutureTest - Async
15:02:45.111 [main] INFO com.example.unittest.test.Async.FutureTest - Exit
15:02:45.112 [main] INFO com.example.unittest.test.Async.FutureTest - true
15:02:45.112 [main] INFO com.example.unittest.test.Async.FutureTest - Hello

FutureTask

FutureTask는 비동기 작업을 생성합니다.
앞서 Future는 작업 생성과 비동기 작업을 동시에 했다면 FutureTask는 작업 생성과 비동기 작업을 분리하여 진행할 수 있습니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(5);

		FutureTask<String> futureTask = new FutureTask<String>(()->{
			Thread.sleep(2000);
			log.info("Async");
			return "Hello";
		}){
			@Override
			protected void done(){
				try{
					log.info(get());
				}catch (ExecutionException e) {
					throw new RuntimeException(e);
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
		};

		executorService.execute(futureTask);
		log.info("start");
		executorService.shutdown();
	}
---
15:01:15.395 [main] INFO com.example.unittest.test.Async.FutureTest - start
15:01:17.396 [pool-1-thread-1] INFO com.example.unittest.test.Async.FutureTest - Async
15:01:17.399 [pool-1-thread-1] INFO com.example.unittest.test.Async.FutureTest - Hello

FutureTask의 done을 활용하여 Callback을 구현할 수 있습니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
		ExecutorService executorService = Executors.newCachedThreadPool();

		CallbackFutureTask f = new CallbackFutureTask(() -> {
			Thread.sleep(2000);
			log.info("Async");
			return "Hello";
		}, result -> log.info((String)result));

		executorService.execute(f);
		executorService.shutdown();
	}

	public static class CallbackFutureTask extends FutureTask<String>{
		SuccessCallback sc;
		//수행할 작업, 작업이 완료되었을때 던저줄 callback
		public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
			super(callable);
			this.sc = Objects.requireNonNull(sc);
		}

		@Override
		protected void done() {
			try {
				sc.onSuccess(get());
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			} catch (ExecutionException e) {
				throw new RuntimeException(e);
			}
		}

비동기 작업을 설정하고 수행하는 CallbackFutureTask를 FutureTask를 상속하여 생성하고, 작업인 Callable과 작업완료시 던져줄 OnSuccess를 설정해줍니다.

CallbackFutureTask는 작업완료시 수행할 동작을 위해 done()메소드를 오버라이드하여 콜백인 OnSuccess을 실행하여줍니다.

그렇게 되면 Callback을 통해 이후 작업을 수행하게 해줍니다.


스프링의 비동기 기술

@Async

스프링에서는 쉽게 비동기 작업을 처리해주기 위해 @Async어노테이션을 지원해주게됩니다.
@Async를 통해 비동기 작업을 해주기 위해서는 작업을 수행할 클래스나 설정 클래스 또는 메인 클래스에 @EnableAsync를 선언해주어야 비동기 작업이 수행되게 됩니다.

동작

@Async를 사용해주기 위해서는 비동기 작업을 처리하기 위해서는 ThreadPoolTaskExecutor가 필요한데 사용자가 별로 ThreadPoolTaskExecutor를 구현하지 않는다면 Spring에서는 TaskExecutionAutoConfiguration에서 TaskExecutionPriperties의 설정을 통해 자동으로 applicationTaskExecutor라는 이름으로 TaskExecutor를 생성해주게 됩니다.

TaskExecutionProperties의 기본설정은 아래와 같습니다.

(다른 블로그에서는 @Async에 별도의 TaskExecutor를 설정해주지 않으면 'SimpleAsyncTaskExecutor'를 사용한다고 하는데, 직접 테스트해보았을때 8개의 쓰레드를 사용해서 확인해보니 위와 같은 과정으로 ThreadPool을 적용해주고 있었습니다. 아직 이부분에 대해서는 확실한 판단이 없어 더 공부해보고 수정하도록 하겠습니다.)

아래는 TaskExecutorAutoConfiguration의 내용 일부입니다.

Base class for setting up a ExecutorService (typically a ThreadPoolExecutor or java.util.concurrent.ScheduledThreadPoolExecutor). Defines common configuration settings and common lifecycle handling.
ExecutorService(일반적으로 ThreadPoolExecutor 또는 java.util.current를 설정하는 기본 클래스입니다.ScheduledThreadThreadPoolExecutor). 공통 구성 설정 및 공통 수명 주기 처리를 정의합니다.

@Async에서 사용자 지정 ThreadPoolTaskExecutor의 이름을 선언하게 된다면 해당 이름의 ThreadPoolTaskExecutor의 ThreadPool에서 쓰레드를 받아와 비동기 작업을 해당 쓰레드에 할당해 처리해주게 됩니다.

주의사항

@Async를 이용해 비동기 작업을 처리해줄때 주의사항이있습니다.
@Async는 프록시 패턴으로 동작하기 때문에 비동기 작업들은 Bean으로 등록되어 Spring에 의해 Proxy로 감싸져야합니다.

  • 비동기 작업 메소드는 public으로 선언되어야합니다.

    출처 [https://goodgid.github.io/SpringBoot-Why-doesn't-it-work-with-Async/]

  • self-invocation (자기 자신 호출)은 프록시가 적용되지 않습니다.

    출처 [https://goodgid.github.io/SpringBoot-Why-doesn't-it-work-with-Async/]

  • 비동기 작업 클래스는 spring container에 의해 관리되어야합니다.

    • 호출 스레드에서 비동기 작업 클래스를 생성하여 접근하게 된다면 Spring에서 프록시를 적용해주지 못한상태로 타겟 메소드를 호출하기 때문에 프록시가 적용되지 않습니다.

Scheduler

자바에서는 애플리케이션 내에서 의도적으로 쓰레드를 생성해 작업을 일정 간격이나 시간에 실행시켜줍니다.

ScheduledExecutorService

자바에서 ThreadPool을 이용해 스케줄링을 할 수 있게 해주는 인터페이스.
ExecutorService에 스케줄링 기능이 추가되었다고 보면 됩니다.

사용

  1. ScheduledExecutorService의 구현체인 ScheduledThreadPoolExecutor를 생성해서 사용한다.
  2. Executors에서 제공하는 ScheduledThreadPool을 생성해서 사용한다.

주의사항

작업을 연속적으로 스케줄링할때, 스레드에 의해 수행되는 작업이 간격시간보다 오래 걸리는 경우 후속 작업 실행이 늦게 시작될 수 있지만, 동시에 실행되지는 않습니다.

@Scheduled

스프링에서 쉽게 스케줄링 작업을 처리해주기 위해 @Scheduled어노테이션을 지원해줍니다.
@Scheduled를 통해 스케줄링을 사용하기 위해서는 @EnableScheduling을 선언해주어야합니다.

ThreadPoolTaskScheduler

ThreadPoolTaskExection과 마찬가지로 자바에서 제공하는 ScheduledThreadPoolExecutor를 쉽게 사용하기 위해 제공하는 WrapperClass입니다.

동작

@Scheduled를 사용해주기 위해서는 ThreadPoolTaskScheduler가 필요한데 사용자가 별도로 사용자가 지정해주지 않으면 Spring에서 별도로 ThreadPoolTaskScheduler를 생성해주게 됩니다.
이때 하나의 스레드로만 스케줄링이 동작되는데 Spring에서 생성한 ThreadPoolTaskScheduler의 기본 coreSize가 1이기 때문입니다.

사용자가 정의한 ThreadPoolTaskScheduler를 설정하기 위해서는 스케줄 config클래스를 SchedulingConfigurer를 상속받아 configureTasks()를 오버라이딩 해주어야합니다.

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
	private final int POOL_SIZE = 6;
	@Override
	public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
		ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

		threadPoolTaskScheduler.setPoolSize(POOL_SIZE);
		threadPoolTaskScheduler.setThreadNamePrefix("my-scheduled-task-pool-");
		threadPoolTaskScheduler.initialize();

		taskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
	}
}

그후 @Scheduled가 선언된 스케줄러에서 지정된 ThreadPoolTaskScheduler에서 생성된 쓰레드를 가져와 작업을 처리해 주게 됩니다.

마찬가지로 쓰레드에서 수행되는 작업이 설정한 시간간격보다 오래걸리는 경우 후속 작업 실행이 늦게 시작되게 됩니다.

병렬적으로 스케줄 작업을 수행하고 싶다면 Spring에서 제공하는 @Async를 적용해서 스케줄을 비동기작업으로 수행할 수 있습니다.
(여기서도 ThreadPoolTaskExecutor를 명시해주지 않으면 TaskExecutorAutoConfiguration에 의해 8개를 가진 스레드 풀이 생성되어 사용되게 됩니다.)

(0.1초 마다 @Async를 적용한 스케줄을 실행시키고, 각 스레드를 2초 동안 멈추게한 결과)


Reference

https://www.youtube.com/watch?v=aSTuQiPB4Ns&list=PLv-xDnFD-nnmof-yoZQN8Fs2kVljIuFyC&index=8

https://jongmin92.github.io/2019/03/31/Java/java-async-1/

https://github.com/HomoEfficio/dev-tips/blob/master/Java-Spring%20Thread%20Programming%20%EA%B0%84%EB%8B%A8%20%EC%A0%95%EB%A6%AC.md

https://jeong-pro.tistory.com/188

https://codechacha.com/ko/java-callable-vs-runnable/

https://stackoverflow.com/questions/2419611/jvm-does-not-exit-when-timeoutexception-occurs

profile
내 꿈은 좋은 개발자

0개의 댓글