비효울적인 Blocking 코드 => WebClient 사용으로 개선

공부는 혼자하는 거·2023년 2월 13일
1

Spring Tip

목록 보기
34/52

기존상황

기존에 파일을 업로드하면, 업로드 된 파일을 외부 API 에게 넘겨주고 마스터링된 결과를 받아 반환하는 API 를 만들었다. 문제는 이 과정에서 외부 API를 호출하는 부분이 여러 군데 있었고 합치면 11~12초 정도 걸린다는 점이었다. 새로 리팩터링하면서 이 부분을 비동기 로직으로 개선하고자 했다. 기존 HttpClient는 RestTemplate으로 작성되었다. 원래는 RestTemplate + @Async와 CompletableFuture를 활용해서 개선을 할려고 하였으나, 이 과정에서 제일 큰 시간을 소모하는 부분이 외부 Api를 호출하고 응답받는 부분이라는 걸 인지하고, 이 부분만 교체를 하면 어떨까 싶었다. 이런 고민 와중에서 Spring5에 새로 추가된 webclient를 사용하기로 결정하였다.

WebClient 도입

Webclient는 동기 및 비동기를 모두 지원한다. 사용하기 위해서, Spring WebFlux을 모듈에 추가하였다. 기존 서버는 Spring MVC로 개발이 되어있는 상태인데 WebFlux를 도입해서, 동작하던 코드가 다르게 바뀔 걸 고민하지 않아도 된다. WebMVC와 WebFlux 의존성이 모두 존재하는 경우 기본적으로, WebMVC 기반으로 스프링이 실행된다.

한가지 주의할 점은 Local에서 개발할 때, 나는 M1 맥북을 사용했는데, 이럴 경우 netty 의존성을 사용하기 위해서는 아래와 같이 해야 됨

    // mac silicon only
    // https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/SystemUtils.java#L1173
    val isMacOS: Boolean = System.getProperty("os.name").startsWith("Mac OS X")
    val architecture = System.getProperty("os.arch").toLowerCase()
    if (isMacOS && architecture == "aarch64") {
        developmentOnly("io.netty:netty-resolver-dns-native-macos:4.1.68.Final:osx-aarch_64")
    }

    implementation("org.springframework.boot:spring-boot-starter-webflux")

대출 설정파일을 요렇게 구성했다.


@Configuration
class WebClientConfig {

    private val log = KotlinLogging.logger {  }

    @Value("\${mastering.url}")
    private lateinit var masteringUrl:String

    @Value("\${mastering.analyzerUrl}")
    private lateinit var analyzerUrl:String


  
    @Bean(name = ["masteringClient"])
    fun masteringWebClient(): WebClient {
        log.info { "mastring url!!! => $masteringUrl" }

        return WebClient.builder()
            .baseUrl(masteringUrl)
            .filter(logRequest())  // logging the request headers
            .filter(logResponse())  // logging the response headers
            .clientConnector(ReactorClientHttpConnector(createReactorHttpClient()))
            .codecs { configurer ->
                configurer
                    .defaultCodecs()
                    .maxInMemorySize(500 * 1024 * 1024)
            }
            .build()
    }



    @Bean
    fun analyzeWebClient(): WebClient {

        return WebClient.builder()
            .baseUrl(analyzerUrl)
            .filter(logRequest())  // logging the request headers
            .filter(logResponse())  // logging the response headers
            .clientConnector(ReactorClientHttpConnector(createReactorHttpClient()))
            .codecs { configurer ->
                configurer
                    .defaultCodecs()
                    .maxInMemorySize(500 * 1024 * 1024)
            }
            .build()
    }



    private fun logBody(response: ClientResponse): Mono<ClientResponse> {

        return if (response.statusCode() != null && (response.statusCode().is4xxClientError || response.statusCode().is5xxServerError)) {
            response.bodyToMono(String::class.java)
                .flatMap { body ->
                    log.error ("Error Body is {}", body)
                    //Mono.error(MyCustomClientException())
                    Mono.just(response)
                }
        } else {
            Mono.just<ClientResponse>(response)
        }
    }


