[Akka] ask pattern

smlee·2023년 10월 13일
0

Akka

목록 보기
34/50
post-thumbnail

다른 액터에게 request를 보내고, 해당 액터로부터 response를 받아야 하는 경우가 많다. 하지면 이 경우에 tell을 사용할 경우 액터의 캡슐화를 깨뜨릴 수 있다. 따라서 비동기적으로 일들을 처리하는 방법에는 ask를 사용하는 방법이 있다.

이를 학습하기 위해 예제로 인증을 위한 액터를 엄청 간단하게 짜보려고 한다.


예제에는 크게 3가지 액터가 사용될 것이다. usernamepassword를 key-value 형태로 저장하는 UserAuthActor, 유저가 username과 password를 입력하면 올바른 정보인지 여부를 가려내는 AuthManager, 여기에 AuthManager와 같은 역할을 하지만 pipe pattern을 사용하는 PipedAuthManager를 구현할 것이다.

// UserAuthActor.scala

object UserAuthActor {
	case class Read(username:String)
	case class Write(username:String, password: String)
}

class UserAuthActor extends Actor with ActorLogging{
	import UserAuthActor._
    
    override def receive: Receive = online(Map())
    
    def online(userInfos:Map[String, String]): Receive = {
    	case Read(username) =>
          log.info(s"${username}에 대한 password를 읽습니다...")
          sender() ! userInfos.get(username)
        case Write(username, password) =>
        	log.info(s"${username}과 그의 비밀번호 ${password}를 등록합니다...")
        	context.become(online(userInfo + (username -> password)))
    }
}

위는 유저의 아이디와 비밀번호를 저장하고 읽는 액터이다. 물론 실제로는 복잡한 보안 과정을 거치고, 로그에 등록하려는 username과 password를 보여주진 않을 것이지만 예시를 위한 것이므로 간단하게 나타내었다.

// AuthManager.scala

import akka.pattern.ask
import scala.concurrent.duration._

object AuthManager {
	case class RegisterUser(username:String, password:String)
    case class Authenticate(username:String, password:String)
	
	case class AuthFailure(message:String)
    case object AuthSuccess
    
    val AUTH_FAILURE_USERNAME_NOT_FOUND = "없는 유저명입니다."
    val AUTH_FAILURE_PASSWORD_INCORRECT = "올바르지 않은 비밀번호입니다."
    val AUTH_FAILURE_SYSTEM_ERROR = "시스템 에러"
}

class AuthManager extends Actor with ActorLogging {
	import AuthManager._
    import UserAuthActor._
    
    protected userDB = context.actorOf(Props[UserAuthActor])
    
    implicit val timeout:Timeout = Timeout(1 second)
    implicit val executionContext: ExecutionContext = context.dispatcher
    
    override def receive:Receive = {
    	case RegisterUser(username, password) =>
        	log.info(s"${username}님을 등록합니다.")
            userDB ! Write(username, password)
            
        case Authenticate(username, password) =>
        	val originalSender = sender()
        	
        	log.info(s"${username}님의 비밀번호를 확인합니다.")
            val future = userDB ? Read(username)
            
            future.onComplete {
            	case Success(None) => originalSender ! AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
                case Success(Some(dbPassword)) => originalSender ! AuthSuccess
                case Failure(_) => originalSender ! AuthFailure(AUTH_FAILURE_SYSTEM_ERROR)
            }
    }
}

위와 같이 Authenticate(username, password) 메시지를 받았을 때 ?(ask)를 사용한 것이 보일 것이다. ask를 사용하기 위해서는 akka.pattern.ask를 임포트시켜야 한다.

ask의 return 타입은 Future[Any]이다. 따라서 요청을 처리하는 것이 비동기적으로 이루어진다. 이때 주의해야 할 점은 sender()를 미리 value로 빼놓았다는 점이다. 이는 sender()가 가장 마지막으로 메시지를 보낸 발신인을 리턴하는데, 비동기적으로 실행하는 동안 sender()가 바뀔 수 있으므로 value로 따로 빼놓는 것이다.

또한, timeoutexecutionContext를 암시적으로 선언했다. 이는 Future의 timeout 시간과 execution context를 위한 것이다.

그리고 onComplete, 즉 Future 작업이 완료되었을 때의 동작을 선언하면 된다. 이때 주의해야 할 점은 콜백 안에서 mutable state에 접근하거나 method를 호출하는 것을 지양해야 한다는 점이다.

위의 코드의 Future 부분을 따로 빼면 다음과 같다.

// AuthManager.scala

import akka.pattern.ask
import scala.concurrent.duration._

object AuthManager {
	case class RegisterUser(username:String, password:String)
    case class Authenticate(username:String, password:String)
	
	case class AuthFailure(message:String)
    case object AuthSuccess
    
    val AUTH_FAILURE_USERNAME_NOT_FOUND = "없는 유저명입니다."
    val AUTH_FAILURE_PASSWORD_INCORRECT = "올바르지 않은 비밀번호입니다."
    val AUTH_FAILURE_SYSTEM_ERROR = "시스템 에러"
}

class AuthManager extends Actor with ActorLogging {
	import AuthManager._
    import UserAuthActor._
    
    protected userDB = context.actorOf(Props[UserAuthActor])
    
    implicit val timeout:Timeout = Timeout(1 second)
    implicit val executionContext: ExecutionContext = context.dispatcher
    
    override def receive:Receive = {
    	case RegisterUser(username, password) =>
        	log.info(s"${username}님을 등록합니다.")
            userDB ! Write(username, password)
            
        case Authenticate(username, password) => handleAuthentication(username, password)
            }
            
    def handleAuthentication(username:String, password:String) = {
    	val originalSender = sender()
        	
        	log.info(s"${username}님의 비밀번호를 확인합니다.")
            val future = userDB ? Read(username)
            
            future.onComplete {
            	case Success(None) => originalSender ! AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
                case Success(Some(dbPassword)) => originalSender ! AuthSuccess
                case Failure(_) => originalSender ! AuthFailure(AUTH_FAILURE_SYSTEM_ERROR)
    } 
    }
}

위의 handleAuthentication 부분에서 pipe pattern을 적용할 수도 있다.

class PipedAuthManager extends AuthManager {

  import AuthManager._

  override def handleAuthentication(username: String, password: String): Unit = {
    val future = userDB ? Read(username) 
    val passwordFuture = future.mapTo[Option[String]] 
    val responseFuture = passwordFuture.map{
      case None => AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
      case Some(dbPassword) =>
        if(dbPassword == password) AuthSuccess
        else AuthFailure(AUTH_FAILURE_PASSWORD_INCORRECT)
      
    } 

    responseFuture.pipeTo(sender()) 
  }
}

위의 PipedAuthManagerhandleAuthentication만을 오버라이드하여 변경했다.

future까지는 같다. 이때, future의 데이터타입은 Future[Any]이다. 이때, passwordFuture에서 mapTo를 통해 Future[Option[String]]으로 타입을 명시해준다. 그리고 String을 받아왔으면 데이터에 대해 로직을 수행한다. 그 후 pipeTo 메서드를 통해 sender()를 사용하면 된다.

0개의 댓글