[Akka] Classic Event Bus 1

smlee·2023년 9월 13일
0

Akka

목록 보기
21/50
post-thumbnail

한 집단의 액터들에게 메시지를 보내는것은 EventBus를 사용하여 일반화한 후 보내도록 한다.

EventBus
AnyRef를 확장한 객체로, Event 타입을 내부적으로 가지고 있다.

EventBus는 위와 같은 멤버들을 가진다.

다음과 같은 예제 코드를 확인해보자. 이때 주의해야할 점은 EventBus는 이미 출간된 메시지들의 발신자를 보존하지 않는다는 점이다. 만약 원래의 발신자의 레퍼런스가 필요하다면, 메시지 안에 들어 있어야 한다.

밑은 예제 코드이다. 밑에 있는 메서드들은 모두 EventBus 내의 abstract value member들이다.

/**
 * Attempts to register the subscriber to the specified Classifier
 * @return true if successful and false if not (because it was already
 *   subscribed to that Classifier, or otherwise)
 */
def subscribe(subscriber: Subscriber, to: Classifier): Boolean

/**
 * Attempts to deregister the subscriber from the specified Classifier
 * @return true if successful and false if not (because it wasn't subscribed
 *   to that Classifier, or otherwise)
 */
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

/**
 * Attempts to deregister the subscriber from all Classifiers it may be subscribed to
 */
def unsubscribe(subscriber: Subscriber): Unit

/**
 * Publishes the specified Event to this bus
 */
def publish(event: Event): Unit

subscribe라는 메서드는 특정 Classifier에 구독자를 등록하는 메서드이다. 등록에 성공한다면 true를 반환하고 false를 반환한다. unsubscribe라는 메서드는 특정 Classifier에 구독자의 등록을 해제하는 메서드이며, 이 역시 등록 해제에 성공하면 true, 등록 해제에 실패하면 false를 반환한다. 반면 Unit 형태의 unsubscribe는 모든 classifier의 구독자들을 해제하는 메서드이며, publish라는 메서드는 Event를 출판한다.

위와 같은 매커니즘은 akka의 다른 장소에서 사용된다. 대표적으로 EventStream이 있다.

Classifiers

classifiers는 Akka 분산처리의 일부이다.

Classification이란

가장 간단한 classification은 각 이벤트에서 임의의 classifier를 추출하고 가능한 classifier에 구독자들 세트를 유지하는 것이다. LookupClassification이라는 trait를 사용한다.

import akka.event.EventBus
import akka.event.LookupClassification

final case class MsgEnvelope(topic: String, payload: Any)

class LookupBusImpl extends EventBus with LookupClassification {
  type Event = MsgEnvelope
  type Classifier = String
  type Subscriber = ActorRef

  override protected def classify(event: Event): Classifier = event.topic

  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.payload
  }
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)
  override protected def mapSize(): Int = 128

}

LookupBusImpl라는 클래스는 EventBus와 LookupClassification을 확장한다. 따라서 들어오는 이벤트들로부터 classifier를 추출하고, 구독자들 세트를 유지시킬 수 있다.

val lookupBus = new LookupBusImpl
lookupBus.subscribe(testActor, "greetings")
lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
lookupBus.publish(MsgEnvelope("greetings", "hello"))
expectMsg("hello")

따라서 위와 같이 각각의 이벤트에 대해 사용할 수 있다.

Reference

0개의 댓글