    fun logRequest(): ExchangeFilterFunction {
        return ExchangeFilterFunction.ofRequestProcessor { clientRequest: ClientRequest ->

            log.info {
                """                                
                =========== Request ===========
                               
                Request: ${clientRequest.method()} : ${clientRequest.url()}
                ${clientRequest.headers().map { it.key + " : " + it.value }}
                                
                =========== Request ===========
               """.trimIndent()
            }
            Mono.just(clientRequest)
        }
    }

    fun logResponse(): ExchangeFilterFunction {
        return ExchangeFilterFunction.ofResponseProcessor { clientResponse: ClientResponse ->
            log.info {
                """                                
                =========== response ===========
               
                Response status: ${clientResponse.statusCode()}
                ${clientResponse.headers().asHttpHeaders().map { it.key + " : " + it.value }}
               
                =========== response ===========
               """.trimIndent()
            }

            logBody(clientResponse)
        }
    }


    fun createReactorHttpClient(): HttpClient {

        val provider: ConnectionProvider = ConnectionProvider.builder("custom-provider")
            .maxConnections(100) //유지할 Connection Pool의 수
            .maxIdleTime(Duration.ofSeconds(120)) //maxIdleTime : 사용하지 않는 상태(idle)의 Connection이 유지되는 시간.
            .maxLifeTime(Duration.ofSeconds(58)) //maxLifeTime : Connection Pool 에서의 최대 수명 시간
            .pendingAcquireTimeout(Duration.ofMillis(5000))  // Connection Pool에서 사용할 수 있는 Connection 이 없을때 (모두 사용중일때) Connection을 얻기 위해 대기하는 시간
            .pendingAcquireMaxCount(-1) // Connection을 얻기 위해 대기하는 최대 수 -1: 무제한
            .evictInBackground(Duration.ofSeconds(30))  //백그라운드에서 만료된 connection을 제거하는 주기
            .lifo() //  마지막에 사용된 커넥션을 재 사용, fifo – 처음 사용된(가장오래된) 커넥션을 재 사용
            .metrics(true) //connection pool 사용 정보를 actuator metric에 노출
            .build()

        return HttpClient.create(provider)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Duration.ofSeconds(120).toMillis().toInt())
            .responseTimeout(Duration.ofSeconds(30))
            .compress(true)
            .doOnConnected { conn ->
                conn.addHandlerLast(ReadTimeoutHandler(30))
                conn.addHandlerLast(WriteTimeoutHandler(30))
            }
    }



}

@Service
class MasteringService(
    private val masteringClient: WebClient,
    private val s3UploaderService: S3UploaderService,
    private val masteringRepository: masteringRepository,
    private val fFmpegService: FFmpegService,
    private val eventPublisher: ApplicationEventPublisher,
    private val analyzeWebClient: WebClient,
    private val emitterRepository: EmitterRepository,
) {

	````

}

참고

https://mangkyu.tistory.com/257

Multipart file 비동기 이슈

기존 Blocking code

   private fun sendToDeepLearningServer(
        file: MultipartFile,
    ): ByteArray {

        var headers = HttpHeaders()

        headers.contentType = MediaType.MULTIPART_FORM_DATA
        headers.accept = mutableListOf(MediaType.ALL)

        val reqBody: MultiValueMap<String, Any> = LinkedMultiValueMap()
        reqBody.add("file", file.resource)

        val requestEntity = HttpEntity(reqBody, headers)
        val response = masteringTemplate.postForEntity("/upload", requestEntity, ByteArray::class.java)

        val responseBody = response.body ?: throw RuntimeException("upload error!")
        return responseBody
    }

새롭게 교체!

 private fun sendToDeepLearningServer(
        file: MultipartFile,
   ): Mono<ByteArray> {

        val reqBody: MultiValueMap<String, Any> = LinkedMultiValueMap()
   
        reqBody.add("file", file.resource)

        val resBody = masteringClient.post().uri("/mastering")
            .contentType(MediaType.MULTIPART_FORM_DATA)
            .accept(MediaType.ALL)
            .bodyValue(reqBody)
            .retrieve()
            .bodyToMono<ByteArray>()
            //.subscribeOn(Schedulers.boundedElastic())

        return resBody
    }

