WebClient

배세훈·2022년 10월 18일
0

SpringBoot

목록 보기
4/4

RestTemplate

  • RestTemplate은 Multi-Thread와 Blocking 방식을 사용합니다.

  • Thread pool은 요청자 어플리케이션 구동시에 미리 만들어 놓습니다.
  • Request는 먼저 Queue에 쌓이고 가용한 스레드가 있으면 그 스레드에 할당되어 처리됩니다.
  • 즉, 1 요청 당 1 스레드가 할당됩니다.
  • 각 스레드에서는 Blocking 방식으로 처리되어 응답이 올때까지 그 스레드는 다른 요청에 할당될 수 없습니다.

아래는 RestTemplate을 Connection Pool에 Spring Bean으로 등록하기 위한 예제입니다.
요청 당 20개의 RestTemplate client를 만들고, 최대 50개까지 증가할 수 있도록 했습니다.

@Configuration
public class RestTemplateConfig{
	PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
    connManager.setDefaultMaxPerRoute(20);
    connManager.setMaxTotal(50);
    
    HttpClient client = HttpClientBuilder.create().setConnectionManager(connManager).build();
    
    HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(client);
    factory.setConnectTimeout(3000);
    factory.setReadTimeout(3000);
    
    return new RestTemplate(factory);
}

@Bean
public RestTemplate coffeeRestTemplate(){
	return getRestTemplate(20, 50);
}
  • 요청을 처리할 스레드가 있으면 아무런 문제가 없지만, 스레드가 다 차는 경우 이후의 요청은 Queue에 대기하게 됩니다.
    대부분의 문제는 네트워킹이나 DB와의 통신에서 생기는데 이런 문제가 여러 스레드에서 발생하면 가용한 스레드 수가 현저하게 줄어들게 되고, 결국 전체 서비스는 매우 느려지게 됩니다.

Spring WebClient

  • Spring WebClient는 Single Thread와 Non-Blocking 방식을 사용합니다.
    Core 당 1개의 Thread를 이용합니다.

  • 각 요청은 Event Loop내에 Job으로 등록이 됩니다.
    Event Loop는 각 Job을 제공자에게 요청한 후, 결과를 기다리지 않고 다른 Job을 처리합니다.
    Event Loop는 제공자로부터 callback으로 응답이 오면, 그 결과를 요청자에게 제공합니다.
    WebClient는 이렇게 이벤트에 반응형으로 동작하도록 설계되었습니다.
    그래서 반응성, 탄력성, 가용성, 비동기성을 보장하는 Spring React 프레임워크를 사용합니다.
    또한, React Web 프레임워크인 Spring WebFlux에서 Http Client로 사용됩니다.

성능비교

  • 아래는 RestTemplate을 사용하는 Spring Boot1과 WebClient를 사용하는 Spring Boot2의 성능비교 결과입니다.
    1000명까지는 비슷하지만 동시사용자가 늘수록 RestTemplate은 급격하게 느려지는것을 볼 수 있습니다.

  • Spring 어플리케이션에서 HTTP 요청을 할 땐 주로 RestTemplate을 사용했었지만 Spring 5.0 버전부터는 RestTemplate은 유지 모드로 변경되고 향후 deprecated 될 예정입니다.
  • Spring 커뮤니티에서는 RestTemplate을 이미 Depreciated 시키고 WebClient를 사용할 것을 강력히 권고하고 있습니다.

Spring WebClient

메뉴얼: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-client

WebClient 특징

  • Non-blocking I/O
  • Reactive Streams back pressure
  • High concurrency with fewer hardware resources
  • Functional-style, fluent API that takes advantage of java 8 lambdas
  • Synchronous and asynchronous interactions
  • Streaming up to or streaming down from a server

Configuration

  • WebClient를 사용하기 위한 가장 간단한 방법은 static factory를 통해 WebClient를 생성해서 사용할 수 있습니다.
WebClient.create();
WebClient.create(String baseUrl);

하지만 default 값이나 filter 또는 ConnectionTimeOut 같은 값을 지정하여 생성하기 위해서는 Builder 클래스를 통해 생성하는 것이 좋습니다.

