Spring Webflux + Kotlin + Coroutine Actor를 이용한 Websocket Server 만들기

백근영·2021년 7월 15일
2

practice

목록 보기
18/20

Websocket이란?

Wiki

웹소켓(WebSocket)은 하나의 TCP 접속에 전이중 통신 채널을 제공하는 컴퓨터 통신 프로토콜이다. 웹소켓 프로토콜은 2011년 IETF에 의해 RFC 6455로 표준화되었으며 웹 IDL의 웹소켓 API는 W3C에 의해 표준화되고 있다.

웹소켓은 HTTP와 구별된다. 두 프로토콜 모두 OSI 모델의 제7계층에 위치해 있으며 제4계층의 TCP에 의존한다. 이들에 차이가 있으나 "RFC 6455"에 따르면 웹소켓은 HTTP 포트 80과 443 위에 동작하도록 설계되었으며 HTTP 프록시 및 중간 층을 지원하도록 설계되었으므로 HTTP 프로토콜과 호환이 된다. 호환을 달성하기 위해 웹소켓 핸드셰이크는 HTTP 업그레이드 헤더를 사용하여 HTTP 프로토콜에서 웹소켓 프로토콜로 변경한다.

웹소켓 프로토콜은 HTTP 풀링과 같은 반이중방식에 비해 더 낮은 부하를 사용하여 웹 브라우저(또는 다른 클라이언트 애플리케이션)과 웹 서버 간의 통신을 가능케 하며, 서버와의 실시간 데이터 전송을 용이케 한다. 이는 먼저 클라이언트에 의해 요청을 받는 방식이 아닌, 서버가 내용을 클라이언트에 보내는 표준화된 방식을 제공함으로써, 또 연결이 유지된 상태에서 메시지들을 오갈 수 있게 허용함으로써 가능하게 되었다. 이러한 방식으로 양방향 대화 방식은 클라이언트와 서버 간에 발생할 수 있다. 통신은 TCP 포트 80(TLS 암호화 연결의 경우 443)를 통해 수행되며 방화벽을 통해 웹이 아닌 인터넷 연결을 차단하는 일부 환경에 도움이 된다. 단순 양방향 브라우저-서버 통신은 코멧 등의 스톱갭(stopgap) 기술을 사용하는 비표준 방식으로 수행된다.

구글 크롬, 마이크로소프트 에지, 인터넷 익스플로러, 파이어폭스, 사파리, 오페라 등 대부분의 브라우저가 이 프로토콜을 지원한다.

어렵다... 요약하면?
1. TCP에 의존하고, HTTP 포트 80과 443 위에서 동작한다.
2. HTTP 업그레이드 헤더를 사용하여 HTTP 프로토콜에서 웹소켓 프로토콜로 전환할 수 있다.
3. 한번의 TCP 커넥션으로 클라이언트/서버간 실시간 쌍방향 통신을 지원한다.

언제 Websocket을 쓰면 좋을까?

  • 빠른 반응 시간이 필요할 때
  • 자원의 상태가 실시간으로 바뀌어야 할 때
  • 작은 크기의 payload를 빈번하게 주고받아야 할 때

사용예시)

  • 페이스북과 같은 SNS APP
  • LOL 같은 멀티플레이어 Game
  • 위치 기반 APP
  • 증권 거래 정보 사이트 및 APP
  • 화상 채팅 APP
  • 구글 Doc 같이 여러 명이 동시 접속해서 수정할 수 있는 Tool

Spring Webflux - websocket support

한마디로 웹소켓은 단일 TCP 커넥션 위에서 이루어지는 양방향 스트리밍 프로토콜이다.
Reactive Stream API 구현체인 Reactor와 본질적으로 잘어울리고, Spring Webflux에서도 역시 websocket 관련 지원을 잘 해주고 있다.

@Configuration
class WebConfig {

  @Bean
  fun handlerMapping(): HandlerMapping {
      val map = mapOf("/path" to MyWebSocketHandler())
      val order = -1 // before annotated controllers

      return SimpleUrlHandlerMapping(map, order)
  }

  @Bean
  fun handlerAdapter() =  WebSocketHandlerAdapter()
}


class MyWebSocketHandler : WebSocketHandler {

  override fun handle(session: WebSocketSession): Mono<Void> {
      // ...
  }
}

WebsocketSession

  • session.receive(): Flux<WebsocketMessage>: client로부터 message를 받음, client가 websocket connection을 끊으면 Flux에 complete signal이 날라온다.

  • session.send(message: Publisher<WebsocketMessage>): Mono<Void>: client로 message를 보냄.

