스트림과 비동기

김종준·2023년 8월 2일
0

공부

목록 보기
9/12

스트림과 비동기

해당 내용은 WifiObserver라는 와이파이 공유기의 접속 목록을 크롤링 하여 해당 정보를 활용하는 프로젝트를 수행하며 공부한 내용입니다.

깃 허브 바로가기

배경

현재 수행중인 프로젝트의 핵심 기능 중 하나로 "회원 공유기의 와이파이 설정 페이지에서 와이파이에 접속하여 있는 기기 목록을 크롤링"하는 기능이 있다.

프로젝트 기획에서 부터 해당 기능의 경우 이를 동기로 처리한 다면 조회 대상이 증가함에 따라 처리 시간도 비례해서 증가할 것이 예상 되었기에 비동기적으로 처리할 계획이었다.

이번에 작성하는 "스트림과 비동기"는 해당 기능을 구현하며 공부하였던 내용을 바탕으로 한다.

비동기

우선 스프링에 있어 비동기의 경우 2가지 방법으로 사용할 수 있다.

  1. @Async
  2. CompletableFuture

@Async

우선 @Async 부터 알아보자.

@Async는 스프링에서 제공하는 Thread Pool을 활용하는 비동기 메서드 지원 어노테이션이다.

static ExecutorService executorService = Executors.newFixedThreadPool(5);

public void asyncMethod(final String message) throws Exception {
  executorService.submit(new Runnable() {
      @Override
      public void run() {
          // do something
      }            
  });
}

@Async는 위와 같이 작성해야하는 코드를 아래와 같이 간단히 사용할 수 있도록 도와준다.

@Async
public void asyncMethod(final String message) throws Exception {
    ....
}

이때 해당 메서드를 수행하며 사용할 Thread Pool 역시 설정 할 수 있는데 @Async 어노테이션의 value 값을 통해 설정해줄 수 있다.

@Async(value = "threadPoolName") // 설정 예시

이렇게 간단히 사용할 수 있는 @Async는 아래와 같은 주의 사항이 존재한다.

  1. private method에는 사용 불가
  2. self-invocation(자가 호출) 불가
  3. QueueCapacity 초가 요청에 대한 비동기 메서드 호출시 방어 코드 작성

1,2 번 주의 사항의 경우 프록시의 주의 사항과 유사한 것을 느낄 수 있다.

@Async를 사용하는 경우 이를 프록시를 활용하여 비동기적으로 수행하기 때문이다.

그렇기에 프록시 객체를 활용할 때의 주의 사항이 존재하는 것이다.

마지막으로 @Async에 대해 공부할 때 주로 만나는 예제는 void 타입을 반환하는 예제일 것이다.

그래서 @Async로 값을 반환하지 못한다고 생각할 수 있는데 그렇지 않다.

@Async는 CompletableFuture 타입도 반환할 수 있기에 이를 활용하여 원하는 값을 반환할 수 있다.

CompletableFuture

CompletableFuture은 아래와 같은 단점을 가지는 Future의 한계를 보안하기 위해 등장한 Future의 구현체 중 하나다.

  1. 외부에서 완료시킬 수 없고, get의 타임아웃 설정으로만 완료 가능
  2. get(블로킹 코드)를 통해서만 이후의 결과를 처리할 수 있음
  3. Future을 조합할 수 없음
  4. 여러 작업을 조합하거나 예외를 처리할 수 없음
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... }

CompletableFuture는 외부에서 완료 시킬 수 없는 Future을 개선하였다는 의미로 CompletableFuture 이라는 이름을 갖게 되었다.

이러한 CompletableFuture은 아래와 같은 기능을 가지고 있다.

  1. 비동기 작업 실행
  2. 작업 콜백
  3. 작업 조합
  4. 예외 처리

비동기를 처리하기에 충분한 기능을 갖추었다 생각하였고 그렇기에 스프링에 종속되는 @Async을 활용하는 대신 자바에서 제공하는 CompletableFuture을 활용하여 비동기 기능을 처리하기로 결정하였다.