Builder()를 통하면

  • 모든 호출에 대한 기본 Header / Cookie 값 설정
  • filter를 통한 Request / Response 처리
  • Http 메시지 Reader / Writer 조작
  • Http Client Library 설정
    등이 가능합니다.

Spring에서 여러 Bean 에서 사용하기 위해 @Configuration을 통해 WebClient를 선언합니다.

package com.webclient.config;

import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.LoggingCodecSupport;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import javax.net.ssl.SSLException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Configuration
@Slf4j
public class WebClientConfig {

    @Bean
    public WebClient webClient(){
        /**
         * Spring WebFlux 에서는 어플리케이션 메모리 문제를 피하기 위해 codec 처리를 위한 in-memory buffer 값이 256KB로 기본 설정 되어 있습니다.
         * 이 제약 때문에 256KB보다 큰 http 메시지를 처리하려고 하면 DataBufferLimitException 에러가 발생하게 됩니다.
         * 이 값을 늘려주기 위해서는 ExchangeStrategies.builder()를 통해 값을 늘려줘야 합니다.
         */
        ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(1024*1024*50))
                .build();

        /**
         * Debug 레벨일 때 form Data와 Trace 레벨 일 때 header 정보는 민감한 정보를 포함하고 있기 때문에
         * 기본 WebClient 설정에서는 위 정보를 로그에서 확인할 수 없습니다.
         * 개발 진행 시 Request / Response 정보를 상세히 확인하기 위해서는 ExchangeStrateges 와 logging level 설정을 통해
         * 로그 확인이 가능하도록 해주는 것이 좋습니다.
         *
         * ExchangeStrategies 를 통해 setEnableLoggingRequestDetails(boolean enable)을 true로 설정해 주고
         * application.yml에 개발용 로깅 레벨은 debug로 설정해 줍니다.
         *
         * logging:
         *  level:
         *      org.springframework.web.reactive.function.client.ExchangeFunctions: debug
         */
        exchangeStrategies
                .messageWriters().stream()
                .filter(LoggingCodecSupport.class::isInstance)
                .forEach(writer -> ((LoggingCodecSupport)writer).setEnableLoggingRequestDetails(true));

        /**
         *
         * HttpClient TimeOut
         * HttpClient를 변경하거나 ConnectionTimeOut과 같은 설정값을 변경하려면
         * WebClient.builder().clientConnector()를 통해 Reactor Netty의 HttpClient를 직접 설정해줘야 합니다.
         * 해당 코드의 line 76 ~ 81
         *
         * Client Filters
         * Request 또는 Response 데이터에 대해 조작을 하거나 추가 작업을 하기 위해서는
         * WebClient.builder().filter() 메소드를 이용해야 합니다.
         * ExchangeFilterFunction.ofRequestProcessor() 와
         * ExchangeFilterFunction.ofResponseProcessor()를 통해 clientRequest와
         * clientResponse를 변경하거나 출력할 수 있습니다.
         * 해당 코드의 line 88 ~ 105
         *
         */
        return WebClient.builder()
                .clientConnector(
                        new ReactorClientHttpConnector(
                                HttpClient.create()
                                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                                        .responseTimeout(Duration.ofMillis(5000))
                                        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                                        )
                                        .secure(
                                                sslContextSpec -> {
                                                    try {
                                                        sslContextSpec.sslContext(
                                                                SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
                                                        );
                                                    } catch (SSLException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                }
                                        )
                        )
                )
                .exchangeStrategies(exchangeStrategies)
                .filter(ExchangeFilterFunction.ofRequestProcessor(
                        clientRequest -> {
                            log.debug("Request: {} {} ", clientRequest.method(), clientRequest.url());
                            clientRequest.headers().forEach(
                                    (name, values) -> values.forEach(
                                            value -> log.debug("{} : {}", name, value))
                            );
                            return Mono.just(clientRequest);
                        }
                ))
                .filter(ExchangeFilterFunction.ofResponseProcessor(
                        clientResponse -> {
                            clientResponse.headers().asHttpHeaders().forEach(
                                    (name, values) -> values.forEach(value -> log.debug("{} : {}", name, value))
                            );
                            return Mono.just(clientResponse);
                        }
                ))
                .build();
    }
}