일단 테스트를 위해 단 한개의 api 호출 함수만 비동기로 교체해봤다.

        val monoByteArray = sendToDeepLearningServer(filesDto)
               
        monoByteArray.subscribe { result ->
            filesDto.masteringByte = result
        }

두둥 문제 발생!

역시 한 번에 될 리가 없지.. 문제 원인을 찾아보니, 비동기 로직에서 MultipartFile을 제대로 핸들링하지 못하는 이슈가 있다. 이유는 스프링부트에서 paramter로 multipartfile을 받을 때, 임시로 tomcat 디렉토리 내부에서 파일을 저장한 후에 해당 메소드가 종료되면 사라진다. 따라서 비동기로 넘겨준 코드에서는, 해당 파일을 찾지 못한다는 것. 어쩔 수 없이 파일을 디스크에 새로 쓰기로 하고 삭제하기로 결정했다.

    fun createFileFromMultipartFile(multipartFile: MultipartFile, path: String): File {

        val randomString = RandomStringUtils.random(10, "0123456789abcdef")

        val fileName =
            normalizeNfc("$path$randomString$customSperator${multipartFile.originalFilename}")

        val convertFile = File(fileName)

        if (convertFile.createNewFile()) {
            FileOutputStream(convertFile).use { fos -> fos.write(multipartFile.bytes) }
        }
        log.info { "create tmp file from multipartFile==>${convertFile.absolutePath}" }
        return convertFile
    }

파일을 생성하는 유틸리티 함수를 하나 만들어줬다. 이 과정에서 동일 파일명에 대해 중복충돌이 발생하지 않도록 랜덤한 값을 부여해줬다.

참고

https://thesse.tistory.com/260
https://co-de.tistory.com/32
https://stackoverflow.com/questions/72583738/java-spring-when-temp-multipart-file-gets-deleted
https://stackoverflow.com/questions/36565597/spring-async-file-upload-and-processing

서로 다른 타입을 반환하는 여러 API 호출 결과를 묶기

앞서 말했다시피, 마스터링한 결과를 반환하는 과정에서 여러가지 부가적인 정보를 얻기 위한 API 호출도 해야 한다. 대략적인 흐름을 이야기하자면, 파일 업로드 -> 원본 파일 분석 api 호출결과 DB로 저장 -> 파일 마스터링 API 호출결과 DB로 저장 -> 마스터링된 파일 분석 API 호출결과 DB로 저장 -> 프론트에게 최종본 전달의 흐름이다.

