아래는 RestTemplate을 Connection Pool에 Spring Bean으로 등록하기 위한 예제입니다.
요청 당 20개의 RestTemplate client를 만들고, 최대 50개까지 증가할 수 있도록 했습니다.
@Configuration
public class RestTemplateConfig{
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
connManager.setDefaultMaxPerRoute(20);
connManager.setMaxTotal(50);
HttpClient client = HttpClientBuilder.create().setConnectionManager(connManager).build();
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(client);
factory.setConnectTimeout(3000);
factory.setReadTimeout(3000);
return new RestTemplate(factory);
}
@Bean
public RestTemplate coffeeRestTemplate(){
return getRestTemplate(20, 50);
}
메뉴얼: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-client
WebClient.create();
WebClient.create(String baseUrl);
하지만 default 값이나 filter 또는 ConnectionTimeOut 같은 값을 지정하여 생성하기 위해서는 Builder 클래스를 통해 생성하는 것이 좋습니다.
Builder()를 통하면
Spring에서 여러 Bean 에서 사용하기 위해 @Configuration을 통해 WebClient를 선언합니다.
package com.webclient.config;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.LoggingCodecSupport;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import javax.net.ssl.SSLException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@Configuration
@Slf4j
public class WebClientConfig {
@Bean
public WebClient webClient(){
/**
* Spring WebFlux 에서는 어플리케이션 메모리 문제를 피하기 위해 codec 처리를 위한 in-memory buffer 값이 256KB로 기본 설정 되어 있습니다.
* 이 제약 때문에 256KB보다 큰 http 메시지를 처리하려고 하면 DataBufferLimitException 에러가 발생하게 됩니다.
* 이 값을 늘려주기 위해서는 ExchangeStrategies.builder()를 통해 값을 늘려줘야 합니다.
*/
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(1024*1024*50))
.build();
/**
* Debug 레벨일 때 form Data와 Trace 레벨 일 때 header 정보는 민감한 정보를 포함하고 있기 때문에
* 기본 WebClient 설정에서는 위 정보를 로그에서 확인할 수 없습니다.
* 개발 진행 시 Request / Response 정보를 상세히 확인하기 위해서는 ExchangeStrateges 와 logging level 설정을 통해
* 로그 확인이 가능하도록 해주는 것이 좋습니다.
*
* ExchangeStrategies 를 통해 setEnableLoggingRequestDetails(boolean enable)을 true로 설정해 주고
* application.yml에 개발용 로깅 레벨은 debug로 설정해 줍니다.
*
* logging:
* level:
* org.springframework.web.reactive.function.client.ExchangeFunctions: debug
*/
exchangeStrategies
.messageWriters().stream()
.filter(LoggingCodecSupport.class::isInstance)
.forEach(writer -> ((LoggingCodecSupport)writer).setEnableLoggingRequestDetails(true));
/**
*
* HttpClient TimeOut
* HttpClient를 변경하거나 ConnectionTimeOut과 같은 설정값을 변경하려면
* WebClient.builder().clientConnector()를 통해 Reactor Netty의 HttpClient를 직접 설정해줘야 합니다.
* 해당 코드의 line 76 ~ 81
*
* Client Filters
* Request 또는 Response 데이터에 대해 조작을 하거나 추가 작업을 하기 위해서는
* WebClient.builder().filter() 메소드를 이용해야 합니다.
* ExchangeFilterFunction.ofRequestProcessor() 와
* ExchangeFilterFunction.ofResponseProcessor()를 통해 clientRequest와
* clientResponse를 변경하거나 출력할 수 있습니다.
* 해당 코드의 line 88 ~ 105
*
*/
return WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000))
.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
)
.secure(
sslContextSpec -> {
try {
sslContextSpec.sslContext(
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
);
} catch (SSLException e) {
throw new RuntimeException(e);
}
}
)
)
)
.exchangeStrategies(exchangeStrategies)
.filter(ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
log.debug("Request: {} {} ", clientRequest.method(), clientRequest.url());
clientRequest.headers().forEach(
(name, values) -> values.forEach(
value -> log.debug("{} : {}", name, value))
);
return Mono.just(clientRequest);
}
))
.filter(ExchangeFilterFunction.ofResponseProcessor(
clientResponse -> {
clientResponse.headers().asHttpHeaders().forEach(
(name, values) -> values.forEach(value -> log.debug("{} : {}", name, value))
);
return Mono.just(clientResponse);
}
))
.build();
}
}
DataBufferLimitException
에러가 발생하게 됩니다. 이 값을 늘려주기 위해서는 ExchageStrategies.builder()
를 통해 값을 늘려줘야 합니다.ExchangeStrategies exchangeStrategies =
ExchangeStrategies
.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(1024*1024*50))
.build();
Debug
레벨 일 때 form Data
와 Trace
레벨 일 때 header
정보는 민감한 정보를 포함하고 있기 때문에, 기본 WebClient
설정에서는 위 정보를 로그에서 확인할 수가 없습니다.
개발 진행 시 Request/Response 정보를 상세히 확인하기 위해서는 ExchangeStrateges
와 logging level 설정을 통해 로그 확인이 가능하도록 해주는 것이 좋습니다.
exchangeStrategies
.messageWriters().stream()
.filter(LoggingCodecSupport.class::isInstance)
.forEach(writer ->
((LoggingCodecSupport)writer).setEnableLoggingRequestDetails(true));
ExchangeStrategies
를 통해 setEnableLoggingRequestDetails(boolean enable)
을 true
로 설정해 주고 application.yaml
에 개발용 로깅 레벨은 debug
로 설정해 줍니다.
logging:
level:
org.springframework.web.reactive.function.client.ExchangeFunctions: debug
Request 또는 Response 데이터에 대해 조작을 하거나 추가 작업을 하기 위해서는 WebClient.builder().filter()
메소드를 이용해야 합니다.
ExchangeFilterFunction.ofRequestProcessor()
와
ExchangeFilterFunction.ofResponseProcessor()
를 통해 clientRequest
와 clientResponse
를 변경하거나 출력할 수 있습니다.
WebClient.builder()
.filter(ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
log.debug("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
.forEach((name, values) -> values.forEach(value -> log.deug("{} : {}", name, value)));
return Mono.just(clientRequest);
}
))
.filter(ExchangeFilterFunction.ofResponseProcessor(
clientResponse -> {
clientResponse.headers()
asHttpHeaders()
.forEach((name, values) -> values.forEach(value -> log.debug("{} : {}", name, value)));
return Mono.just(clientResponse);
}
))
HttpClient
를 변경하거나 ConnectionTimeOut
과 같은 설정값을 변경하려면 WebClient.builder().clientConnector()
를 통해 Reactor Netty의 HttpClient
를 직접 설정해 줘야 합니다.
WebClient
.builder()
.clientConnector(
new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000))
.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
)
.secure(
sslContextSpec -> {
try {
sslContextSpec.sslContext(
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
);
} catch (SSLException e) {
throw new RuntimeException(e);
}
}
)
)
)
WebClient
는 기존 설정값을 상속해서 사용할 수 있는 mutate()
함수를 제공하고 있습니다.
mutate()
를 통해 builder()
를 다시 생성하여 추가적인 옵션을 설정하여 재사용이 가능하기 때문에 @Bean
으로 등록한 WebClient
는 각 Component
에서 의존주입하여 mutate()
를 통해 사용 하는 것이 좋습니다.
WebClient a = WebClient.builder()
.baseUrl("https://some.com")
.build();
WebClient b= a.mutate()
.defaultHeader("user-agent", "WebClient")
.build();
WebClient c = b.mutate()
.defaultHeader(HttpHeaders.AUTHORIZATION, token)
.build();
위와 같이 설정 했을 경우 WebClient
"c"는 "a"와 "b"에 설정된 baseUrl
, user-agent
헤더를 모두 가지고 있습니다.
@Bean으로 등록된 WebClient는 다음과 같이 사용 가능합니다.
@Service
@RequiredArgsConstructor
@Slf4j
public class SomeService implements SomeInterface{
private final WebClient webClient;
public Mono<SomeData> getSomething(){
return webClient.mutate()
.build()
.get()
.uri("/resource")
.retrieve()
.bodyToMono(SomeData.class);
}
}
HTTP
호출 결과를 가져오는 두 가지 방법으로 retrieve()
와 exchange()
가 존재합니다.
retrieve
를 이용하면 바로 ResponseBody를 처리 할 수 있고, exchange
를 이용하면 세세한 컨트롤이 가능합니다. 하지만 Spring 에서는 exchange
를 이용하게 되면 Response 컨텐츠에 대한 모든 처리를 직접 하면서 발생할 수 있는 memory leak 가능성 때문에 가급적 retrieve
를 사용하기를 권고하고 있습니다.
Mono<Person> result = webClient.get()
.uri("/persons/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
Mono<person> result = webClient.get()
.uri("/persons/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response ->
response.bodyToMono(Person.class));
HTTP 응답 코드가 4xx 또는 5xx로 내려올 경우 WebClient
에서는
WebClientResponseException
이 발생하게 됩니다. 이 때 각 상태코드에 따라 임의의 처리를 하거나 Exception
을 랩핑하고 싶을 때는 onStatus()
함수를 사용하여 해결할 수 있습니다.
webClient.mutate()
.baseUrl("https://some.com")
.build()
.get()
.uri("/resource")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), clientResponse -> clientResponse.bodyToMono(String.class)
.map(body -> new RuntimeException(body)))
.bodyToMono(SomeData.class)
public Mono<String> getWebClientTest(String name, String id) {
log.info("getWebClientTest start!!");
Mono<String> result = webClient.mutate()
.baseUrl("http://localhost:8080")
.build()
.get()
.uri(uri -> uri.path("/mockmvc")
.queryParam("name", name)
.queryParam("id", id)
.build()
)
.retrieve()
.bodyToMono(String.class);
/**
* WebClient는 결과를 Mono: 0~1개, Flux: 0~N개 형태로 최종 전달하지 않는 이상
* 코드 내에서 Subscribe()를 실행해야 실제 http 호출이 진행된다.
*
* 결과: 해당 메소드가 다 호출 된 후 해당 api의 결과값을 전달 - non-blocking이기 때문에 나오는 현상
* 2022-12-09T18:46:37.132+09:00 INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService : getWebClientTest start!!
* 2022-12-09T18:46:37.313+09:00 INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService : getWebClientTest end!!
* 2022-12-09T18:46:37.387+09:00 INFO 1980 --- [ctor-http-nio-3] com.webclient.service.WebClientService : 테스트의 MockMvc 테스트입니다. test
*/
result.subscribe(e -> log.info(e));
log.info("getWebClientTest end!!");
return result;
}
webClient.mutate()
.baseUrl("https://some.com/api")
.build()
.post()
.uri("/login") .contentType(MediaType.APPLICATION_FORM_URLENCODED)
.accept(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromFormData("id", idValue)
.with("pwd", pwdValue)
)
.retrieve()
.bodyToMono(SomeData.class);
form 데이터를 생성하기 위해서는 BodyInserters.fromFormData()
를 이용할 수 있으며,
bodyValue(MultiValueMap<String, String>)
을 통해서도 데이터를 전송 할 수 있습니다.
` JSON body 데이터 전송
public void postWebClientTest(String name, String id) {
log.info("postWebClientTest start!!");
Info info = new Info(name, id);
Mono<String> result = webClient.mutate()
.baseUrl("http://localhost:8080")
.build()
.post()
.uri("/mockmvc")
// .contentType(MediaType.APPLICATION_JSON)
.bodyValue(info)
.retrieve()
.bodyToMono(String.class);
result.subscribe(e -> log.info(e));
log.info("postWebClientTest end!!");
}
객체 자체를 RequestBody로 전달하기 위해서는 bodyValue(objet body)를 통해서 사용할 수 있습니다.
또한 Mono
나 Flux
객체를 통해 RequestBody를 생성하기 위한
<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);
함수도 존재합니다.
PUT
호출은 POST
호출과 유사하며 다만 put()
함수를 통해 시작되는 것만 다릅니다.
webClient.mutate()
.baseUrl("https://some.com/api")
.build()
.put()
.uri("/resource/{ID}", id)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(somData)
.retrive()
.bodyToMono(SomeData.class);
DELETE
호출은 GET
과 유사하며 delte()
함수를 통해 시작되고, delete()
함수의 특성상 response는 Void.class
로 처리됩니다.
webClient.mutate()
.baseUrl("https://some.com/api")
.build()
.delete()
.uri("/resource/{ID}", id)
.retrieve()
.bodyToMono(Void.class);
WebClient
는 Reactive Stream
기반이므로 리턴값을 Mono
또는 Flux
로 전달받게 됩니다. Spring WebFlux를 이미 사용하고 있다면 문제가 없지만 Spring MVC를 사용하는 상황에서 WebClient
를 활용하고자 한다면 Mono
나 Flux
를 객체로 변환하거나 Java Stream
으로 변환해야 할 필요가 있습니다.
이럴 경우를 대비해서 Mono.bloclkk()
이나 Flux.blockFirst()
와 같은 blocking 함수가 존재하지만 block()
을 이용해서 객체로 변환하면 Reactive Pipeline
을 사용하는 장점이 없어지고 모든 호출이 main 쓰레드에서 호출되기 때문에 Spring 측에서는 block()
은 테스트 용도외에는 가급적 사용하지 말라고 권고하고 있습니다.
대신 완벽한 Reactive 호출은 아니지만 Lazy SubsCribe
를 통한 Stream
또는 Iterable
로 변환 시킬 수 있는 Flux.toStream()
, Flux.toIterable()
함수를 제공하고 있습니다.
List<SomeData> results = webClient.mutate()
.baseUrl("https://some.com/api")
.build()
.get()
.uri("/resource")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(SomeData.class)
.toStream()
.collect(Collectors.toList());
Flux.toStream()
을 통해 데이터를 추가 처리하거나 List로 변환하여 사용할 수 있습니다.
Mono
에 대해서는
SomeData data = webClient.mutate()
.baseUrl("https://some.com/api")
.build()
.get()
.uri("/resource/{ID}", id)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(SomeData.class)
.flux()
.toStream()
.findFirst()
.orElse(defaultValue);
Mono.flux()
를 통해 Flux
로 변환하고 findFirst()
를 통해 Optional
처리하는 것이 좋습니다. (이 때는 onError 처리가 필요합니다.)
참조 url