[Akka] Local Stores

smlee·2023년 10월 23일
0

Akka

목록 보기
40/50
post-thumbnail

snapshot을 정리한 포스트에서 우리는 leveldb를 사용하여 로컬 journal에 데이터들을 저장했었다. 이번 포스트에서는 이에 대해 조금 더 보충할 예정이다.

스냅샷을 위해 다음과 같이 간단한 PersistentActor를 선언하자. 이 액터는 "snapshot"이라는 메시지로 그동안 받은 메시지의 개수를 저장하며, "print"라는 메시지를 받으면 현재까지의 메시지 개수를 출력한다. 그 외의 모든 메시지들은 이벤트화하여 Persist시키는 액터이다.

class SimpleActor extends PersistentActor with ActorLogging {
	
    var nMessages = 0
    
    override def persistentId: String = "simple-persistent-actor"
    
    override def receiveCommand: Receive = {
    	case SaveSnapshotSuccess(metadata) =>
        	log.info(s"Saving snapshot was successful: $metadata")
        case SavesnapshotFailure(_, cause) =>
        	log.info(s"Saving snapshot failed because of $cause")
        case "snapshot" =>
        	saveSnapshot(nMessages)
        case "print" =>
      log.info(s"I have persisted $nMessages so far")
        case message =>
        	persist(message){ e =>
            	log.info(s"Persisting $e")
            	nMessages += 1
            }
    }
    
    override def receiveRecover: Receive = {
    	case SnapshotOffer(metadata, payload) =>
        	log.info(s"Recovered Snapshot: $payload")
            nMessages = payload
            
       case RecoveryCompleted => log.info("Recovery Done")
       
       case message =>
       	log.info(s"Recovered Message: $message")
        nMessages += 1
    }
}

위와 같이 간단하게 액터를 선언한다. 그리고 application.conf에 들어가 다음과 같이 configuration을 작성한다.

localStores {
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
    akka.persistence.journal.leveldb.dir = "target/localStores/journal"

    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    akka.persistence.snapshot-store.local.dir = "target/localStores/snapshots"

    akka.persistence.journal.leveldb.compaction-intervals {
        # start compactions per persistenceId
        simple-persistent-actor = 1000 # 1000개의 메시지 - compaction 시작
        "*" = 5000
    }
}

구글에서 제공하는 가벼운 Key-value 형태의 로컬 db로 leveldb를 사용하며 해당 데이터들이 어디에 저장될 지 정하는 config이다. 또한, 너무 많은 양의 snapshot data가 축적되면 compaction이 진행되도록 설정한다. 이는 akka.persistence.journal.leveldb.compaction-intervals를 통해 진행한다.

이제 이것을 ActorSystem에 해당 Config를 넣어야하므로 ConfigFactory.load().getConfig("localStores")를 넣는다. 따라서 다음과 같은 코드가 된다.

object LocalStores extends App {
  val system = ActorSystem("localStoreSystem", ConfigFactory.load().getConfig("localStores"))
  val persistentActor = system.actorOf(Props[SimplePersistentActor], "simplePersistentActor")

  (1 to 10).foreach(num => persistentActor ! s"I love akka $num")

  persistentActor ! "print"

  (11 to 20).foreach(num => persistentActor ! s"I love akka $num")

  persistentActor ! "snapshot"
}

이제 실제 코드를 실행해보면 다음과 같다.

최초로 실행한 결과는 snapshot이 저장되었다는 로그를 볼 수 있다. 이제 snapshot들이 잘 저장된것이 맞는지 재실행해보면 Recovered Snapshot: 20이라는 문구와 함께 제대로 로컬 스냅샷들을 가져온 것을 확인할 수 있다.

0개의 댓글