총 3개의 외부 API를 호출하는 코드가 존재한다. 그런데 이중에서, 원본 파일 분석 api 호출과 저장과 파일 마스터링 API 호출은, 병렬적으로 실행해도 아무 문제가 없는 코드다. 따라서 이 부분을 Mono.zip으로 묶어줬다.


    fun processMasteringAPI(
		file: File,
	){
        val resultBucket = EnumMap<MasteringCategory, Any?>(MasteringCategory::class.java)
        val monoByteArray = sendToDeepLearningServer(file)
    	val originAnalyzeMono = analyzeFile(file)

    	Mono.zip(monoByteArray, originAnalyzeMono).map { tuple ->

            resultBucket.put(MasteringCategory.MASTERING, tuple[0])
            resultBucket.put(MasteringCategory.ORIGIN_ANALYZE, tuple[1])
            resultBucket
            //println(tuple)
        }.subscribe{ result ->
    		
            // do something
			val masteringByte = result.get(MasteringCategory.MASTERING) as ByteArray
			
				       
    ........
       

참고

https://xlffm3.github.io/spring%20&%20spring%20boot/webflux-async-nonblocking/

JPA 트랜잭션 비동기 호출 문제

이렇게 해서, 결과를 제대로 반환받는 것 까지 확인 후, DB에 저장하는 코드를 작성했다. 필자는 JPA를 사용했는데, 처음에 메서드가 호출되면 DB에 진행 중인 상태로 데이터를 저장하고, 이후 비동기 호출의 결과를 반환받아서 DB의 정보를 업데이트하는 로직을 구상했는데, 이 과정에서 영속성 컨텍스트의 더티체킹 기능을 이용할려고 했다.

    @Transactional
    fun processMasteringFile(multipartFile: MultipartFile){
    
		val file = FileUtil.createFileFromMultipartFile(multipartFile, FFmpegConfig.savePath)    
        
        val entity = masteringRepository.save(
        	//save progress status entity
        )       
        
		processMasteringAPI(file, entity)       
        return entity    
     }
     
       fun processMasteringAPI(
		file: File,
        entity: Mastering
      ){
          val resultBucket = EnumMap<MasteringCategory, Any?>(MasteringCategory::class.java)
          val monoByteArray = sendToDeepLearningServer(file)
          val originAnalyzeMono = analyzeFile(file)

          Mono.zip(monoByteArray, originAnalyzeMono).map { tuple ->

              resultBucket.put(MasteringCategory.MASTERING, tuple[0])
              resultBucket.put(MasteringCategory.ORIGIN_ANALYZE, tuple[1])
              resultBucket
              //println(tuple)
          }.subscribe{ result ->

              val masteringByte = result.get(MasteringCategory.MASTERING) as ByteArray
				
            //do something
            entity.update(...)
            

            ```

    ........
     
        

결론은 더티체킹이 안 먹혔다. customRepository를 만들어서 update query를 날려보니, 아래와 같이 Transcation이 필요하다는 로그가 나왔다.

영속성 컨텍스트를 사용할 경우, 같은 스레드 내부에서만 먹힌다는 사실.. 이 부분은 JPA를 사용하지 않고, CustomRepository를 만들어 JDBCTemplate을 그대로 사용하는 방식으로 해결을 하였다.


   Mono.zip(monoByteArray, originAnalyzeMono).map { tuple ->

              resultBucket.put(MasteringCategory.MASTERING, tuple[0])
              resultBucket.put(MasteringCategory.ORIGIN_ANALYZE, tuple[1])
              resultBucket
              //println(tuple)
          }.subscribe{ result ->

              val masteringByte = result.get(MasteringCategory.MASTERING) as ByteArray
				
            //do something
            entity.update(...)
            
            //fix
             masteringRepository.updateMasteringStatus(entity.id, ...)

            ```
            
            

@Repository
class MasteringRepositoryImpl(
    private val queryFactory: JPAQueryFactory,
    private val dataSource: DataSource,
) : MasteringRepository {

    private val jdbcTemplate = JdbcTemplate(dataSource)

    override fun updateMasteringStatus(
        masteringId: Long,
		.. 기타 파라미터
    ): Long {

        val count = this.jdbcTemplate.update(
            """
            update 
                mastering.user_mastering
            set
                mastering_status = 'SUCCESS',
                mastering_column = '$기타 파라미터'
            where id = $masteringId   
            """.trimIndent()
        )

        return count.toLong()
    }

참고

https://www.slipp.net/questions/10

https://stackoverflow.com/questions/42002363/dirty-check-isnt-working-with-attached-entity-from-another-thread

https://newwisdom.tistory.com/m/127

Server Sent Event Protocol + Event Listner

이렇게 로직을 비동기적으로 바꾸다보니, 프론트 사이드에게 마스터링 완료 알림을 알려줘야 될 필요성이 생겼다. 이 과정에서 원래는 WebSocket을 사용하는 방향으로 생각했으나, 서버에서 단방향으로 데이터를 쏴주는 형식만 필요했기 때문에, 좀 더 단순하고 효과적인 Server Sent Event Protocol(http protocol 위에서 동작하는 경량 프로토콜) 을 활용하는 방향으로 결과를 정했다.

먼저 프론트 사이드에게 결과진행률을 알려줄 수 있는 Dto을 하나 만들었다.

@JsonInclude(JsonInclude.Include.NON_EMPTY)
class ObservableProgress(
    var target: Int = 100,
)  {
    private val value = AtomicInteger(0)
    private val log = KotlinLogging.logger {  }
    fun increment(v: Int): ObservableProgress {
        log.info("Pushing progress: {}", this.toString())
        if (this.getValue() <= 100) this.value.getAndAdd(v)
        else throw RuntimeException("Pushing progress limit exceeded")
        return this
    }

    fun getValue(): Int {
        return value.get()
    }

    override fun toString(): String {
        return "ObservableProgress(target=$target, value=$value)"
    }
}


class SseEmitterEvent<T>(
    @JsonIgnore
    val id: String,
    @JsonIgnore
    val eventName: SseEventName,
    val data: T,
    var flag: EventFlag = EventFlag.OPEN
) {

    private val log = KotlinLogging.logger {  }
    enum class SseEventName(
        val dataType: Class<*>
    ) {
         PROGRESS(ObservableProgress::class.java),
        ;
    }

    enum class EventFlag{
        CLOSE, OPEN
    }
}

그런 후에 Emitter를 저장하는 매모리 Repository를 생성해주었다.


@Repository
class EmitterRepository {

    private val log = KotlinLogging.logger {  }
    private val emitters = ConcurrentHashMap<String, SseEmitter>()

    fun save(id: String, sseEmitter: SseEmitter): SseEmitter {

        log.info { "SSE stream 접근" }

        try {
            this.emitters.put(id, sseEmitter)
            sseEmitter.onCompletion {
                log.info { "emitter complete" }
                deleteById(id)
            }
            sseEmitter.onTimeout {
                log.info { "emitter timeout" }
                deleteById(id)
            }

            sseEmitter.onError { e: Throwable ->
                log.error { e.stackTraceToString() }
                this.emitters.remove(id)
            }
            return sseEmitter
        }catch (e: Exception){
            log.error { e.stackTraceToString() }
            throw RuntimeException()
        }

    }

    fun deleteById(id: String) {
        this.emitters.remove(id)
    }

    fun findById(id: String): SseEmitter {
        return this.emitters.get(id) ?: throw RuntimeException("Could not get emitter")
    }

}


  @GetMapping(value = ["/subscribe"], produces = ["text/event-stream"])
    fun subscribe(
        @AuthenticationPrincipal principalDetails: PrincipalDetail?,
        @RequestParam eventName: SseEmitterEvent.SseEventName
    ): SseEmitter {

        return sseService.subscribe(principalDetails, eventName)
    }

.....

ServerSentEvent는 단순히 MasteringService 뿐만 아니라, 여러가지 서비스에서 활용하기 위해 별도의 클래스로 분리하면서 의존관계를 느슨하게 하기 위해 Spring Event를 활용했다.


@Service
class SseService(
    private val emitterRepository: EmitterRepository,
    private val om: ObjectMapper
) {

    private val log = KotlinLogging.logger {  }
    
    @EventListener
    fun sendToClient(event: SseEmitterEvent<*>) {

        val emitter = emitterRepository.findById(event.id)

        try {
            emitter.send(
                SseEmitter.event()
                    .id(event.id)
                    .name(event.eventName.name)
                    .data(event, MediaType.APPLICATION_JSON) //?????
            )
        } catch (e: IOException) {
            emitterRepository.deleteById(event.id)
            //emitter.completeWithError()
            log.error("An error occurred while emitting progress.", e)
            throw RuntimeException("연결 오류!")
        }
    }


    fun subscribe(principalDetails: PrincipalDetail?, eventName: SseEmitterEvent.SseEventName): SseEmitter {
        // 리버스 프록시에서의 오동작을 방지
        ServletUtil.getCurrentResponse()?.addHeader("X-Accel-Buffering", "no")

        val userId =
            principalDetails?.userId ?: ServletUtil.getNonUserEmitterId()

        val emitterId = userId.toString()

        val emitterDto = SseEmitterEvent (
            id = emitterId, eventName = eventName,
            data = "EventStream Created. [id=$emitterId]"
        )

        val emitter = emitterRepository.save(emitterId, SseEmitter(60L * 1000 * 60))

        // 503 에러를 방지하기 위한 더미 이벤트 전송
        this.sendToClient(emitterDto)
        return emitter
    }

}

이제 업로딩할 수 있는

 @PostMapping("/upload")
    fun processMasteringFile(
        file: MultipartFile,
        @AuthenticationPrincipal principalDetails: PrincipalDetail?,
    ): SuccessResponse<*> {

        return SuccessResponse(
            ResultCode.OK,
            "upload",
            masteringService.processMasteringFile(file, principalDetails)
        )
    }

....

  	@Transactional
   fun processMasteringFile(multipartFile: MultipartFile, principalDetails: PrincipalDetail?): Any {

        //로그인 여부 판단
        val userId = principalDetails?.userId       
       	val emitterId = userId ?: ServletUtil.getNonUserEmitterId()
        val eventName = SseEmitterEvent.SseEventName.PROGRESS
        
         val emitterDto = SseEmitterEvent(
            id = emitterId.toString(),
            eventName = eventName,
            data = ObservableProgress()
        )
        
		emitterDto.data.increment(10)
        eventPublisher.publishEvent(emitterDto)
        
        
        val file = FileUtil.createFileFromMultipartFile(multipartFile, FFmpegConfig.savePath)    
        
        val entity = masteringRepository.save(
        	//save progress status entity
        )       
        
		processMasteringAPI(file, entity, emitterDto)       
        return entity    
        
     }
        
        
        
       .....
       
           Mono.zip(monoByteArray, originAnalyzeMono).map { tuple ->

              resultBucket.put(MasteringCategory.MASTERING, tuple[0])
              resultBucket.put(MasteringCategory.ORIGIN_ANALYZE, tuple[1])
              resultBucket
              //println(tuple)
          }.subscribe{ result ->

              val masteringByte = result.get(MasteringCategory.MASTERING) as ByteArray
				
            //do something
            entity.update(...)
       
            masteringRepository.updateMasteringStatus(entity.id, ...)

           val newEmitterDto = SseEmitterEvent(
             id = emitterDto.id, eventName = SseEmitterEvent.SseEventName.PROGRESS,
             data = entity.toDto(), flag = SseEmitterEvent.EventFlag.CLOSE
          )

          eventPublisher.publishEvent(newEmitterDto)
          
	   

흐름은 대략적으로 이렇다. 프론트 사이드에서 파일을 업로드하기 전에, 먼저 구독을 한다. 그리고 마스터링 업로드 api를 호출한다. 그러면 클라이언트 사이드에서 보낸 정보를(로그인 없을 시, request header, 했을 시 accesstoken 이용) 사용해서, SseEmitter를 가져온 후 push, 만약 event data의 flag가 ClOSED 라면 Connection close

프론트 사이드

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Server-Sent Events Progress Bar Example</title>
    <!-- load the bootstrap stylesheet -->
    <link href="http://netdna.bootstrapcdn.com/bootstrap/3.1.1/css/bootstrap.min.css" media="all" rel="stylesheet" type="text/css" />
    <script  src="http://code.jquery.com/jquery-latest.min.js"></script>
    <style type="text/css">
        .displayNone {display: none;}
    </style>

</head>
<body>
<div class="container" style="padding-top:10px">
    <button id="downloadBt">upload File</button>
    <div id="loading_img" class="displayNone"><img src="loader.gif"></div>
    <div id="logMsg"></div>
</div>

<script>
    var isClosed = true;
    var eventSource;
    var $log;
    $(document).ready(function(){
        if (!window.EventSource) {
            alert('Your browser does not support EventSource.');
            return;
        }

        $('#downloadBt').click(function(){
            eventSource = new EventSource('{{주소}}/subscribe?eventName=PROGRESS');

            eventSource.onopen = function(e) {
                log("Connection is open");
            };

            eventSource.onerror = function(e) {
                if (this.readyState == EventSource.CONNECTING) {
                    log("Connection is interrupted, connecting ...");
                } else {
                    log("Error, state: " + this.readyState);
                }
            };


            //Custom listener
            eventSource.addEventListener("PROGRESS", (e) => {
                console.log("e", e)
                console.log(e.data)

                let jsonData = JSON.parse(e.data);

                if (jsonData.flag == 'CLOSE'){
                    stop();
                    log('다운로드 완료');
                } else {
                    console.log("parse", jsonData)
                    log(jsonData.data.value + '%');

                }
            });

            setTimeout(function(){
                $('#loading_img').removeClass('displayNone');
                startDownload();
            }, 1000);
        });

        $log = $('#logMsg');
    });

    function startDownload(){
        $('#downloadBt').prop('disabled', true);
        $.ajax({
            method: 'POST',
            url: "{{주소}}/upload",
        }).done(function( data ) {
            $('#downloadBt').prop('disabled', false);
        });
    }

    function stop() {
        eventSource.close();

        isClosed = true;
        $('#loading_img').addClass('displayNone');
    }

    function log(msg) {
        $log.html(msg);
    }
</script>

</body>
</html>

파일을 업로드하는 부분과, emitterId를 집어넣는 부분은 없으니 감안해서 보면 된다.

참고

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
https://kimyhcj.tistory.com/433

대용량 데이터 RDB 저장

그 다음 문제는, 파일 분석 api로부터 반환해온 정보가 너무 크다는 것이다. 그래프를 그리기 위한, x, y축 베열을 전달받는데, json 데이터가 대략 많으면 50MB까지 치솟는다. 이러한 데이터를 관계형 DB에 저장한다는 것이, 고민되긴 하지만, 당장은 문제가 없다고 판단하고, 저장하기로 결정했다. 데이터는 AttributeConverter를 활용하여 문자열로 변환해서 저장하기로 했다.

data class ChartData(
    val x_data: List<Double>,
    val y_data: List<List<Double>>
)


...

@Converter
@Component
class ChartDataConverter(
    val mapper: ObjectMapper
) : AttributeConverter<ChartData?, String> {

    override fun convertToDatabaseColumn(attribute: ChartData?): String? {
        return this.mapper.writeValueAsString(attribute)
    }

    override fun convertToEntityAttribute(dbData: String?): ChartData? {

        return this.mapper.readValue(dbData, ChartData::class.java)
    }

}

...


@Entity
@Table(name = "analyzed_audio", schema = "mastering", catalog = "mastering")
class AnalyzedFile(
    loudness: Double,
    rms: ChartData,
    waveform: ChartData,
) : BaseEntity() {

 	@Column(name = "loudness")
    var loudness = loudness
        protected set


    @Lob
    @Column(name = "rms")
    @Convert(converter = ChartDataConverter::class)
    var rms  = rms
        protected set


    @Lob
    @Column(name = "waveform")
    @Convert(converter = ChartDataConverter::class)
    var waveform  = waveform
        protected set
	

	  fun convertChartDataToString(chartData: ChartData): String {
        val mapper = RuntimeBeanAccessor.getBean(ObjectMapper::class)
        return mapper.writeValueAsString(chartData)
    }

	  operator fun component1(): Double {
        return this.loudness
    }

    operator fun component2(): String {
        return convertChartDataToString(this.rms)
    }
    operator fun component3(): String {
        return convertChartDataToString(this.waveform)
    }


}

마찬가지로 JDBCTemplate를 활용해야만 했다. native Query를 활용하므로, 구조분해할당 함수를 만들 때, ObjectMapper를 활용해서 문자열로 변환시켜줘야 했다.

    override fun saveAnalyzedFile(analyzedFile:AnalyzedFile): Int {

        Assert.notNull(analyzedFile, "Entity must not be null.")

        val (loudness, rms, waveform) = analyzedAudio

        val count = this.jdbcTemplate.update(
            """
            insert into
                mastering.analyzed_file
                (created_at, updated_at, loudness, rms, waveform)
            values
                (now(), now(),  $loudness ,'$rms', '$waveform')
            """.trimIndent()
        )

        return count
    }

Packet for query is too large You can change this value on the server by setting the 'max_allowed_packet'

요런 에러가 뜬다면 mysql 허용 packet size 를 늘려줄 필요가 있다는 뜻이다. 나는 AWS RDB 서비스를 쓰므로, DB 파라미터 그룹을 바꿔줘야 했다. 기존에는 default 그룹을 사용하고 있었는데, 수정을 할 수 없어 새로 만든 후 수정하고, 재부팅하였다.

참고

https://hgko1207.github.io/2020/08/10/mysql-1/
https://abbo.tistory.com/241
https://m.blog.naver.com/PostView.naver?isHttpsRedirect=true&blogId=sory1008&logNo=220808658700
https://aws.amazon.com/ko/premiumsupport/knowledge-center/rds-parameter-group-update-issues/
https://chwan.tistory.com/entry/AWS-RDS-Cannot-modify-a-default-parameter-group

폰트 이메일 템플릿 적용

https://stackoverflow.com/questions/65136910/spring-boot-java-mail-with-template-html

profile
시간대비효율

0개의 댓글