[Akka] Classic Event Bus 2

smlee·2023년 9월 13일
0

Akka

목록 보기
22/50
post-thumbnail

Subchannel Classification

Subclassification이라는 trait를 확장하여 classifier가 계층을 이루었을 경우 리프 노드들에서 뿐만 아니라 구독이 가능하기를 원한다면, 이 classification는 딱 맞을 수 있다. 이것은 장르별로 라디오 채널을 튜닝(아마도 여러 개)하는 것과 비교될 수 있다. 이 분류는 분류기가 단지 이벤트의 JVM 클래스이고 구독자들이 특정 클래스의 모든 하위 클래스를 구독하는 것에 관심이 있을 수 있는 경우를 위해 개발되었지만, 그것은 어떤 분류기 계층에서도 사용될 수 있다.

import akka.util.Subclassification

class StartsWithSubclassification extends Subclassification[String] {
  override def isEqual(x: String, y: String): Boolean =
    x == y

  override def isSubclass(x: String, y: String): Boolean =
    x.startsWith(y)
}

import akka.event.SubchannelClassification

/**
 * Publishes the payload of the MsgEnvelope when the topic of the
 * MsgEnvelope starts with the String specified when subscribing.
 */
class SubchannelBusImpl extends EventBus with SubchannelClassification {
  type Event = MsgEnvelope
  type Classifier = String
  type Subscriber = ActorRef

  // Subclassification is an object providing `isEqual` and `isSubclass`
  // to be consumed by the other methods of this classifier
  override protected val subclassification: Subclassification[Classifier] =
    new StartsWithSubclassification

  // is used for extracting the classifier from the incoming events
  override protected def classify(event: Event): Classifier = event.topic

  // will be invoked for each event for all subscribers which registered
  // themselves for the event’s classifier
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.payload
  }
}

위의 코드를 테스트하려면 밑의 코드를 사용해야 한다.

val subchannelBus = new SubchannelBusImpl
subchannelBus.subscribe(testActor, "abc")
subchannelBus.publish(MsgEnvelope("xyzabc", "x"))
subchannelBus.publish(MsgEnvelope("bcdef", "b"))
subchannelBus.publish(MsgEnvelope("abc", "c"))
expectMsg("c")
subchannelBus.publish(MsgEnvelope("abcdef", "d"))
expectMsg("d")

이 분류기는 이벤트에 대한 가입자를 찾을 수 없는 경우에도 효율적이지만, 내부 분류기 캐시를 동기화하기 위해 기존의 잠금을 사용하기 때문에 가입이 매우 높은 빈도로 변경되는 경우를 사용하기에 적합하지 않다(첫 번째 메시지를 보내 분류기를 "열어"도 이전의 모든 가입을 다시 확인해야 함).

Scanning Classification

이전 분류기는 엄격하게 계층화된 다중 분류기 구독을 위해 제작되었으며, 이 분류기는 계층을 형성하지 않고 이벤트 공간의 여러 부분을 포괄하는 중복된 분류기가 있는 경우 유용하다. 지리적 도달 가능성(구학파 전파 전송용)으로 라디오 방송국을 튜닝하는 것과 비교할 수 있다.

아래의 예시 코드를 보자.

import akka.event.ScanningClassification

/**
 * Publishes String messages with length less than or equal to the length
 * specified when subscribing.
 */
class ScanningBusImpl extends EventBus with ScanningClassification {
  type Event = String
  type Classifier = Int
  type Subscriber = ActorRef

  // is needed for determining matching classifiers and storing them in an
  // ordered collection
  override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
    if (a < b) -1 else if (a == b) 0 else 1

  // is needed for storing subscribers in an ordered collection
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)

  // determines whether a given classifier shall match a given event; it is invoked
  // for each subscription for all received events, hence the name of the classifier
  override protected def matches(classifier: Classifier, event: Event): Boolean =
    event.length <= classifier

  // will be invoked for each event for all subscribers which registered themselves
  // for a classifier matching this event
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }
}
val scanningBus = new ScanningBusImpl
scanningBus.subscribe(testActor, 3)
scanningBus.publish("xyzabc")
scanningBus.publish("ab")
expectMsg("ab")
scanningBus.publish("abc")
expectMsg("abc")

이 분류기는 실제로 일치하는 구독 수에 관계없이 구독 수에 비례하는 시간을 항상 사용한다다.

0개의 댓글