MaxInMemorySize

  • Spring WebFlux 에서는 어플리케이션 메모리 문제를 피하기 위해 codec 처리를 위한 in-memory buffer 값이 256KB로 기본설정 되어 있습니다. 이 제약 땐문에 256KB보다 큰 http 메시지를 처리하려고 하면 DataBufferLimitException 에러가 발생하게 됩니다. 이 값을 늘려주기 위해서는 ExchageStrategies.builder() 를 통해 값을 늘려줘야 합니다.
ExchangeStrategies exchangeStrategies = 
	ExchangeStrategies
    	.builder()
        .codecs(configurer -> configurer.defaultCodecs()
        								.maxInMemorySize(1024*1024*50))
        .build();

Logging

Debug 레벨 일 때 form DataTrace 레벨 일 때 header 정보는 민감한 정보를 포함하고 있기 때문에, 기본 WebClient 설정에서는 위 정보를 로그에서 확인할 수가 없습니다.
개발 진행 시 Request/Response 정보를 상세히 확인하기 위해서는 ExchangeStrateges와 logging level 설정을 통해 로그 확인이 가능하도록 해주는 것이 좋습니다.

exchangeStrategies
	.messageWriters().stream()
    .filter(LoggingCodecSupport.class::isInstance)
    .forEach(writer ->
    	((LoggingCodecSupport)writer).setEnableLoggingRequestDetails(true));

ExchangeStrategies 를 통해 setEnableLoggingRequestDetails(boolean enable)true로 설정해 주고 application.yaml에 개발용 로깅 레벨은 debug로 설정해 줍니다.

logging:
	level:
    	org.springframework.web.reactive.function.client.ExchangeFunctions: debug

Client Filters

Request 또는 Response 데이터에 대해 조작을 하거나 추가 작업을 하기 위해서는 WebClient.builder().filter() 메소드를 이용해야 합니다.
ExchangeFilterFunction.ofRequestProcessor()
ExchangeFilterFunction.ofResponseProcessor()를 통해 clientRequestclientResponse를 변경하거나 출력할 수 있습니다.

WebClient.builder()
	.filter(ExchangeFilterFunction.ofRequestProcessor(
    	clientRequest -> {
        	log.debug("Request: {} {}", clientRequest.method(), clientRequest.url());
            clientRequest.headers()
            	.forEach((name, values) -> values.forEach(value -> log.deug("{} : {}", name, value)));
               	return Mono.just(clientRequest);
            }
        ))
        .filter(ExchangeFilterFunction.ofResponseProcessor(
        	clientResponse -> {
            	clientResponse.headers()
                	asHttpHeaders()
                    .forEach((name, values) -> values.forEach(value -> log.debug("{} : {}", name, value)));
                return Mono.just(clientResponse);
            }
        ))

HttpClient TimeOut

HttpClient를 변경하거나 ConnectionTimeOut과 같은 설정값을 변경하려면 WebClient.builder().clientConnector()를 통해 Reactor Netty의 HttpClient를 직접 설정해 줘야 합니다.

WebClient
	.builder()
    	.clientConnector(
                        new ReactorClientHttpConnector(
                                HttpClient.create()
                                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                                        .responseTimeout(Duration.ofMillis(5000))
                                        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                                        )
                                        .secure(
                                                sslContextSpec -> {
                                                    try {
                                                        sslContextSpec.sslContext(
                                                                SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
                                                        );
                                                    } catch (SSLException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                }
                                        )
                        )
                )

Usage

WebClient는 기존 설정값을 상속해서 사용할 수 있는 mutate() 함수를 제공하고 있습니다.
mutate()를 통해 builder()를 다시 생성하여 추가적인 옵션을 설정하여 재사용이 가능하기 때문에 @Bean으로 등록한 WebClient는 각 Component에서 의존주입하여 mutate()를 통해 사용 하는 것이 좋습니다.

