[Akka] Reactive Stream Principles

smlee·2023년 10월 30일
0

Akka

목록 보기
44/50
post-thumbnail

우리는 비동기적으로 실행되며 많은 양의 데이터를 처리할 수 있는 시스템을 가지고 싶다. 이를 위해서는 Akka Stream을 사용해야 한다. Akka stream은 reactive distributed system이다. 본격적으로 Akka stream에 대해 공부하기 전에 관련 규칙들을 정리할 예정이다.

reactive stream 사이트에도 잘 정리되어 있지만, 별개로 정리해 볼 예정이다.

reactive stream 용어

(1) Publisher

Publisher는 reactive stream에서 요소들을 제공하는 역할을 한다. (emits the elements) Akka Stream에서는 PublisherSource로 부른다.

(2) Subscriber

Subscriber는 Publisher가 제공하는 요소들을 받는 역할을 한다. (receives the elements that publisher emits) Subscriber는 Akka Stream에서는 SubscriberSink라고 부른다.

(3) Processor

Processor는 Publisher가 제공해주는 요소(데이터)들을 가공해주는 역할을 한다. (transform the element along the way) 이 작업 역시 비동기적으로 작동한다.

그림으로 나타내면 위와 같이 PublisherSubscriber 사이 Processor가 중간 역할을 하는 것이다. 이때, 여러 개의 Processor가 중첩될 수도 있다. 즉, Publisher -> processor -> processor -> ... -> processor -> Subscriber와 같은 플로우가 가능하다.

SPI

Reactive Streams는 SPI(Service Provider Interface)이다. SPI는 어떻게 작업을 할 지 정의할 수 있으며, 컴포넌트 사이의 프로토콜 역시 정의할 수 있다. 이때 유의해야할 점은 API가 아닌 점이다. Stream을 구현하기 위해 여러 API를 사용할 수 있다. 우리는 여기서 Akka Streams API를 사용하여 SPI를 구현할 것이다.

Akka Stream 기본 코드

ActorSystem, Materializer 설정

우리는 Akka Stream을 위해 뼈대가 있어야 한다. Akka Stream은 다음과 같은 환경에서 작성된다.

val system = ActorSystem("AkkaStream")
val materializer = Materializer(system)

우리는 그 동안의 챕터에서 ActorSystem 타입의 system은 많이 사용했었다. 하지만, Akka Streams에서는 Materializer 타입의 상수 역시 선언할 것이다. Materializer는 stream들을 Run시킬 수 있도록 한다. 이때, ActorSystemimplicit argument로 들어간다.

(캡쳐 이미지는 ActorMaterializer이지만, 현재는 Materializer를 사용해야 한다. Materializer가 요구하 implicit 인수는 ActorMaterializer와 같다.)
따라서, 위의 코드처럼 system을 직접 넣어주어도 되지만, ActorSystem을 Implicit로 선언해주면 된다. 따라서 밑과 같은 코드의 형태가 된다.

implicit val system = ActorSystem("AkkaStream")
val materializer = Materializer

이때, materializer는 stream들을 실행시킬 수 있는 역할인데, 이 역시 implicit로 들어간다. 밑은 stream을 실행시키기 위핸 run() 메서드이다.

Materializer를 implicit로 요구하므로 위쪽의 Materializer 타입의 상수를 implicit 키워드를 붙여준다. 따라서 다음과 같은 코드를 기본으로 작성해야 한다.

implicit val system = ActorSystem("AkkaStream")
implicit val materializer = Materializer

와 같이 코드를 작성해야 한다.

Source, Flow, Sink 작성

Source는 reactive streams의 Publisher에 해당하며, Sink는 Subscriber, Flow는 Processor에 해당 한다. 위의 ActorSystemMaterializer는 이 3개의 요소를 사용하기 위한 환경 작업이었고, 이제 실제로 코드를 작성하려고 한다.

이 때, 유의해야 할 점은 Source, Sink, Flow 모두 akka.stream.scaladsl 패키지에 있는 class인 것이다. import시킬 때 제대로 확인하고 Import하자.

(1) Source

val integerSources = Source(1 to 10)
val nameSource = Source.single[String]("leesomyoung")

위와 같이 integerSources를 보면 1부터 10까지의 정수를 보내는 source인 것을 알 수 있으며, 1개의 데이터만 가질 경우 Source.single[데이터타입]( 값 )을 사용하는 것을 알 수 있다. 이때 주의해야 할 점은, Source는 serializable하며, immutable한 객체들을 가질 수 있다는 점이다.

(2) Flow

다음은 Source에서 배출한 요소들의 데이터를 처리하는 Processor 역할의 Flow를 선언해주어야 한다.

val doubleIntegerFlow = Flow[Int].map(_ * 2)
val integerTakeThreeFlow = Flow[Int].take(3)
val nameDropFlow = Flow[String].map(_.substring(3))

위의 예시 코드들을 보면 알겠지만, Flow[데이터타입].HOF 형태로 작성을 한다.(HOF : High Order Function) 이때, Flow 역시 akka.stream.scaladsl 패키지 내 원소임을 유의하자.

예시 코드에서 doubleIntegerFlow는 모든 원소들을 2배 해주는 데이터 프로세스이며, integerTakeThreeFlow는 리스트의 여러 개 원소 중 앞의 3개를 가져오는 프로세스이다. 그리고, nameDropFlow는 가장 앞의 3글자를 제외한 모든 글자를 가져오는 프로세스이다.

(3) Sink

Sink는 Source에서 제공하는 데이터들을 Flow를 통해 프로세싱된 데이터를 받는 곳이다. Sink 역시 akka.stream.scaladsl 패키지의 원소이며, Sink.HOF[데이터타입]( 작업 )와 같은 형태로 정의된다.

val integerSink = Sink.foreach[Int](println)
val nameSink = Sink.foreach[String](println)

예시를 위한 코드이므로 간단하게 Sink에서 받아온 모든 데이터들을 출력하도록 하였다.

(4) Source, Flow, Sink 연결하기

Source, Flow, Sink를 연결하기 위해서는 viato 메서드를 사용해야 한다. 3개를 동시에 연결하기 위해서는 Source명.via(flow명).to(Sink명)과 같이 작성해야 한다.

이때, Flow는 여러 개가 chaining될 수 있는 것에 유의한다. 여러 개를 연결하기 위해서는 Source명.via(flow명).via(flow명).via(flow명).via(flow명).to(Sink명)과 같이 작성해주면 된다.

val nameWork = nameSource.via(nameDropFlow).to(nameSink)
val integerWork = integerSources.via(integerTakeThreeFlow).via(doubleIntegerFlow).to(integerSink)

예시 코드들은 위와 같이 작성하면 된다. integerWork는 앞의 3개의 원소만 가져오는 작업과 모든 원소를 2배 하는 작업을 연결한 코드이다.

(5) 연결된 stream 실행

실행하기 위해서는 run메서드를 실행해야 한다.

integerWork.run()
nameWork.run()

과 같이 실행하면 된다.

그러면 실행 결과는 위와 같이 된다. 우리는 여기서 비동기적으로 실행되므로 integerWork가 실행되는 도중 먼저 작업이 끝난 nameWork가 출력되는 것을 알 수 있다.

0개의 댓글