WebClient에 ProgressListner를 달 수는 없을까..?

공부는 혼자하는 거·2023년 5월 9일
0

Spring Tip

목록 보기
38/52

상황

상황은 요렇다. 필자는 서버(SpringBoot) httpClient로 WebClient를 사용하고 있는데, 필자의 서버에는 파일을 받아서 마스터링해서 응답해주는 기능이 있는데, 이 과정에서 외부 API를 호출하는 코드가 여러군데 존재했다. 그 중 특정 외부 API를 호출하는 함수를 시행하는 데 시간이 평균적(100MB 기준) 으로 4 ~ 6 초가 소모된다. 파일을 넘겨주고 마스터링한 파일을 응답받는 API로 대략적인 구조는 Client => My Server => 외부 Server => My Server => Client 로 이어지는 순서였다.

프론트단은 Vue.js를 사용했다. Axios를 이용해서 나의 서버를 호출하는데, 사용자에게 직접적으로 노출되는 페이지에서의 상호작용이었고, 동기적으로 처리하기에는 시간이 너무 오래 걸리기 때문에, 대부분의 서버로직은 비동기로 이루어졌다. 비동기 로직이므로, 이 부분에서 내 서버는 프론트 사이드에게 마스터링 진행률을 notify 해줘야 되는 임무가 주어졌다.

문제

서버에서 전송해주는 것은 SSE Protocol를 사용했다. 웹 페이지에서 버튼을 누르면 이벤트소스 Connection을 열고, 종료 flag를 주면 닫는 식으로 구현을 하였다. 문제는 외부 API는 내가 컨트롤할 수 있는 요소가 아니라는 거다. 따라서 나도 마스터링이 언제 끝날지 정확한 시점과 진행률을 알 수가 없는 부분이었다. 처음에는 단순하게 API를 호출하는 함수 단위로 진행률을 프론트 사이드에게 전송해주려고 했으나, 함수마다 편차가 너무 커고, 비동기 코드가 너무 혼재되어 있어, 이 과정을 매끄럽게 이어주지 못했다. 이후에는, WebClient에도 Filter를 붙이거나 HttpMessageWritter 등을 Custom해 ProgressListner를 달아줄 수 있지 않을까 고민해봤지만, 내 공부가 미천하여, 해결을 보지 못했다.

나름의 해결방안..?

결국 문제는 다수의 비동기 코드가 하나의 로직 안에 혼재되어있고, 그 과정에서 내가 컨트롤할 수 없는 부분의 영역들이 합쳐지면서 진행률을 파악하기가 힘든 부분이었다. 여러 방안을 고민해보다가.. 토탈 로직이 최대 1분을 넘지 않는다는 선에서 찝찝한 해결책을 생각해봤다. 처음 클라이언트 요청을 받자마자, 1초 주기로 SSE를 전송하는 비동기 함수를 시행해, 종료 시점에, 종료 FLAG를 주어서 강제로 100을 만들어 버리는 식이었다. 이 과정에서 request당 종료 FLAG를 어떤 식으로 전달해 줄 수 있을까 고민하다가 (동접을 고려해 전역변수를 쓸 수 없으므로), Redis를 고려해봤으나, 너무 헤비하다고 판단. 각각의 client Emitter들은 내부의 ConcurrentHashMap을 통해 메모리로 괸리하기로 정했다.

만족스러운 해결책은 아니다.. 만약 어떤 상황에서 토탈로직이 100초를 넘어가게 되면, 100프로가 넘었음에도 종료 flag를 클라이언트에게 주지 않을 것이다. 다른 곳에서는 어떤 식으로 진행을 하는지 모르겠다. 사수가 없으니 질문도 못하고.. 진짜 매번 맨땅에 헤딩하는 느낌으로 임시로 문제를 풀어나갔다.

Code

그래서 대략 다음과 같은 코드


class SseEmitterEvent<T>(
    @JsonIgnore
    val id: String,
    @JsonIgnore
    val eventName: SseEventName,
    val data: T,
    var flag: EventFlag = EventFlag.OPEN
) {

    private val log = KotlinLogging.logger {  }
    enum class SseEventName(
        //val dataType: Class<out CommonEvent>
        val dataType: Class<*>
    ) {
         PROGRESS(ObservableProgress::class.java), ORDER(OrderResNotifyDto::class.java)
        ;


        fun getEmitterId(principalDetail: PrincipalDetail?): String {

            val emitterId = if (this == ORDER){
                principalDetail?.userId ?: throw AuthenticationNotFoundException()
            }else{
                principalDetail?.userId ?: ServletUtil.getNonUserEmitterId()
            }
            return emitterId.toString()
        }

    }

    enum class EventFlag{

        CLOSE, OPEN
    }

}



@JsonInclude(JsonInclude.Include.NON_EMPTY)
class ObservableProgress(
    var target: Int = 100,
)  {

    private val value = AtomicInteger(0)

    private val log = KotlinLogging.logger {  }

    fun increment(v: Int = 10, isFinal:Boolean = false): ObservableProgress {

        if (this.getValue() < target) this.value.getAndAdd(v)
        else log.error("Pushing progress limit exceeded")

        if (isFinal){
            val remainAmount = target - this.getValue()
            if (remainAmount > 0) this.value.getAndAdd(remainAmount)
        }

        log.info("Pushing progress: {}", this.toString())

        return this
    }

    fun getValue(): Int {
        return value.get()
    }
    
  }

    @Async
    @EventListener
    fun keepIncreaseProgressUntilFinished(progressStarter: ProgressEvent) {

        val (emitterId, isFinished) = progressStarter

        val key = emitterId.toString()

        val sseEvent = SseEmitterEvent(
            id = key,
            eventName = SseEmitterEvent.SseEventName.PROGRESS,
            data = ObservableProgress()
        )

        HashmapDB.save(key, isFinished)

        println("isFinished: ${isFinished}")
        if (isFinished) {
            sseEvent.data.increment(1, isFinished)
        }

        while (!(HashmapDB.findValueByKey(key) as Boolean)) {
            try {
                val increment = sseEvent.data.increment(1, isFinished)
                if (increment.getValue() == increment.target ) break
                val sendToClientRes = sendToClient(sseEvent)

                if (!sendToClientRes) break
                //eventPublisher.publishEvent(emitterDto)
                Thread.sleep(3000)
                if (isFinished) HashmapDB.removeByKey(key)
            } catch (e: Exception) {
                log.error { e.stackTraceToString() }
            }
        }

    }



    @EventListener
    fun sendToClient(event: SseEmitterEvent<*>): Boolean {

        val emitter = emitterRepository.findById(event.id)

        if (emitter == null) {
            log.error { "could not get emitter maybe order" }
            return false
        } else {
            try {
                emitter.send(
                    SseEmitter.event()
                        .id(event.id)
                        .name(event.eventName.name)
                        .data(event, MediaType.APPLICATION_JSON) //?????
                )

                return true
            } catch (e: IOException) {
                emitterRepository.deleteById(event.id)
                //emitter.completeWithError()
                log.error("An error occurred while emitting progress.", e)
                throw RuntimeException("연결 오류!")
            }
        }

    }


data class ProgressEvent(
    val emitterId: Comparable<*>,
    val isFinished: Boolean = false,
)

//// 대걍

    val emitterId: Comparable<*> = userId ?: ServletUtil.getNonUserEmitterId()
    eventPublisher.publishEvent(ProgressEvent(emitterId = emitterId))
    // 로직 실행, 
    //끝날때쯤
	eventPublisher.publishEvent(ProgressEvent(emitterId = emitterId,  true))


profile
시간대비효율

0개의 댓글