WebClient a = WebClient.builder()
					.baseUrl("https://some.com")
    				.build();
WebClient b= a.mutate()
				.defaultHeader("user-agent", "WebClient")
                .build();
WebClient c = b.mutate()
				.defaultHeader(HttpHeaders.AUTHORIZATION, token)
                .build();

위와 같이 설정 했을 경우 WebClient "c"는 "a"와 "b"에 설정된 baseUrl, user-agent 헤더를 모두 가지고 있습니다.

@Bean으로 등록된 WebClient는 다음과 같이 사용 가능합니다.

@Service
@RequiredArgsConstructor
@Slf4j
public class SomeService implements SomeInterface{
	private final WebClient webClient;
    
    public Mono<SomeData> getSomething(){
    	return webClient.mutate()
        			.build()
                    .get()
                    .uri("/resource")
                    .retrieve()
                    .bodyToMono(SomeData.class);
    }
}

retrieve() vs exchange()

HTTP 호출 결과를 가져오는 두 가지 방법으로 retrieve()exchange()가 존재합니다.
retrieve를 이용하면 바로 ResponseBody를 처리 할 수 있고, exchange를 이용하면 세세한 컨트롤이 가능합니다. 하지만 Spring 에서는 exchange를 이용하게 되면 Response 컨텐츠에 대한 모든 처리를 직접 하면서 발생할 수 있는 memory leak 가능성 때문에 가급적 retrieve를 사용하기를 권고하고 있습니다.

  • retrieve
Mono<Person> result = webClient.get()
							.uri("/persons/{id}", id)
                            .accept(MediaType.APPLICATION_JSON)
                            .retrieve()
                            .bodyToMono(Person.class);
  • exchange
Mono<person> result = webClient.get()
							.uri("/persons/{id}", id)
                            .accept(MediaType.APPLICATION_JSON)
                            .exchange()
                            .flatMap(response ->
                            	response.bodyToMono(Person.class));

4xx and 5xx 처리

HTTP 응답 코드가 4xx 또는 5xx로 내려올 경우 WebClient에서는
WebClientResponseException이 발생하게 됩니다. 이 때 각 상태코드에 따라 임의의 처리를 하거나 Exception을 랩핑하고 싶을 때는 onStatus() 함수를 사용하여 해결할 수 있습니다.

webClient.mutate()
		.baseUrl("https://some.com")
        .build()
        .get()
        .uri("/resource")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), clientResponse -> clientResponse.bodyToMono(String.class)
        					.map(body -> new RuntimeException(body)))
        .bodyToMono(SomeData.class)

GET

public Mono<String> getWebClientTest(String name, String id) {

        log.info("getWebClientTest start!!");
        Mono<String> result = webClient.mutate()
                .baseUrl("http://localhost:8080")
                .build()
                .get()
                .uri(uri -> uri.path("/mockmvc")
                        .queryParam("name", name)
                        .queryParam("id", id)
                        .build()
                )
                .retrieve()
                .bodyToMono(String.class);

        /**
         * WebClient는 결과를 Mono: 0~1개, Flux: 0~N개 형태로 최종 전달하지 않는 이상
         * 코드 내에서 Subscribe()를 실행해야 실제 http 호출이 진행된다.
         *
         * 결과: 해당 메소드가 다 호출 된 후 해당 api의 결과값을 전달 - non-blocking이기 때문에 나오는 현상
         * 2022-12-09T18:46:37.132+09:00  INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService   : getWebClientTest start!!
         * 2022-12-09T18:46:37.313+09:00  INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService   : getWebClientTest end!!
         * 2022-12-09T18:46:37.387+09:00  INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService   : 테스트의 MockMvc 테스트입니다. test
         */
        result.subscribe(e -> log.info(e));
        log.info("getWebClientTest end!!");

        return result;
    }

POST

  • form 데이터 전송
webClient.mutate()
		.baseUrl("https://some.com/api")
        .build()
        .post()
        .uri("/login") .contentType(MediaType.APPLICATION_FORM_URLENCODED)
        .accept(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromFormData("id", idValue)
        					.with("pwd", pwdValue)
        )
        .retrieve()
        .bodyToMono(SomeData.class);