WebSocketHandler 작성 예시

  1. inbound와 outbound stream을 하나의 stream으로 연결시켜 처리하는 예제
class ExampleHandler : WebSocketHandler {

  override fun handle(session: WebSocketSession): Mono<Void> {

      val output = session.receive()                   
              .doOnNext {
                  // ...
              }
              .concatMap {
                  // ...
              }
              .map { session.textMessage("Echo $it") } 

      return session.send(output)                      
  }
}
  1. inbound, outbound stream을 독립적으로 처리하고 완료됐을때 하나로 합치는 예제
class ExampleHandler : WebSocketHandler {

  override fun handle(session: WebSocketSession): Mono<Void> {

      val input = session.receive()                               
              .doOnNext {
                  // ...
              }
              .concatMap {
                  // ...
              }
              .then()

      val source: Flux<String> = ...
      val output = session.send(source.map(session::textMessage)) 

      return Mono.zip(input, output).then()                       
  }
}

Actor Model이란?

Wiki

행위자 모델 또는 액터 모델(actor model)은 컴퓨터 과학에서 행위자를 병행 연산의 범용적 기본 단위로 취급하는 병행 컴퓨팅의 수학적 모델이다. 행위자가 받는 메시지에 대응하여, 행위자는 자체적인 결정을 하고 더 많은 행위자를 만들며, 더 많은 메시지를 보내고, 다음에 받을 메시지에 대한 응답 행위를 결정할 수 있다. 행위자는 개인 상태를 수정할 수 있지만, 메시지를 통해서만 서로에게 영향을 줄 수 있다. (락의 필요성을 제거함)

  • Actor Model 패턴에서 Actor 개념과 객체지향 언어에서의 객체 개념은 상당히 유사하다. 단지 다른 점이 있다면 객체지향에서의 메소드 호출은 메소드가 모두 실행될 때까지 기다려야 하는 동기식이다. 반면 Actor Model 패턴에서의 다른 Actor에 대한 메시지 전송은 기본적으로 비동기식이다.
  • 이 비동기식 메시지 전송을 지원하기 위해서는 Actor들이 모두 Active 객체, 즉 쓰레드 기반으로 동작하는 객체여야 하고, 메시지의 전송/수신에 대한 동기화 관리가 이루어 져야 한다.
  • 동기화 부분은 Actor 내부에 있는 Mailbox라는 객체를 통해서 해결되기 때문에 Actor들을 구현하는데 있어서는 동기화에 대한 고려를 전혀 하지 않아도 된다.
  • akka 프레임워크가 대표적이며, Go 언어에도 시범적으로 구현된 바가 있다.

역시 어렵다 .. 간단하게 요약해보면

  • 기본적으로 병렬 컴퓨팅을 위한 모델
  • 서로 다른 두 actor는 message를 기반으로 비동기적 통신을 함
  • 각각의 actor가 가지는 메모리 공간은 독립적이며 actor는 message를 통해서만 다른 actor와 통신할 수 있다.
  • actor는 Mailbox를 가지고 있어 동시에 다수의 message가 들어와서 발생할 수 있는 동기화 문제를 해결하고 있다.
  • 따라서 개발자는 동기화 문제로부터 한층 자유로운 상태에서 병렬 프로그래밍을 할 수 있다.

적절한 비유)

실제 사람과의 커뮤니케이션을 상상하면 좀 더 이해하기 편합니다. 사람들은 초능력이 존재하지 않기에 타인과 머릿속 생각을 직접 공유하지 못하고, 대화(message)를 통해 생각을 주고받습니다.
액터 또한 똑같습니다. 메세지를 주고받아 다른 액터와 상호작용을 합니다. 액터가 차지하는 메모리 공간은 독립적이며, 다른 스레드나 액터가 접근할 수 없습니다. 다시 말하면, 메모리 공유 없이 메세지 전달만을 사용하기에 공유 메모리로 인한 교착 상태 등의 골치 아픈 상황들을 피할 수 있습니다.

Websocket과 Actor model

Websocket 통신은 stateful하고, 실시간으로 상태의 업데이트가 빈번하게 일어나기 때문에 공유 자원의 동기화 문제를 필수적으로 신경써야 한다. Actor model은 동기화 문제를 알아서 해결해주고 있기 때문에 websocket server를 개발할 때 actor model을 기반으로 아키텍쳐를 구성하는 것이 적절한 선택지 중 하나가 될 수 있다.

Coroutine Actor

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() {
	runBlocking {
    	val counter = counterActor()
        counter.send(IncCounter) // fire and forget (tell)
        
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        
        val result = response.await() // CompletableDeferred를 이용한 질의(ask)
    }
}

