[Akka] Classic Cluster Usage

smlee·2023년 9월 8일
0

Akka

목록 보기
19/50
post-thumbnail

클러스터에 있는 액터 시스템 간 메시지를 보내기 위해서는 직렬화를 꼭 시켜야 한다. Serialization with Jackson은 좋은 선택지 중 하나이다.


Cluster API Extension

Cluster extenstion을 사용하기 위해서는 밑과 같은 configuration을 설정해야 한다.

akka {
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551",
      "akka://ClusterSystem@127.0.0.1:2552"]
    
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

cluster extension을 사용하기 위해서는 반드시 hostport를 설정해주어야 하고, akka.actor.provider = "cluster"를 사용해야 한다. 위의 코드는 Cluster API Extension을 사용하기 위한 최소한의 설정이다.

액터들은 특정한 클러스터 이벤트들에 대한 구독자로 가입한다. 구독이 시작되었을 때, 액터들은 현재 클러스터의 상태에 대응하는 이벤트를 받는다. 그리고 클러스터가 변하는 이벤트가 발생 시 그 상태에 대응하는 이벤트를 받는다.

/*
 * Copyright (C) 2018-2023 Lightbend Inc. <https://www.lightbend.com>
 */

package scala.docs.cluster

import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    //#subscribe
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}

위의 코드는 클러스터 extension에 대한 간단한 예제 코드이다. 내부에 Cluster를 선언하고 preStart에서 클러스터에 가입한다. 그 후, 액터가 살아있는 생명주기 동안 이벤트를 받아 처리하는 코드이다. 그리고 액터가 중지 된 후 (postStop) 클러스터의 구독을 해제한다.

액터가 살아있을 동안은 receive를 통해 이벤트를 처리한다.

Cluster Membership API

(1) joining

클러스터에 있는 노드들에 joining할 수 있다. joining 하는 것은 아래와 같이 간단하게 할 수 있다.

import akka.actor.Address
import akka.cluster.Cluster

val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)

노드들이 들어있는 리스트를 선언해서 노드들을 직접 삽입해주면 된다. joinSeedNodes를 통해 클러스터에 등록한다. 중복성이나 빌트인 재시도 매커니즘으로 인해 위의 joinSeedNodes가 지향된다.

(2) subscribe to cluster event

Cluster(system).substribe를 사용함으로써 클러스터 멤버들의 변경 알림을 구독할 수 있다.

cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])

위와 같은 간단한 형태로 변경을 구독할 수 있다.

CurrentClusterState에 있는 전체 상태의 스냅샷은 업데이트들을 담은 첫 메시지를 보낸다.

주의해야할 점은 만약 비어있는 CurrentClusterState를 받았다면, MemberUp 이벤트를 담아야 한다.

Reference

0개의 댓글