추가적으로 @Async를 사용하였을 때는 주의해야할 프록시 객체에 대한 문제도 신경쓸 필요도 없다.

성능 확인 - 비동기

간단한 코드를 통해 CompletableFuture을 활용하였을 때 동기적으로 처리할 때와 비교해 어느 정도의 성능 향상이 있는지 확인해보자.

테스트 구성

프로젝트에서 비동기를 사용하는 부분은 크롤링과 관련된 부분으로 서로 다른 많은 수의 공유기에 요청을 보낸다고 가정한다.

이러한 상황을 어떻게 구성할 수 있을까 고민한 끝에 구글에 요청을 보내는 것을 통해 테스트를 구성하였다.

구글에 요청을 보내는 것으로 테스트를 구성한 이유는 우선 서로 다른 많은 공유기에 요청을 보내는 상황을 실제로 구성할 수 없었다.

그렇다고 하나의 공유기에 다수의 요청을 비동기적으로 보내면 공유기에서 그 요청을 처리하지 못하였다.

하지만 구글은 우리고 요청을 보내는 주소만 동일할 뿐이지 이를 처리하는 서버는 많을 것이라 생각하였다.

그리고 단시간에 요청을 많이 보내는 경우 요청을 차단 할 수 있는데 구글은 요청 차단 기준이 널널하였다.

그리고 테스트 하기 위해 아래와 같은 클래스를 만들고 사용하였다.

  • BrowseQuery : Jsoup을 활용하여 특정 Url에 요청을 보내여 응답을 받는다.
  • BrowseFutureGenerator : BrowseQuery의 execute를 실행하는 CompletableFuture을 반환한다.
public class BrowseQuery {

	public Integer execute(Integer i) {
		try {
			Jsoup.connect("https://google.com").get();
			return i;
		} catch (SocketTimeoutException timeoutException) {
			throw new RuntimeException();
		} catch (IOException e) {
			throw new RuntimeException();
		}
	}
}
public class BrowseFutureGenerator {

	private final BrowseQuery browseQuery;
	private final Executor requestAsyncExecutor;

	public CompletableFuture<Integer> execute(Integer i) {
		return CompletableFuture.supplyAsync(
				() -> {
					browseQuery.execute(i);
					return i;
				},
				requestAsyncExecutor);
	}
}

그리고 Thread Pool 설정을 아래와 같다.

  • core pool size : 5
    • max pool size : 10
    • queue capacity : 50

코드 설명

List<CompletableFuture<Integer>> futures =
    requests.stream().map(browseFutureGenerator::execute).collect(Collectors.toList());

requests는 0 ~ 요청 횟수 - 1 의 수를 담아둔 리스트다.

수를 넣어둔 이유는 추후 로깅을 할 때 요청들을 어떻게 처리되는지 조금 더 편하게 추적하기 위해 넣어두었다.

이 수는 browseFutureGenerator.execute로 전달되고 해당 메서드가 반환하는 CompletableFuture가 수행된 이후 전달 받은 수를 리턴하도록 하였다.

위의 코드를 통해 우리는 CompletableFuture가 모여있는 futures를 획득할 수 있다.

List<Integer> collect =
    futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

futures를 획득이 끝이 아니다.

CompletableFuture의 join을 통해 futures가 수행된 값을 수집하여야 한다.

위의 코드는 수행된 futures의 반환 값을 수집하는 코드다.

위와 같은 코드만 있다면 결과를 추적하기 힘들기에 아래와 같이 결과를 추적할 수 있도록 로그를 남기는 코드를 추가하여 테스트 코드를 구성하였다.

public class BrowseQuery {

