[Akka] FSM(Finite State Machine) 2

smlee·2023년 9월 7일
0

Akka

목록 보기
18/50
post-thumbnail

EventSourcedBehavior로 마이그레이션

Persistent FSM은 Persistence Typed를 사용하여 나타내어진다. 스냅샷 어댑터와 이벤트 어댑터를 사용하는 EventSourcedBehavior를 사용하여 Persistence FSM에 저장된 데이터를 읽을 수 있다.

FSM이 유저 데이터를와 스냅샷들을 직접적으로 저장하지 않으므로 어댑터들이 필요하다.

Classic Persistence
Akka Persistence는 at-least-once 메시지 전달 과정을 통해 point-to-point 통신을 지원한다.
이러한 Akka Persistence를 사용하기 위해서는 akka-persistence라는 dependency를 추가해야 한다. (자세한 내용은 맨 밑 Reference에 있는 Classic Persistence 링크를 참고하면 된다.)

마이그레이션 과정

  1. replayTo라는 ActorRef를 포함하도록 새로운 커맨드를 만들거나 커맨드를 변경한다.
  2. 영속되어진 이벤트들은 이전과 같은 상태로 남아있다.
  3. old PersistentFSM을 모방한 EventSourcedBehavior를 생성한다.
  4. Behaviors.withTimers를 사용하여 상태 타임아웃을 하드 코딩이나 저장된 상태에 대하여 설정한다.
  5. EventAdapter를 추가하여 PersistentFSM에 의해 추가된 상태 이전 이벤트를 private event로 변경한다.
  6. 만약 스냅샷들이 사용되었다면, SnapshotAdapter를 추가하여 PersistentFSM 스냅샷들을 EventSourcedBehaviorState로 변경한다.

예시

지난 포스트에서 정리했던 내용들

sealed trait Command
case class AddItem(item: Item) extends Command
case object Buy extends Command
case object Leave extends Command
case class GetCurrentCart(replyTo: ActorRef[ShoppingCart]) extends Command
private case object Timeout extends Command

FSM의 상태들은 EventSourcedBehavior의 상태 파라미터와 더불어 커맨드 핸들러와 이벤트를 사용하여 나타낸다. 다음은 State를 나타낸 코드이다.

sealed trait State
case class LookingAround(cart: ShoppingCart) extends State
case class Shopping(cart: ShoppingCart) extends State
case class Inactive(cart: ShoppingCart) extends State
case class Paid(cart: ShoppingCart) extends State

커맨드 핸들러는 PersitentFSM의 상태들 각각에 대해 분리된 영역을 가지고 있다.

def commandHandler(timers: TimerScheduler[Command])(state: State, command: Command): Effect[DomainEvent, State] =
  state match {
    case LookingAround(cart) =>
      command match {
        case AddItem(item) =>
          Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
        case GetCurrentCart(replyTo) =>
          replyTo ! cart
          Effect.none
        case _ =>
          Effect.none
      }
    case Shopping(cart) =>
      command match {
        case AddItem(item) =>
          Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
        case Buy =>
          Effect.persist(OrderExecuted).thenRun(_ => timers.cancel(StateTimeout))
        case Leave =>
          Effect.persist(OrderDiscarded).thenStop()
        case GetCurrentCart(replyTo) =>
          replyTo ! cart
          Effect.none
        case Timeout =>
          Effect.persist(CustomerInactive)
      }
    case Inactive(_) =>
      command match {
        case AddItem(item) =>
          Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
        case Timeout =>
          Effect.persist(OrderDiscarded)
        case _ =>
          Effect.none
      }
    case Paid(cart) =>
      command match {
        case Leave =>
          Effect.stop()
        case GetCurrentCart(replyTo) =>
          replyTo ! cart
          Effect.none
        case _ =>
          Effect.none
      }
  }

위과 같이 상태에 따라 분리하여 원하는 작업을 실행시킬 수 있다.

.receiveSignal {
  case (state, RecoveryCompleted) =>
    state match {
      case _: Shopping | _: Inactive =>
        timers.startSingleTimer(StateTimeout, Timeout, 1.second)
      case _ =>
    }
}

위는 recovery와 state에 따른 행동을 나눈것이다.

def eventHandler(state: State, event: DomainEvent): State = {
  state match {
    case la @ LookingAround(cart) =>
      event match {
        case ItemAdded(item) => Shopping(cart.addItem(item))
        case _               => la
      }
    case Shopping(cart) =>
      event match {
        case ItemAdded(item)  => Shopping(cart.addItem(item))
        case OrderExecuted    => Paid(cart)
        case OrderDiscarded   => state // will be stopped
        case CustomerInactive => Inactive(cart)
      }
    case i @ Inactive(cart) =>
      event match {
        case ItemAdded(item) => Shopping(cart.addItem(item))
        case OrderDiscarded  => i // will be stopped
        case _               => i
      }
    case Paid(_) => state // no events after paid
  }
}

위는 마이그레이션 과정 중에서 5번에 해당하는 것으로, 이벤트 어댑터를 선언한 것이다. 들어온 이벤트의 종류에 따라 나누어 해야할 일을 정한 코드이다.

class PersistentFsmEventAdapter extends EventAdapter[DomainEvent, Any] {
  override def toJournal(e: DomainEvent): Any = e
  override def manifest(event: DomainEvent): String = ""
  @nowarn("msg=deprecated")
  override def fromJournal(journalEvent: Any, manifest: String): EventSeq[DomainEvent] = {
    journalEvent match {
      case _: StateChangeEvent =>
        // In this example the state transitions can be inferred from the events
        // Alternatively the StateChangeEvent can be converted to a private event if either the StateChangeEvent.stateIdentifier
        // or StateChangeEvent.timeout is required
        // Many use cases have the same timeout so it can be hard coded, otherwise it cane be stored in the state
        EventSeq.empty
      case other =>
        // If using a new domain event model the conversion would happen here
        EventSeq.single(other.asInstanceOf[DomainEvent])
    }

  }
}

val persistentFSMSnapshotAdapter: SnapshotAdapter[State] = PersistentFSMMigration.snapshotAdapter[State] {
  case (stateIdentifier, data, _) =>
    val cart = data.asInstanceOf[ShoppingCart]
    stateIdentifier match {
      case "Looking Around" => LookingAround(cart)
      case "Shopping"       => Shopping(cart)
      case "Inactive"       => Inactive(cart)
      case "Paid"           => Paid(cart)
      case id               => throw new IllegalStateException(s"Unexpected state identifier $id")
    }
}

위의 과정은 마이그레이션 마지막에 해당하는 단계로, FSM 상태에 따라 해야할 메서드를 나타낸 것이다. 이때, 상태는 stateIdentifier에 저장되어 있으므로, 이것을 패턴매칭을 통해 해야할 일을 설정한 것이다.

Reference

0개의 댓글