[Akka] Futures

smlee·2023년 8월 28일
0

Akka

목록 보기
12/50
post-thumbnail

이 글은 Akka in action Chapter.5 - Futures를 읽고 작성한 글입니다.


Scala에 대해 정리하면서 Future에 대한 정리한 게시물들을 올린 적 있었다. 이 포스트에서는 Future의 문법적인 사용법에서 더 나아가 Akka에서 어떻게 사용되는지 정리할 예정이다.

Actor & Future

Akka에서 Future는 액터 만큼이나 중요하게 다루어지는 요소이다. 이전 포스트들에서 액터에 대해 많이 정리했었는데, 그렇다면 Actor와 Future의 차이는 무엇일까?

Actor는 concurrent object를 위한 시스템이며, Future는 concurrent function을 위한 시스템이다. Future는 blocking이나 waiting하지 않고 함수의 결과를 combine할 수 있다.

우리는 굳이 Actor와 Future 중 하나를 선택하려고 노력하지 않아도 된다. 이 2가지는 같이 사용할 수 있기 때문이다. Akka는 서로 작업할 수 있는 액터와 퓨처 패턴을 제공한다.

Actor는 많은 메시지들을 처리하고 상태를 가져오는데 적합하다. 또한, 액터들이 받은 메시지를 기반으로 하여 다양한 행동에 응답하는 것이 용이하다. 문제가 발생하여도 모니터링과 supervision을 통해 오랜 기간 살아 남을 수 있다.

Future의 개념

Future는 Scala를 정리하면서 이 포스트이 포스트에 스칼라에서의 Future 문법 및 동시성에 대한 이야기를 간략하게 다루었다. 하지만, 이 챕터에서는 Akka에서의 Future 사용을 다룰 예정이므로 위 포스트들에 대한 내용이 어느정도 있다고 가정한다.

Future는 함수들을 사용할 때 이용하기 좋은 도구이며, 일을 수행하기 위한 객체가 굳이 필요로 하지 않는다. Future미래의 어떤 시점에서 접근가능한 함수 실행 결과를 담는 공간이다. 이러한 Future는 비동기 결과를 위해 사용된다.
(동기/비동기 및 blocking/non-blocking에 관한 내용들은 이 포스트에서 정리하였다.)

Future는 미래의 어떤 시점에서 접근 가능한 것이므로 결국에는 접근 가능하게 된다.

위는 Future를 사용하여 함수를 비동기적으로 실행했을 때를 그린 것이다. Function은 main 스레드 이외의 스레드에서 진행되므로 비동기적으로 진행된다. 그리고 그 결과는 Future에 저장이 된다.

이때, 주의해야 할 점은 Future는 read-only라는 점이다. Future의 결과값은 바깥에서 변경할 수 없다. 또한, 비동기 함수의 성공 여부에 따라 연산의 결과값을 담거나 Failure를 담는다.

read-only 특성을 가지고 있으므로 항상 같은 결과값을 제공한다는 것이 보장되고 이는 멀티 스레드 환경에서 안전하다는 것을 뜻한다. 또한, 여러 결과값하고도 combine할 수 있어 파이프라이닝에 용이하다. 즉, 한 함수의 결과값을 다른 함수로 넘기기 용이하다.

예시


만약 다음과 같은 과정을 가지는 앱이 있다고 하자. 즉, 티켓 번호를 입력하면 티켓 번호와 연관된 트래픽 정보들을 불러오는 이벤트들 정보를 가져오는 앱이 있다고 가정하자.

위의 흐름대로 각각 동기/비동기적으로 코드를 작성해 볼 것이다.

val request = EventRequest(ticketNr)
val response:EventResponse = callEventService(request)
val event:Event = response.event

이 코드는 동기적으로 작성된 코드이다. 반면, 비동기적으로 작성된 코드를 한 번 살펴보자.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val request = EventRequest(ticketNr)
val futureResponse:Future[Event] = Future{
	val response = callEventService(request)
    response.event
}

동기적 코드가 조금 더 직관적이고 간결하지만, 실제로는 비동기적 코드로 작성하는 것이 애플리케이션 측면에서는 좋다. 왜 그런것일까?