	public Integer execute(Integer i) {
		try {
			log.info("**>>> [{}] jsoup is on {}", i, Thread.currentThread().getName());
			long before = System.currentTimeMillis();
			Document document = Jsoup.connect("https://velog.io/").get();
			log.info("**>>> [{}] jsoup duration time is {} ms", i, System.currentTimeMillis() - before);
			return i;
		} catch (SocketTimeoutException timeoutException) {
			throw new RuntimeException();
		} catch (IOException e) {
			throw new RuntimeException();
		}
	}
}

public class BrowseFutureGenerator {

	private final BrowseQuery browseQuery;
	private final Executor requestAsyncExecutor;

	public CompletableFuture<Integer> execute(Integer i) {
		return CompletableFuture.supplyAsync(
				() -> {
					log.info("***** [{}] future query is on {}", i, Thread.currentThread().getName());
					long b = System.currentTimeMillis();
					Integer integer = browseQuery.execute(i);
					log.info(
							"***** [{}] future query duration time is {} ms", i, System.currentTimeMillis() - b);
					return i;
				},
				requestAsyncExecutor);
	}
}
List<CompletableFuture<Integer>> futures =
    requests.stream()
        .map(
            source -> {
              log.info(">>>>> [{}] query is on {}", source, Thread.currentThread().getName());
              long b = System.currentTimeMillis();
              CompletableFuture<Integer> execute = browseFutureGenerator.execute(source);
              log.info(
                  ">>>>> [{}] query duration time is {} ms",
                  source,
                  System.currentTimeMillis() - b);
              return execute;
            })
        .collect(Collectors.toList());

log.info("=============================================================");

List<Integer> collect =
    futures.stream()
        .map(
            onConnectUsersCompletableFuture -> {
              String uuid = UUID.randomUUID().toString();
              log.info(
                  ">>>>> temp [{}] query async join is on {}",
                  uuid,
                  Thread.currentThread().getName());
              long b = System.currentTimeMillis();
              Integer join = onConnectUsersCompletableFuture.join();
              log.info(
                  ">>>>> [{}][{}] query async join duration time is {} ms",
                  uuid,
                  join,
                  System.currentTimeMillis() - b);
              return join;
            })
        .collect(Collectors.toList());

log.info("=============================================================");

collect.forEach(
    v -> {
      log.info(">>>>> [{}] assert is done", v);
      Assertions.assertThat(v).isNotNull();
    });

결과

requests에 0~299의 수를 넣어 테스트를 진행하였다.

우선 결과 부터 보면 아래와 같다.

==================소요시간 : 11134 ms==================

300번의 요청을 수행하는 것에 11.13초가 소요되었고 이때 1개의 요청을 수행하는 것에 0.037초가 소요되었다는 것이다.

동일한 수의 요청을 동기로 보내는 경우는 117초가 소요되었고 이때 1개 요청을 수행하는 것에 0.039초가 소요되는 것을 알 수 있다.

CompletableFuture을 활용한 비동기로 처리한 경우가 90% 정도의 성능 향상을 확인할 수 있었다.

만약 스레드가 충분하고 300개의 요청을 한번에 보낼 수 있었다면 1번만 요청하였을 때와 유사한 측정값이 나왔을 것이다.

하지만 스레드가 300개를 한번에 처리할 수 있는 테스트 환경이 아니었고 또 결과를 받은 이후 처리 과정도 있었기에 추가적인 시간이 들었을 것이다.

그럼에도 동기적으로 처리한 경우보다 월등한 성능차이를 확인할 수 있었다.

튜닝 by Stream

그렇지만 CompletbleFuture을 활용한 비동기 처리에 더해 아직 성능을 더 향상 시킬 수 있는 요소가 남아있기에 조금 더 튜닝을 해보자.

위에서 비동기의 경우 크게 2가지 단계로 나누어진다.

  1. CompletableFuture 생성 및 설정한 스레드에 원하는 동작 수행 : browseFutureGenerator.execute
  2. 비동기적으로 수행된 결과 수집 : CompletableFuture.join

CompletableFuture의 결과를 수집하는 방법은 get 그리고 join 2가지 방법이 있는데 이에 대한 특징은 링크로 대체한다.

