[Akka] Send/Receive/Reply message & Receive timeout

smlee·2023년 8월 23일
0

Akka

목록 보기
5/50
post-thumbnail

message와 immutability

메시지들은 어떠한 종류의 객체도 가능하나, 반드시 불변(immutable)해야 한다. 스칼라는 강제로 immutability를 집행할 수 없으므로 컨벤션으로 immutability를 지킨다.
추천하는 접근 방법은 Scala case class들을 사용하여 immutable하게 만들어 패턴 매칭을 활용하는 것이다.

case class User(name: String)

case class Register(user: User)
val user = User("Mike")

val message = Register(user)

위의 예시 코드처럼 case class를 만들어 immutable하게 사용을 해야한다.

send messages

메시지를 보내는 메서드에는 !?가 있다.

!

이전에 공부 내용들을 정리하면서 해당 메서드를 사용하여 코드를 작성했었다.

// Main.scala

object Main extends App{
	val system = ActorSystem("ping-pong")
    
    val pinger = system.actorOf(Props[PingActor](), "pinger")
    val ponger = system.actorOf(classOf(Props[PongActor], pinger), "ponger")
    
    system.scheduler.scheduleOnce(500 millis) {
    	ponger ! Ping
    }
}

위의 Main.scala에서 스케줄링을 시작할 때 ponger에게 Ping이라는 메시지를 !를 통해 전달하였다.
!의 의미는 메시지를 보낸 후 잊는다(fire-and-forget)는 뜻이다. 즉, 메시지를 비동기적으로 보내고 다시 돌아온다는 의미이다. tell이라고도 알려져 있다.

Tell : fire-and-forget
!, 즉 tell은 메시지를 전송할 때 선호되는 방식이다. 메시지를 처리하느라 블록킹을 하지 않기 때문이다. 이러한 tell은 확장성과 동시성에 가장 좋은 특징들을 제공해준다.
ActorRef ! message를 하면, Actor로부터 발생하여 Actor reference를 메시지와 함께 보낸다. 따라서 sender():ActorRef가 동반된다. 따라서 메시지 수신인이 발신자에게 답변을 보낼 때 sender() ! replyMsg와 같은 형태로 응답할 수 있다.
만약, Actor가 아닌 것에서부터 메시지가 발생한다면 deadLetters라는 Actor reference를 디폴트로 제공한다.

?

? 역시 메시지를 보내는 메서드이다. 이 메서드 역시 비동기적으로 메시지를 보내고, 가능한 답변을 나타내는 Future를 리턴한다. 이러한 ? 메서드는 ask라고도 알려져 있다.

import akka.pattern.{ ask, pipe }
import system.dispatcher
final case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout: Timeout = 5.seconds 

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] 
    s <- actorB.ask(Request).mapTo[String] 
    d <- (actorC ? Request).mapTo[Double] 
  } yield Result(x, s, d)

f.pipeTo(actorD) 
pipe(f) to actorD

위에서 결과값이 Future 타입인 것을 확인할 수 있다.

Ask: Send-And-Receive-Future
ask 패턴은 액터 뿐만 아니라 futures를 가지고 있다. 따라서, ActorRef보다는 패턴을 사용하는 것이 권장된다. 위의 코드에서 pipeTo라는 패턴을 통해 사용한 것처럼 말이다. 위의 코드는 non-blocking asynchronous code이다. ask가 Future을 만들고, x, s, d라는 3가지 요소가 새로운 future를 만들어 내기 때문이다.

권장되는 방식

ask를 사용하는 방식은 타임 아웃이 되었을 때 tell에 비해 작업해야 하는 양이 많다. 따라서 꼭 필요한 경우를 제외하고서는 tell(!)을 사용하는 것이 권장된다.

Receive Messages

앞쪽에서 정리한 액터 코드를 다시 한 번 가져와서 확인해보면

// PingActor.scala

package actors

import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.event.Logging

case object Pong
case object Ping

class PingActor extends Actor{

  private val log = Logging(context.system, this)
  private var countDown:Int = 100

  override def receive: Receive = {
    case Pong =>
      log.info(s"${self.path} received pong, count down $countDown")

      if (countDown > 0) {
        countDown -= 1
        sender() ! Ping
      } else {
        sender() ! PoisonPill
        self ! PoisonPill
      }

  }
}
// PongActor.scala
package actors

import akka.actor.{Actor, ActorRef}
import akka.event.Logging

class PongActor(pinger: ActorRef) extends Actor{
    private val log = Logging(context.system, this)

    override def receive: Receive = {
      case Ping =>
        log.info(s"${self.path} received ping")
        pinger ! Pong
  }
}

위의 액터들의 공통점은 모두 Actor라는 trait를 확장했다는 점과 receive 메서드를 오버라이드 해서 재정의했다는 점이다. 실제로, Actor trait를 상속하면 receive 메서드를 반드시 구현해야한다.
이때, 이러한 receive 메서드는 받은 메시지를 어떻게 처리하는지 다루는 블럭이다.
Receive라는 PartialFunction을 리턴하므로 패턴매칭을 통해 메시지를 분류해서 어떻게 반응할지 작성할 수 있다.

Reply to Message

만약 메시지에 대한 답을 보내고 싶으면 sender 메서드를 통해 메시지의 발신인을 ActorRef 형태로 가져오고. ! 메서드를 통해 메시지를 보내면 된다.

sender() ! replyMsg

형태의 코드를 작성하면 답장을 보내는 코드이다.

Receive time out

Receive time out 메시지가 전송된 후 inactivity time out 되는 것을 뜻한다. 자세하게 말하면, akka.actor.ReceiveTimeout 메시지로 핸들하는 것이다. 타임아웃 설정시간 중 최소시간은 1millisecond이다.

이때 주의해야할 점은 receive time out이 발생하고 ReceiveTimeout을 enqueue되는 시점은 다른 메시지가 enqueue되는 시점이다. 그러므로 receive time out에 대한 reception을 보장하지 않는다.

import akka.actor.ReceiveTimeout
import scala.concurrent.duration._
class MyActor extends Actor {
  // To set an initial delay
  context.setReceiveTimeout(30 milliseconds)
  def receive = {
    case "Hello" =>
      // To set in a response to a message
      context.setReceiveTimeout(100 milliseconds)
    case ReceiveTimeout =>
      // To turn it off
      context.setReceiveTimeout(Duration.Undefined)
      throw new RuntimeException("Receive timed out")
  }
}

위와 같이 사용할 수 있다. 만약, receive timeout에 관한 알림을 받고 싶지 않다면, cancelReceiveTimeout을 사용하면 된다. 그리고, NotInfluenceReceiveTimeout으로 마킹된 메시지들은 타이머를 리셋시키지 않는다.

📚 Reference

0개의 댓글