2가지 메서드를 실행하는 서비스가 있다고 가정하자. 1개의 메서드는 4초가 걸리고 나머지 메서드는 2초가 걸린다면 위의 이미지처럼 동기의 경우 총 6초의 소요시간이 필요(sum 연산)하지만, 비동기로 연산을 진행하면 4초(max 연산)가 걸리기 때문이다.


액터로 처리해야 한다면 2개의 액터로 분리하여 병렬적으로 실행해야 한다. 물론 이렇게 액터를 사용하는 것도 좋은 방법 중 하나이지만, 위의 경우 Future를 사용한 비동기 처리가 가장 유용한 처리 방법이다.

asynchronous call

위의 예시를 보며 천천히 Future를 사용한 Akka 코드를 작성할 것이다.

애플리케이션의 플로우가 위와 같고, 우리는 위쪽에서 예시를 들며 밑과 같은 코드를 작성했었다.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val request = EventRequest(ticketNr)
val futureResponse:Future[Event] = future{
	val response = callEventService(request)
    response.event
}

하지만 위의 코드는 getTrafficInfo를 가져오는 로직이 빠져있다. 그렇다면 어떻게 비동기적으로 위의 정보들을 가져올 수 있을까?

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val request = EventRequest(tickerNr)
val futureRoute = Future {
	callEventService(request).event
}.map(event => {
	val trafficRequest = TrafficRequest(destination = event.location,
    arrivalTime = event.Time)
    
    callTrafficService(trafficRequest).route
})

GetTrafficInfo 역시 비동기 처리로 하여 Future[Event]를 chaining하는 방식으로 설정한 것이다.
Future로 GetEvent를 처리하고 그 결과값들에서 바로 map을 실행시켜 라우트 결과를 가져온 것이다. 결국, 최종 리턴 값은 Future[Route]가 된다.

ExecutionContext는 ForkJoinPool을 사용한다. 하지만, dispatcher를 사용하여 이를 조정할 수 있다. Akka Dispatcher를 사용하는 내용은 링크 걸어놓은 포스트에서 확인할 수 있다.

Future에서 에러 다루기

Future는 연산에 성공하면 결과값을 가지거나 Failure를 가진다.


위의 예시는 퓨처의 실패한 값을 출력하려고 하는 것이다. 하지만, foreach를 사용해 출력을 하려고 했지만 실패한 것이 보인다. 왜냐하면 foreach 블럭은 Future가 실패했으므로 실행 자체가 되지 않는다. 즉, 성공적으로 연산이 실행되지 않는다면 해당 value를 다루는 메서드는 실행되지 않는다는 것이다.

위와 같은 실패가 발생했을 때를 대비하여 onCompelte를 사용하여 에러를 다룬다.

onComplete

만약 실패 여부에 상관 없이 실행시키고 싶은 블럭이 있다면 onComplete 메서드를 사용하면 된다.

위의 코드에 이어서 다음과 같은 코드를 실행하면 의도한대로 출력하는 것을 알 수 있다. 즉, 실패했든 성공했든 실행해야하는 코드가 있다면 onComplete로 실행시키면 된다. 이때, onCompleteUnit을 리턴하는 메서드이다. 따라서 function chaining을 할 수 없다.


위와 같은 메서드를 통해 예외가 발생하더라도 정보를 쌓을 수 있다. 예시로 돌아온다면 다음과 같아진다.

비동기 함수 호출을 통해 데이터가 쌓이지만, 예외가 발생하는 경우 recover를 통해 복구할 수 있다.

이와 같이 복구하는 예제를 살펴보자.

case class TicketInfo(ticketNr:String, event:Option[Event]=None, route:Option[Route] = None)

위와 같은 TicketInfo에서 데이터를 쌓는 과정에서 recover라는 과정을 코드로 나타낸 것이다.

val futureStep1 = getEvent(ticketNr)
val futureStep2 = futureStep1.flatMap(ticketInfo => {
	getTraffic(ticketInfo).recover {
    	case e:TrafficServiceException => ticketInfo
    }
}).recover{
	case NonFatal(e) => TicketInfo(ticketNr)
}

recover를 사용하여 TicketInfo를 추가한 것이다.

Reference

  • Akka in action - Chapter.5 Futures

0개의 댓글