Event Adapter는 Schema가 변경되었을 때 자동으로 변환시켜주는 어댑터이다.
Event Adapter는 다음과 같은 순서로 동작한다
actor -> WriteEventAdapter -> Serializer -> journal -> ReadEventAdapter -> actor
로 기존 스키마를 바뀐 스키마 형태로 바꾼다.
예시를 보며 Event Adapter의 용례를 알아볼 예정이다.
어쿠스틱 기타 재고를 관리하는 InventoryManager
Actor가 있다고 생각하자. Guitar
는 기타의 데이터를 가지고 있으며, AddGuitar
라는 command를 통해 메시지를 받아들이고, GuitarAdded
라는 레코드를 사용하여 복구한다.
object InventoryManager {
case class Guitar(id: String, model: String, make: String)
case class AddGuitar(guitar: Guitar, quantity: Int)
case class GuitarAdded(guitarId: String, guitarModel: String, guitarMake: String, quantity: Int)
}
class InventoryManager extends PersistentActor with ActorLogging {
import InventoryManager._
val inventory: mutable.Map[Guitar, Int] = new mutable.HashMap[Guitar, Int]()
override def persistenceId: String = "guitar-inventory-manager"
override def receiveCommand: Receive = {
case AddGuitar(guitar @ Guitar(id, model, make), quantity) =>
val record = GuitarAdded(id, model, make, quantity)
persist(record){ _ =>
addGuitar(guitar, quantity)
log.info(s"Added $quantity x $guitar to Inventory")
}
case "print" =>
log.info(s"Current inventory is: $inventory")
}
private def addGuitar(guitar: Guitar, quantity:Int) = {
val existingQuantity = getGuitarQuantity(guitar)
inventory.put(guitar, quantity + existingQuantity)
}
private def getGuitarQuantity(guitar: Guitar): Int = inventory.getOrElse(guitar, 0)
override def receiveRecover: Receive = {
case event @ GuitarAdded(id, model, make, quantity) =>
log.info(s"Recovered: $event")
val currentGuitar = Guitar(guitarId, guitarModel, guitarMake)
addGuitar(currentGuitar, quantity)
}
}
위와 같이 간단한 PersistentActor를 작성한다. 어쿠스틱 기타 재고가 들어왔다는 command가 들어오면 재고 수량을 올리고, 이를 이벤트로 저장하는 간단한 액터이다. 이 액터를 테스트하기 위해 다음과 같은 코드를 작성하고 실행해본다.
val system = ActorSystem("guitarInventorySystem")
val actor = system.actorOf(Props[InventoryManager], "simpleGuitarInventoryManager")
val guitars = (1 to 10).map(num => Guitar(s"guitar$num", s"scala $num", "StudyingAkka"))
guitars.foreach(guitar => actor ! AddGuitar(guitar, 5))
위와 같이 10개의 기타 종류를 선언하고 각 종류마다 5개의 재고를 추가하는 코드를 작성하면 결과는 다음과 같다.
어쿠스틱 기타가 종류별로 5개 씩 정상적으로 추가된 것이 확인된다. 이 데이터들이 제대로 복구되는지를 확인하기 위하여 foreach
문을 주석처리하고 재실행한다.
제대로 복구가 되는 것을 알 수 있다.
만약 해당 프로젝트가 성공적이어서 어쿠스틱 기타 뿐만 아니라 일렉트릭 기타 역시 취급하게 되었다고 가정해보자. 이때, 기존에 쌓여있던 데이터들은 유지한 채, 데이터 타입에 기타 종류만 포함시켜야 한다. 따라서 추가된 데이터 타입을 위해 Guitar
에 타입란을 추가하고, Event는 새로운 객체를 만들어 GuitarAddedV2
를 만든다.
변경된 부분만 보면 다음과 같다.
object InventoryManager {
val ACOUSTIC = "acoustic"
val ELECTRIC = "electric"
case class Guitar(id: String, model: String, make: String, guitarType: String = ACOUSTIC)
case class GuitarAddedV2(guitarId: String, guitarModel: String, guitarMake: String, quantity: Int, guitarType: String = ACOUSTIC)
}
class InventoryManager extends PersistentActor with ActorLogging{
override def receiveRecover: Receive = {
case event @ GuitarAddedV2(guitarId, guitarModel, guitarMake, quantity, guitarType) =>
log.info(s"Recovered: $event")
val currentGuitar = Guitar(guitarId, guitarModel, guitarMake, guitarType)
addGuitar(currentGuitar, quantity)
}
override def receiveCommand: Receive = {
case AddGuitar(guitar @ Guitar(id, model, make, guitarType), quantity) =>
val record = GuitarAddedV2(id, model, make, quantity, guitarType)
persist(record){ _ =>
addGuitar(guitar, quantity)
log.info(s"Added $quantity x $guitar to Inventory")
}
}
}
위처럼 Actor가 변경하였다. 그리고 main 역시 일부만 변경한다.
val guitars = (1 to 10).map(num =>
if(num%2 == 0)
Guitar(s"guitar$num", s"scala $num", "StudyingAkka", ELECTRIC)
else
Guitar(s"guitar$num", s"scala $num", "StudyingAkka", ACOUSTIC)
)
위와 같이 변경하였으니, 실행하면 dead letter
가 생긴다.
그 이유는 스키마가 변했으므로 해당 스키마를 처리할 수 없는 것이다. 이를 위해 EventAdapter
를 사용한다.
EventAdapter를 사용하기 위해서는 config에 어떤 어댑터를 사용할지 등록해준다.
eventAdapters{
akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
cassandra-journal {
event-adapters {
guitar-inventory-enhancer = "part4_practices.GuitarReadEventAdapter"
}
}
event-adapter-bindings {
"part4_practices.GuitarReadEventAdapter" = guitar-inventory-enhancer
}
}
cassandra-journal
에 event-adapters
를 등록하며, event-adapter-bindings
에 어댑터명과 path를 바꾸어서 작성해준다.
class GuitarReadEventAdapter extends ReadEventAdapter{
import part4_practices.actors.InventoryManager._
override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case GuitarAdded(guitarId, guitarModel, guitarMake, quantity) =>
EventSeq.single(GuitarAddedV2(guitarId, guitarModel, guitarMake, quantity, ACOUSTIC))
case other => EventSeq.single(other)
}
}
ReadEventAdapter
trait를 상속하여 fromJournal
을 통해 바꿀 형태로 변환한 다음 EventSeq.single(바꿀 타입)
을 리턴하도록 한다.
main의 ActorSystem에서 해당 config를 가져온다.
val system = ActorSystem("guitarInventorySystem", ConfigFactory.load().getConfig("eventAdapters"))