그리고 이를 기반으로 비동기 성능을 향상 할 수 있는 아래 3가지로 생각된다.

  1. CompletableFuture을 빠르게 생성하기
  2. 스레드에서 원하는 동작 빠르게 수행하기
  3. 결과 수집 빠르게 하기

우선 가장 기본적인 경우로 구성한 코드의 결과를 살펴보며 분석해보자.

스크린샷 2023-07-12 오후 3 55 03

첫 번째 구분선을 기준으로 정상적으로 299개의 CompletableFuture을 잘 생성하고 join을 시작하는 모습을 확인할 수 있다.

로그를 조금 더 내려보면 아래 사진과 같이 main 스레드에서는 완료된 CompletableFuture에 대해 join이 진되고 있는 와중에 req 스레드에서는 요청이 수행되고 있는 것을 확인할 수 있다.

이러한 로그가 보이는 이유는 스레드가 한정되어 있어 모든 요청을 한 번에 수행할 수 없다.

그렇기에 요청이 나누어 수행이 되는데 이미 완료된 요청은 join에 의해 결과가 수집되고 동시에 다음 요청이 진행되는 것이다.

즉, CompletableFuture가 스레드와 함께 비동기적으로 요청을 잘 처리하고 있음을 확인 할 수 있었다.

스크린샷 2023-07-12 오후 4 00 21

스크린샷 2023-07-12 오후 4 03 58

두 번째 구분선 이전까지 CompletableFuture들의 join이 잘 완료된 이후 순차적으로 검증이 일어나는 것을 확인 할 수 있다.

이제 비동기를 조금 더 빠르게 수행할 수 있도록 튜닝을 해보자.

앞서 소개한 방법 중 1,3번의 경우 각각 futures와 collect를 생성하는 stream을 parallelStream로 변경하여 구현할 수 있다.

그래서 아래와 같이 각 스트림을 변경하여 성능을 측정해보았다.

  1. futures : parallelStream, collect : parallelStream
  2. futures : parallelStream, collect : stream
  3. futures : stream, collect : parallelStream

1번 상황의 경우 튜닝전과 비교해 36.8% 성능 향상을 측정할 수 있었다.

2번 상황의 경우 튜닝전과 비교해 39.71% 성능 향상을 측정할 수 있었다.

3번 상황의 경우 튜닝전과 비교해 2.7% 성능 향상을 측정할 수 있었다.

우선 가장 눈에 띄는 3번 상황일 것이다.

3번 상황은 결과를 수집하는 collect의 stream을 parallelStream로 변경한 것이다.

하지만 앞선 futures의 경우 stream으로 순차적으로 수행되었다.

이는 CompletableFuture 내의 요청 역시 순차적으로 실행되었다는 것인데 parallelStream은 순차적으로 stream을 처리하지 않는다.

즉, 먼저 실행된 CompletableFuture이 아닌 나중에 실행된 CompletableFuture에 join을 수행할 가능성이 있다는 것이고 이러한 경우에 join은 해당 CompletableFuture가 수행될 때까지 대기하기에 위와 같은 결과가 나온 것이라 생각한다.

그리고 1번과 2번 상황의 성능 개선의 핵심은 CompletableFuture을 빠르게 생성해준 것이라 생각한다.

즉, 빠르게 생성한 만큼 빠르게 요청을 보낼 수 있고 그만큼 성능 향상이 있었다고 해석할 수 있을 것이다.

이를 통해 parallelStream이 stream의 성능 개선에 효과 있다는 것을 확인 할 수 있었다.

하지만 parallelStream을 사용할 때는 아래 사진과 같이 우리가 제어할 수 없는 스레드 풀을 사용한다.

스크린샷 2023-07-12 오후 4 41 57

그렇기에 parallelStream을 활용할 때는 확실한 성능 개선 효과를 확인할 수 있을 때만 사용하여야 한다.

0개의 댓글