[Akka] Multiple Persisting

smlee·2023년 10월 20일
0

Akka

목록 보기
37/50
post-thumbnail

우리는 이전 포스트에서 persistAll(이벤트)(콜백)이라는 고차함수를 통해 같은 타입의 여러 이벤트를 한 번에 저장하는 것은 어떻게 해야 할까?

답은 간단하다. 저장할 이벤트마다 persist 혹은 persistAll을 실행하면 된다. 다음과 같은 예시를 통해 알아보자.


우리는 DiligentAccountant라는 Persistent Actor를 선언할 것인데, 2가지 레코드를 저장시키는 역할을 한다. 첫번째로, 세금과 관련된 TaxRecord를 저장하는 일과 송장과 관련된 InvoiceRecord를 저장하는 역할을 시킬 것이다. 즉, 서로 다른 이벤트인 TaxRecordInvoiceRecord를 저장시키는 액터인 것이다.

그리고, DiligentAccountant 액터 자체는 taxId와 데이터를 제출할 taxAuthority라는 ActorRef를 인자로 받을 것이다. 이러한 점을 종합하여 전체적인 틀은 아래와 같을 것이다.

object DiligentAccountant{
	// props
    def props(taxId:String, taxAuthority:ActorRef): Props = Props(classOf[DiligentAccountant], taxId, taxAuthority)
    
    // Command
    case class Invoice(recipient:String, date: Date, amount: Int)
    
    // Record
    case class InvoiceRecord(invoiceId: Int, recipient: String, date:Date, amount: Int)
    case class TaxRecord(taxId:String, recordId:Int, date: Date, totalAmount:Int)
}

class DiligentAccountant(taxId:String, taxAuthority: ActorRef) extends PersistentActor with ActorLogging{
	
    import DiligentAccountant._
    
    override def receiveCommand: Receive = ???
    
    override def receiveRecover: Receive = {
    	case message => log.info(s"Recovered message: $message")
    }
    
	override def persistentId:String = s"diligent-accountant-$taxId"
}

이제 틀을 잡았으니 본격적으로 내부 구현을 하겠다. (여기서는 receiveRecover는 중요도가 떨어진다. 따라서 로그를 찍는 것으로 구현하였다.)

당연히 Invoice라는 command를 받으면, InvoiceRecordTaxRecord로 선언하여 저장하는 로직이 될 것이다.

var latestInvoiceId = 0
var latestTaxId = 0

override def receiveCommand: Receive = {
	case Invoice(recipient, date, amount) =>
    	val taxData = TaxRecord(taxId, latestTaxId, date, amount/3)
        val invoiceData = InvoiceRecord(latestInvoiceId, recipient, date, amount)
        
        persist(taxData){ record =>
        	taxAuthority ! record
            latestTaxId += 1
        }
        
        persist(invoiceData){ record =>
        	taxAuthority ! record
            latestInvoiceId += 1
        }
}

위와 같이 작성될 것이다. 즉, 여러 타입의 이벤트를 저장해야하는 상황이 오면 각각 persist/ persistAll을 선언해야 하는 것이다.

그렇다면 위 코드의 실행 결과는 어떻게 될까? 테스트의 편의성을 위해 taxAuthority라는 액터는 받은 메시지를 로그로 출력하는 액터라고 생각하자.

class TaxAuthority extends Actor with ActorLogging{
  override def receive: Receive = {
    case message => log.info(s"received: $message")
  }
}


위의 캡처처럼 출력이 될 것이다. 이때 눈에 띄는 점은 TaxRecordInvoiceRecord순서가 변하지 않았다는 점이다. 이는 저장되는 이벤트 역시 메시지로 취급되기 때문이다.

또한, 내부에 persist가 있다고 해도 큰 블럭 단위의 persist가 순차적으로 진행된 후 내부 persist가 순차적으로 진행된다.

  override def receiveCommand: Receive = {
    case Invoice(recipient, date, amount) =>
      val taxRecord = TaxRecord(taxId, latestTaxRecordId, date, amount/3)
      val invoiceRecord = InvoiceRecord(latestInvoiceRecordId , recipient, date, amount)

      persist(taxRecord) { record =>
        taxAuthority ! record
        latestTaxRecordId += 1

        persist("I hereby declare this tax record to be true and complete."){
          declaration =>
            taxAuthority ! declaration
        }
        
      }

      persist(invoiceRecord){ record =>
        taxAuthority ! record
        latestInvoiceRecordId += 1

        persist("I hereby declare this invoice record to be true and complete.") {
          declaration =>
            taxAuthority ! declaration
        }
        
      }
  }

즉 위와 같이 persist가 겹쳐있어도 실행결과는 다음과 같다.

즉 큰 블럭들의 Persist가 순차적으로 진행된 후 내부 블럭의 persist가 순차적으로 진행됨을 알 수 있다.

0개의 댓글