Spring Webflux + Kotlin + Coroutine Actor를 이용해 Chatting Server 만들기

architecture

  • userActor: 사용자에 대응되는 액터. handle 함수로부터 받은 IncomingMessage를 roomActor에게 보내고, roomActor로부터 받은 OutgoingMessage를 routeActor로 보내는 역할을 한다.
  • roomActor: 사용자들이 모여있는 채팅방에 대응되는 액터. user actor list를 state로써 관리하고, 임의의 한 userActor로부터 받은 message를 채팅방에 속한 모든 userActor에게 브로드캐스팅한다.
  • routeActor: websocketSession을 state로 들고있고, userActor로부터 OutgoingMessage를 받아 session.send를 이용해 클라이언트로 메세지를 보내는 액터

handle 함수 하나가 곧 하나의 TCP connection에 대응됨.

WebsocketHandler

@InternalCoroutinesApi
class WSHandler: WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> =
        mono {
            handleSuspended(session)
        }
            .then()

    private suspend fun handleSuspended(session: WebSocketSession) {
        val params = parseQueryString(session.handshakeInfo.uri)
        val roomId = params["id"]!!.toInt()
        val username = params["name"] ?: "anonymous"

        val roomActor = Rooms.findOrCreate(roomId)
        val userActor = userActor(roomActor)

        val routeActor = routeActor(session)

        val connectedMsg = Connected(
            routeActor = routeActor,
            username = username
        )

        userActor.send(connectedMsg)

        session.receive()
            .log()
            .map { it.retain() }
            .asFlow() // coroutine 사용을 위해 Flux를 Flow로 변환
            .onCompletion { userActor.send(Completed) } // client가 TCP connection을 끊었을 경우 
            .collect { // Flux.flatMap과 유사
                val userIncomingMessage = UserIncomingMessage(username, it.payloadAsText)

                userActor.send(userIncomingMessage)
            }
    }
}

userActor

fun userActor(roomActor: SendChannel<RoomActorMsg>) = CoroutineScope(Dispatchers.Default).actor<UserActorMsg> {
    lateinit var routeActor: SendChannel<UserOutgoingMessage>
    lateinit var username: String
    val roomActor = roomActor

    for (msg in channel) {
        when (msg) {
            is Connected -> {
                roomActor.send(Join(msg.username, this.channel)) // roomActor는 userActor에 대한 참조를 들고있어야 하므로 메세지에 채널을 담아보냄
                routeActor = msg.routeActor
                username = msg.username
            }
            is Completed -> {
                roomActor.send(Terminated(username))
            }
            is UserIncomingMessage -> {
                roomActor.send(IncomingMessage(msg.username, msg.message))
            }
            is UserOutgoingMessage -> {
                routeActor.send(msg)
            }
        }
    }
}

roomActor

fun roomActor(roomId: Int) = CoroutineScope(Dispatchers.Default).actor<RoomActorMsg> {
    val log = LoggerFactory.getLogger("roomActorLogger")

    val users = ConcurrentHashMap<String, SendChannel<UserActorMsg>>()

    suspend fun broadCast(outgoingMessage: UserOutgoingMessage) = users.values.forEach {
        it.send(outgoingMessage)
    }

    for (msg in channel) {
        when (msg) {
            is Join -> {
                users[msg.username] = msg.channel
                broadCast(UserOutgoingMessage("admin", "${msg.username} joined."))
                log.info("${msg.username} joined room $roomId, current user list: ${users.keys}")
            }
            is IncomingMessage -> {
                broadCast(UserOutgoingMessage(msg.username, msg.message))
                log.info("${msg.username} sent message: ${msg.message}")
            }
            is Terminated -> {
                users.remove(msg.username)
                broadCast(UserOutgoingMessage("admin", "${msg.username} left."))
                log.info("${msg.username} left room $roomId, current user list: ${users.keys}")
            }
        }
    }
}

routeActor

fun routeActor(session: WebSocketSession) = CoroutineScope(Dispatchers.Default).actor<UserOutgoingMessage> {
    val jsonMapper = jsonMapper()

    for (msg in channel) {
        session.send(
            session.textMessage(jsonMapper.writeValueAsString(msg)).toMono()
        ).awaitSingleOrNull()
    }
}
profile
서울대학교 컴퓨터공학부 github.com/BaekGeunYoung

1개의 댓글

comment-user-thumbnail
2021년 9월 9일

혹시 roomActor 에서 ConcurrentHashMap 을 쓰신 이유를 알 수 있을까요?

답글 달기