Stream이 실행 중일 동안 Source(Publisher), Flow(Processor), Sink(Subscriber)는 static하다. 즉, run 메서드가 실행되기 전까지는 아무런 행동을 하지 않는다. 다만 해당 컴포넌트들이 엮여 그래프만 구성되어져 있을 것이다. 그렇다면, Stream에서 유의미한 데이터를 추출하기 위해서는 어떻게 할까? Graph를 실행시켜서 Materialized Value
를 얻어낼 것이다.
val graph = source.via(flow).to(sink)
val result = graph.run()
위의 코드에서는 result
가 Materialized value를 담고 있게 되는 것이다. graph
자체로는 유의미한 리소스를 담고 있지 않는다.
위는 graph를 담은 변수인데, 데이터 타입이 RunnableGraph[NotUsed]
인 것을 확인할 수 있다. 즉 run 메서드가 실행되어야 액터의 인스턴스화, 스레드풀 할당, 소켓 연결 등의 다양한 작업이 실행되는 것이며, materializeing
된다고 할 수 있다.
이 때, Materializing a graph
라는 뜻은 곧 그래프 내의 모든 컴포넌트들이 materializing된다는 뜻이다. 따라서 그래프 내의 모든 컴포넌트들은 materialized value
를 생성한다. 그리고, 그래프 전체에서 하나의 materialized value만 발생시킨다.
이때 유의해야할 점은 컴포넌드는 같은 리소스에 대해 여러 번 Materialized할 수 있다. 따라서 재사용이 가능하다. 또한, materialized value는 어떤 것이든 될 수 있다는 것에 유의하자.
만약 String List를 입력 받아서 단어의 개수 합을 출력하는 코드는 어떻게 될까?
implicit val system = ActorSystem("MaterializingValueSystem")
implicit val materializer = Materializer
val sentenceSource = Source(List("Hello World", "Akka is Awesome", "Akka streams", "Materialized values exercising"))
val wordCountFlow = Flow[String].fold[Int](0)((sum, str) => sum + str.split(" ").length)
val wordCountSink = Sink.fold[Int](0)((sum, str) => sum + str.split(" ").length)
val g1 = wordCountFlow.runWith(sentenceSource, Sink.head)._2
val g2 = sentenceSource.runWith(wordCountSink)
val g3 = sentenceSource.via(wordCountFlow).runWith(Sink.head)
val g4 = sentenceSource.viaMat(wordCountFlow)(Keep.right).toMat(Sink.head)(Keep.right).run()
위의 g1 ~ g4는 모두 같은 materialized value를 리턴한다.
천천히 살펴보면 다음과 같다.
가장 먼저 sentenceSource
는 문자열 리스트를 가지는 Source를 가지는 값이다. 그리고, wordCountFlow
는 Source가 전달하는 리스트 내의 단어의 개수를 세는 프로세서이다. 이때, Flow[데이터타입].fold[리턴타입]
인 것을 유의하자. wordCountSink
는 wordCountFlow와 거의 동일한 로직을 가진다. 따라서 wordCountFlow
를 사용하는 경우 wordCountSink
를 사용하지 않고, 계산 결과를 Future로 리턴하는 Sink.head
를 사용한다. 마찬가지로, sink 내에 데이터 처리 로직이 있으므로 위의 wordCountSink
는 sentenceSource
에 바로 연결할 수 있다.
이 때 유의할 점은 g1 - g4는 Future형태이므로 onComplete
을 사용하여 데이터를 가공해야 한다는 점이다.
여기서 눈 여겨 볼 점은 runWith
이다. runWith
는 플로우.runWith(Source, Sink)
와 같은 형태로 사용할 수 있으며, Sink.foreach[Int](println).runWith(Source.single[Int](42))
와 같은 형태로 Sink를 앞에 놓을 수 있다.