form 데이터를 생성하기 위해서는 BodyInserters.fromFormData()를 이용할 수 있으며,
bodyValue(MultiValueMap<String, String>)을 통해서도 데이터를 전송 할 수 있습니다.

` JSON body 데이터 전송

public void postWebClientTest(String name, String id) {
        log.info("postWebClientTest start!!");

        Info info = new Info(name, id);

        Mono<String> result = webClient.mutate()
                .baseUrl("http://localhost:8080")
                .build()
                .post()
                .uri("/mockmvc")
//                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(info)
                .retrieve()
                .bodyToMono(String.class);

        result.subscribe(e -> log.info(e));

        log.info("postWebClientTest end!!");
    }

객체 자체를 RequestBody로 전달하기 위해서는 bodyValue(objet body)를 통해서 사용할 수 있습니다.
또한 MonoFlux 객체를 통해 RequestBody를 생성하기 위한
<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass); 함수도 존재합니다.

PUT

PUT 호출은 POST 호출과 유사하며 다만 put() 함수를 통해 시작되는 것만 다릅니다.

webClient.mutate()
		.baseUrl("https://some.com/api")
        .build()
        .put()
        .uri("/resource/{ID}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .accept(MediaType.APPLICATION_JSON)
        .bodyValue(somData)
        .retrive()
        .bodyToMono(SomeData.class);

DELETE

DELETE 호출은 GET과 유사하며 delte() 함수를 통해 시작되고, delete() 함수의 특성상 response는 Void.class로 처리됩니다.

webClient.mutate()
		.baseUrl("https://some.com/api")
        .build()
        .delete()
        .uri("/resource/{ID}", id)
        .retrieve()
        .bodyToMono(Void.class);

Synchronous Use

WebClientReactive Stream 기반이므로 리턴값을 Mono 또는 Flux로 전달받게 됩니다. Spring WebFlux를 이미 사용하고 있다면 문제가 없지만 Spring MVC를 사용하는 상황에서 WebClient를 활용하고자 한다면 MonoFlux를 객체로 변환하거나 Java Stream으로 변환해야 할 필요가 있습니다.

이럴 경우를 대비해서 Mono.bloclkk()이나 Flux.blockFirst()와 같은 blocking 함수가 존재하지만 block()을 이용해서 객체로 변환하면 Reactive Pipeline을 사용하는 장점이 없어지고 모든 호출이 main 쓰레드에서 호출되기 때문에 Spring 측에서는 block()은 테스트 용도외에는 가급적 사용하지 말라고 권고하고 있습니다.

대신 완벽한 Reactive 호출은 아니지만 Lazy SubsCribe를 통한 Stream 또는 Iterable로 변환 시킬 수 있는 Flux.toStream(), Flux.toIterable() 함수를 제공하고 있습니다.

List<SomeData> results = webClient.mutate()
								.baseUrl("https://some.com/api")
                                .build()
                                .get()
                                .uri("/resource")
                                .accept(MediaType.APPLICATION_JSON)
                                .retrieve()
                                .bodyToFlux(SomeData.class)
                                .toStream()
                                .collect(Collectors.toList());

Flux.toStream()을 통해 데이터를 추가 처리하거나 List로 변환하여 사용할 수 있습니다.

Mono에 대해서는

SomeData data = webClient.mutate()
						.baseUrl("https://some.com/api")
                        .build()
                        .get()
                        .uri("/resource/{ID}", id)
                        .accept(MediaType.APPLICATION_JSON)
                        .retrieve()
                        .bodyToMono(SomeData.class)
                        .flux()
                        .toStream()
                        .findFirst()
                        .orElse(defaultValue);

Mono.flux()를 통해 Flux로 변환하고 findFirst()를 통해 Optional 처리하는 것이 좋습니다. (이 때는 onError 처리가 필요합니다.)

  • Flux: 0 ~ N개의 데이터 전달
  • Mono: 0 ~ 1개의 데이터 전달

참조 url

profile
성장형 인간

0개의 댓글