event stream은 각 액터 시스템의 메인 이벤트 버스이다. 이는 log messages를 옮기거나 Dead Letters를 옮길 때 사용된다. 그리고 다른 목적의 유저 코드를 위해 사용되기도 한다. 이런 event stream은 연관되어 있는 채널들의 세트를 등록할 수 있는 Subchannel Classification
을 사용한다.
밑의 예시 코드는 어떻게 구독이 작동하는지를 보여주는 예시이다. 밑의 간단한 액터를 보자.
import akka.actor.{ Actor, DeadLetter, Props }
class DeadLetterListener extends Actor {
def receive = {
case d: DeadLetter => println(d)
}
}
val listener = system.actorOf(Props[DeadLetterListener]())
system.eventStream.subscribe(listener, classOf[DeadLetter])
위의 액터는 Actor
를 확장하며 receive를 확장한다. 만약, 들어오는 메시지가 DeadLetter
라면 해당 메시지를 출력하는 것이다.
위의 코드는 eventStream을 사용하여 subchanner classification을 사용한 것이다. 이를 통해 이벤트 그룹들을 구독하는 것이 가능하다.
abstract class AllKindsOfMusic { def artist: String }
case class Jazz(artist: String) extends AllKindsOfMusic
case class Electronic(artist: String) extends AllKindsOfMusic
class Listener extends Actor {
def receive = {
case m: Jazz => println(s"${self.path.name} is listening to: ${m.artist}")
case m: Electronic => println(s"${self.path.name} is listening to: ${m.artist}")
}
}
val jazzListener = system.actorOf(Props[Listener]())
val musicListener = system.actorOf(Props[Listener]())
system.eventStream.subscribe(jazzListener, classOf[Jazz])
system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic])
// only musicListener gets this message, since it listens to *all* kinds of music:
system.eventStream.publish(Electronic("Parov Stelar"))
// jazzListener and musicListener will be notified about Jazz:
system.eventStream.publish(Jazz("Sonny Rollins"))
Actor Classification과 유사하게 EventStream은 가입자가 종료될 때 자동으로 제거된다.
이때 주의해야하는 점은 이벤트 스트림은 로컬 기능이므로, (Stream에 Remote Actor를 명시적으로 가입하지 않는 한) 클러스터 환경의 다른 노드에 이벤트를 배포하지 않는다는 것이다. 수신인을 명시적으로 알지 못한 상태에서(즉, 수신인의 ActorRef 획득) Akka 클러스터에서 이벤트를 브로드캐스트해야 하는 경우 클러스터에서 분산 게시 구독을 확인할 수 있다.
시작 시 액터 시스템은 로깅을 위해 액터를 생성하고 이벤트 스트림에 가입시킨다. 이들은 application.conf
에서 예를 들어 구성된 핸들러이다.
akka {
loggers = ["akka.event.Logging$DefaultLogger"]
}
여기에 완전한 클래스 이름으로 나열된 핸들러는 구성된 로그 레벨 이상의 우선 순위를 가진 모든 로그 이벤트 클래스에 가입되며 런타임에 로그 레벨을 변경할 때 가입이 동기화된다.
system.eventStream.setLogLevel(Logging.DebugLevel)
이는 기록되지 않을 수준의 로그 이벤트가 일반적으로 전혀 디스패치되지 않음을 의미한다(각 이벤트 클래스에 대한 수동 구독이 수행되지 않은 경우)
Stopping Actors에서 설명한 것처럼, 연기자가 종료할 때 대기 중인 메시지는 Dead Letter함으로 다시 전달되고, 죽은 편지함은 기본적으로 죽은 편지함으로 포장된 메시지를 게시한다. 이 포장지에는 리디렉션된 편지함의 원본 송신자, 수신자 및 메시지가 보관된다.
일부 내부 메시지(죽은 편지 억제 특성이 표시된)는 일반 메시지처럼 Dead Letter로 끝나지 않을 것이다. 이 메시지들은 안전하게 설계되었으며 때때로 종료된 Actor에게 도착할 것으로 예상되며, 걱정할 것이 없기 때문에 기본 Dead Letter 기록 메커니즘으로부터 억제된다.
그러나 이런 종류의 낮은 수준의 억제된 Dead Letter를 디버깅해야 할 필요가 있다고 생각하는 경우에도 명시적으로 구독할 수 있다.
import akka.actor.SuppressedDeadLetter
system.eventStream.subscribe(listener, classOf[SuppressedDeadLetter])