[Akka] Persistence Query

smlee·2023년 10월 26일
0

Akka

목록 보기
43/50
post-thumbnail

Persistent store는 데이터를 읽기 위하여 사용한다. 이때, Persistence Query를 사용한다면 다양한 기능들을 사용할 수 있다.
1. SELECT persistence IDs
2. select events by persistence ID
3. select events across persistence IDs by tags

즉, 어떠한 Persistent actor가 살아있는지 확인할 수 있으며, 오래된 상태들을 재생성하거나 쫓을 수 있다. 그리고, 전체 store에서 이벤트에 대한 데이터 프로세싱을 할 수 있다.

가장 먼저, 실행할 object에 다음과 같이 작성한다.

val system = ActorSystem("persistenceQueryDemo", ConfigFactory.load().getConfig("persistenceQueryExcercise"))

val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraJournal.Identifier)

readJournal이 Persistence Query를 담당해줄 값으로, CassandraReadJournal이다. 이때 CassandraReadJournal은 scaladsl 패키지 내에 있는 것으로 임포트 시켜야 한다. 즉, import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal이어야 한다. (인텔리제이에서 자동 import를 시키면 javadsl로 임포트 되는 경우가 종종 있다.)

1. 모든 persistence Id 불러오기

persistenceIds()라는 메서드를 사용하면 모든 Persistence Id를 불러올 수 있다.

  val persistenceIds = readJournal.persistenceIds()

  implicit val materializer = ActorMaterializer()(system)
  persistenceIds.runForeach{
    persistenceId =>
      println(s"Found persistence ID : $persistenceId")
  }

persistentIds에 persistenceIds() 메서드를 실행시킨 결과값을 담는다.

그렇다면 이 value의 데이터 타입은 Source[String, NotUsed]가 된다. Source는 Akka streams의 일부이며, string objects의 infinite collection이다.

이제 이 persistenceIds를 각각 실행하기 위해서는 runForeach 메서드를 돌려야 한다. 이때, ActorMaterializer가 필요하므로 implicit로 선언해준다. 그리고, runForeach 내부에 출력하는 코드를 작성하면 실행 결과는 다음과 같다.

위와 같이 Cassandra 내부에 존재하는 모든 persistentActorpersistenceId가 불러와진다. (위의 persistenceId들은 이전 포스트들에서 공부하며 정리했던 액터들이다.)

이때, persistenceIds() 메서드는 실시간으로 갱신된다. 이를 알아보기 위해 간단한 액터를 추가한 코드를 실행시켜본다.

  val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  val persistenceIds = readJournal.persistenceIds() // persistence query API is only available on these regional objects

  implicit val materializer = ActorMaterializer()(system)
  persistenceIds.runForeach{
    persistenceId =>
      println(s"Found persistence ID : $persistenceId")
  }

  val simpleActor = system.actorOf(Props[SimplePersistentActor], "simpleActor")

  import system.dispatcher
  system.scheduler.scheduleOnce(5 seconds) {
    val message = "hello, persistent actor"

    simpleActor ! message
  }

위와 같이 "simpleActor"를 만들어 실행시킨다면 결과는 어떻게 될까?

simpleActor에서 답변이 오고, persistenceId가 갱신된 것이 보인다. 즉, 실시간 갱신이 가능한 것을 알 수 있다.

2. persistenceId로 이벤트 가져오기

우리는 여러 개의 PersistentActor가 존재하는 환경에서 특정 PersistenceActor의 이벤트만 사용하고 싶다. 이럴 때는 eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) 메서드를 사용해준다.

  val events = readJournal.eventsByPersistenceId("persistence-query-id-1", 0, Long.MaxValue)

  events.runForeach{ event =>
    println(s"readEvent: $event")
  }

즉, 위와 같이 사용한다면 "persistence-query-id-1"을 persistenceId로 가지고 있는 PersistentActor의 이벤트들을 불러올 수 있다.

1번에서 실행된 하나의 이벤트가 잘 출력되는 것을 볼 수 있다.

이러한 eventsByPersistenceId 역시 실시간으로 실행된다. 만약 해당 persistentActor에 메시지 1개를 더 보냈다면 다음과 같이 출력이 된다.

0개의 댓글