[Akka] snapshot

smlee·2023년 10월 20일
0

Akka

목록 보기
38/50
post-thumbnail

Akka Persistent Actor의 단점 중 하나는 오랫 동안 살아있는 Persistent Actor의 Recovery 시간이 길다는 점이다. 즉, 오래 살아남을수록 저장되는 이벤트도 계속 누적될 것이고, 이는 회복 시간도 누적된 이벤트 수에 비례하여 증가할 것이라는 점이다.

이러한 단점을 해결하는 방법으로 Snapshot이 있다. Snapshot을 사용하면 이전 snapshot부터 그 이후 이벤트들이 기록되므로 오래 살아남는 액터에 많은 이벤트가 쌓인다고 할지라도 단기간 내에 복원시킬 수 있도록 해준다.

1. application.conf 작성

snapshot을 사용하여 해당 스냅샷 데이터들을 어디에 저장할 것인지 application.conf에 작성해야 한다.

akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/rtjvm/snapshots"

필자는 간단하게 로컬에 저장되는 액터를 작성할 것이므로 로컬로 설정을 하였으며, 디렉터리 역시 지정해주었다.

2. saveSnapshot(이벤트)를 통해 스냅샷 저장

오래 살아남으면서 이벤트를 많이 가지고 있는 액터는 대표적으로 채팅이 있다. 따라서 채팅에 대한 PersistentActor를 작성하며 예제를 구현할 것이다.

채팅 액터이므로 메시지를 주고 받는 것이 중요하다. 따라서 command로는 ReceivedMessageSentMessage를 처리할 것이다. 이벤트로는 ReceivedMessageRecordSentMessageRecord 2가지로 처리할 예정이다.

또한, 해당 유저에 대한 정보가 필요하므로 액터에는 ownercontact라는 필드를 가지게 할 것이다.

object Chat {
	// props
    def props(owner: String, contact: String): Props = Props(classOf[Chat], owner, contact)
    
    // command
    case class ReceivedMessage(contents: String)
    case class SentMessage(contents: String)
    
    // Events
    case class ReceivedMessageRecord(id: Int, contents: String)
    case class SentMessageRecord(id: Int, contents: String)
}

class Chat(owner:String, contact: String) extends PersistentActor with ActorLogging {

	var latestReceiveMessageId = 0
    var latestSentMessageId = 0
    
    import Chat._
    
	override def persistentId: String = s"$owner-$contact-chat"
    
    override def receiveCommand: Receive = ???
    override def receiveRecover: Receive = ???
}

위의 코드는 가장 기본적인 뼈대가 될 것이다. 이제 메시지를 송/수신했을 때 저장하는 코드를 작성해보자. 이때, 우리는 10개 단위로 snapshot을 저장할 것이다.

val MAX_MESSAGE_QUEUE_SIZE = 10
var commandsWithoutCheckpoints = 0
val latestMessages = mutable.Queue[(String,String)]()

따라서 10개 단위로 끊어 저장하기 위에 위와 같은 큐와 변수, 상수를 선언한다. 그리고 receiveCommand 핸들러를 다음과 같이 선언한다.

override def receiveCommand: Receive = {
	case ReceivedMessage(contents) =>
    	val event = ReceivedMessageRecord(latestReceiveMessageId, contents)
        
        persist(event){ record =>
        	log.info(s"Received message: ${record.contents}")
        	
            if(latestMessages.size >= MAX_MESSAGE_QUEUE_SIZE) {
            	latestMessages.dequeue()
            }
            
            latestMessages.enqueue((contact, contents))
            
        	currentMessageId += 1
        	
            if(commandsWithoutCheckpoints >= MAX_MESSAGE_QUEUE_SIZE) {
            	log.info("Saving Checkpoints...")
                saveSnapshot(lastMessages)
                commandsWithoutCheckpoints = 0
            }
      }
}

위와 같이 될 것이다. commandsWithoutCheckpoints는 아직 스냅샷으로 저장되지 않은 이벤트의 개수이고, 이 개수가 10개가 된다면 큐에 있는 모든 메시지를 snapshot으로 저장하고 다시 저장되지 않은 수를 0개로 reset하는 것이다.

saveSnapshot에 대한 실 코드를 보면 snapshot store에 이벤트를 메시지화에서 넣는 것을 알 수 있다.

다시 예제로 돌아와, 밑의 내용의 편의를 위해 큐의 크기를 10으로 유지하는 작업을 queueProcessing(sender:String, contents:String)라는 함수로 리팩터링할 것이며, 스냅샷을 저장하는 작업은 checkAndSaveSnapshot()을 통해 리팩터링할 예정이다.

  private def queueProcessing(sender: String, contents: String): Unit = {
    if (lastMessages.size >= MAX_MESSAGE_QUEUE_SIZE) {
      lastMessages.dequeue()
    }

    lastMessages.enqueue((sender, contents))
  }

  private def maybeCheckpoint(): Unit = {
    commandsWithoutCheckpoint += 1

    if(commandsWithoutCheckpoint >= MAX_MESSAGE_QUEUE_SIZE) {
      log.info("Saving checkpoint...")
      saveSnapshot(lastMessages) 
      commandsWithoutCheckpoint = 0
    }
  }

위와 같이 선언하고 이하에서도 위의 메서드를 활용할 것이다.

3. SnapshotOffer(metadata, contents), SnapshotSuccess, SnapshotFailure

위의 3개의 메서드는 각각 스냅샷이 저장된 것이 복구되었을 때, 스냅샷이 성공되었을 때, 스냅샷이 실패되었을 때를 나타낸다. 이때 이 메시지들은 모두 special message들이며, receiveCommand에 들어가야 한다.

  override def receiveCommand: Receive = {
    case SnapshotOffer(metadata, contents)=> // special message
      log.info(s"Recovered Snapshot: $metadata")
      contents.asInstanceOf[mutable.Queue[(String, String)]].foreach(content => lastMessages.enqueue(content))

    case SaveSnapshotSuccess(metadata) => log.info(s"saving snapshot succeeded: $metadata")
    case SaveSnapshotFailure(metadata, reason) => log.warning(s"saving snapshot $metadata failed because of $reason")
  }

이제 위의 메시지들을 작성했다면

  (1 to 100000).foreach(num => {
    chat ! Chat.ReceivedMessage(s"Akka Rocks $num")
    chat ! Chat.SentMessage(s"Akka Rules $num")
  })

이 코드를 실행시켜볼 것이다.Sent와 Received를 합하여 총 20만개의 이벤트가 발생하므로 굉장히 부하가 클 것이다.

위와 같이 20만개가 잘 처리가 된 것을 확인한 후, 중지시켰다가 다시 시작해본다.

그렇다면 위와 같이 잘 복구된 것을 확인할 수 있다. 이는 매우 빠르게 실행되어 그다지 성능의 저하가 존재하지 않다. 그리고 스냅샷 확인을 위해 1번의 application.conf에서 지정한 디렉터리로 가보면

스냅샷들이 잘 저장되어 있는 것을 확인할 수 있다.

0개의 댓글