이전 Akka 포스트들에서 Source, Flow, Sink를 엮어서 graph를 만들었다. 하지만 그전에는 input과 output이 1:1인 그래프만 작성했었다. GraphDSL을 사용하여 1:N 관계가 포함된 복잡한 그래프 역시 작성해 볼 예정이다.
fan-in
은 input이 여러 개이며 output은 1개인 것을 뜻한다.N
개의 Input을 1개의 output으로 리턴한다.fan-out
은 input이 1개이며 output은 여러 개인 것을 뜻한다.복잡한 graph는 다음과 같이 코드가 작성된다.
import scala.concurrent.duration._
val numberSource = Source(1 to 1000)
val fastSource = numberSource.throttle(10, 1 second)
val slowSource = numberSource.throttle(2, 1 second)
val incrementer = Flow[Int].map(_ + 1)
val multiplier = Flow[Int].map(_ * 10)
val firstSink = Sink.foreach[Int](num => println(s"Sink 1: $num"))
val secondSink = Sink.foreach[Int](num => println(s"Sink 2: $num"))
val complexGraph = RunnableGraph.fromGraph(
GraphDSL.create(){ implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val merge = builder.add(Merge[Int](2))
val balance = builder.add(Balance[Int](2))
fastSource ~> incrementer ~> merge.in(0)
slowSource ~> multiplier ~> merge.in(1)
merge.out ~> balance
balance.out(0) ~> firstSink
balance.out(1) ~> secondSink
ClosedShape
}
)
complexGraph.run()
위의 코드를 실행하면 어떻게 될까? 위의 코드는 2개의 Source에서 2개의 flow를 연결한 후 flow의 결과값들을 각각 2개의 Sink에 연결한 것이다.
굳이 복잡한 과정을 거치지 않고도 위와 같이 2개의 Source가 잘 처리되는 것을 알 수 있다.
val input = Source(1 to 1000)
val incrementer = Flow[Int].map(_ + 1)
val multiplier = Flow[Int].map(_ * 10)
val output = Sink.foreach[(Int, Int)](println)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder:GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, Int])
input ~> broadcast
broadcast.out(0) ~> incrementer ~> zip.in0
broadcast.out(1) ~> multiplier ~> zip.in1
zip.out ~> output
ClosedShape
}
)
graph.run()
위의 코드는 2개의 flow에서 계산된 결과를 엮어 하나의 튜플을 출력하는 코드이다. Source -> 1:2 -> broadcast -> 2:1 -> zip -> output
의 수순을 거친다.