[Akka] stash

smlee·2023년 10월 13일
0

Akka

목록 보기
33/50
post-thumbnail

우리는 간혹 먼저 들어온 메시지들의 처리를 미루어야 할 때가 있다. 간단한 예시 상황을 들면 다음과 같다. 만약 어떤 액터 내부의 데이터를 처리하는 DB가 연결되지 않은 상태에서 Read나 Write라는 메시지들이 들어온 경우, 당연히 DB가 연결될 때까지 기다렸다가 메시지들을 처리해야 한다.

이런 경우 stash를 이용해야 한다. stash를 사용하면 다음과 같은 일련의 과정을 통해 메시지 처리가 진행된다.

  1. 아직 처리하면 안 되는 메시지들을 stash queue에 넣는다.
  2. 처리하면 되는 때에 stash queue에 있던 모든 메시지들을 mailbox에 넣는다.
  3. 이후 mailbox에 정상적으로 들어온 메시지들을 처리한다.

1. Stash trait mix-in

가장 먼저 해야할 것은 stash 기능을 사용할 Actor에 Stash라는 액터를 믹스인 해야 한다. 이때, 주의해야 할 점은 Stash trait를 가장 마지막에 믹스인해야 한다는 점이다.

Stash trait는 preStart를 오버라이드하므로 반드시 마지막에 믹스인해야 한다. 따라서 다음과 같은 형태가 된다.

class ResourceActor extends Actor with ActorLogging with Stash

가장 마지막에 Stash가 들어가주면 stash를 사용할 수 있다.

2. stash() 메서드 사용

간단한 예제를 만들어 볼 것이다. state가 Open일 때 읽기, 쓰기 기능을하고 Close일 때 받은 메시지들은 Open 상태까지 기다리는 예제이다. 예시이므로 innerData라는 변수를 사용하여 알기 쉽게할 예정이다.

object ResourceActor {
	case object Open
    case object Close
    case object Read
    case class Write(data:String)
}

class ResourceActor extends Actor with ActorLogging with Stash {
	private var innerData:String = "initialData"
    
    override def receive:Receive = closedStateHandler
    
    import ResourceActor._
    
    def closedStateHandler: Receive = {
    	case Read =>
        	log.info("[Closed state] Close 상태라 데이터를 읽을 수 없습니다.")
            stash()
            
        case Write(data) =>
        	log.info("[Closed state] Close 상태라 데이터를 작성할 수 없습니다.")
            stash()
    }
    
    def openStateHandler: Receive = ???
}

위의 코드를 보면 처음의 메시지 핸들러는 ClosedStateHandler다. 따라서 해당 액터의 최초 상태는 Close 상태인 것을 알 수 있다. 즉, Close 상태에서 ReadWrite 메시지를 받을 경우 stash queue에 enqueue시켜야 한다. 따라서 해당 메시지를 stash() 메서드를 통해 enqueue를 시킨다.

3. unstashAll() 메서드 사용

이제 Open이라는 메시지를 받아 상태를 바꾸고 stash queue에 있는 메시지들을 모두 mailbox를 옮기려면 메시지 핸들러가 context.become을 통해 바뀔 때 옮겨야 한다. 이를 위한 메서드가 unstashAll()이다.

위의 코드를 다듬어서 stash 및 unstashAll이 되도록하는 코드는 밑과 같다.

class ResourceActor extends Actor with ActorLogging with Stash {

  private var innerData:String = "initialData"

  import ResourceActor._

  override def receive: Receive = closedState

  def closedState: Receive = {
    case Open =>
      log.info("[Closed State] receives Open message ... Opening")
      unstashAll() // stash 큐에 있던 메시지들을 일반 mailbox에 넣는다.
      context.become(openState)

    case message =>
      log.info(s"[Closed State] Stashing ${message.toString} because I can't handle it in the closed state")
      stash() // 나중에 처리할 메시지들을 stash 큐에 넣는다.
  }

  def openState: Receive = {
    case Close =>
      log.info("[Open State] receives Close message ... Closing")
      unstashAll()
      context.become(closedState)

    case Read =>
      log.info(s"[Open State] receives Read message ... data is : $innerData")

    case Write(data) =>
      log.info(s"[Open state] receives the Write message with the data ${data.toString}")
      innerData = data

    case message =>
      log.info(s"[Open state] receives the message ${message.toString}")
      stash()
  }
}

메시지 핸들러가 바뀌기 전에 unstashAll()을 통해 stash queue를 비워주고 mailbox에 넣는 작업을 해주면 된다.

유의 해야할 점

stash를 사용하면서 유의해야할 점은 다음과 같다.
1. stash 역시 메모리를 사용한다. 따라서, 성능에 문제가 가지 않도록 주의한다. (거의 발생하지 않는 문제지만 알아 두는 것이 좋다.)
2. mailbox에 따라 unstash할 때 overflow가 발생할 수 있다.
3. 맨 처음 언급했듯, Stash trait는 가장 마지막에 Mix in 한다.

0개의 댓글