우리는 주로 절차를 명시해서 순서대로 실행되는 Imperative Programming(명령형 프로그래밍)을 해왔다.
이와 다르게 Reactive Programming이란 데이터의 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관된 작업이 실행된다
즉 프로그래머가 어떠한 기능을 직접 정해서 실행하는 것이 아니라 시스템에 이벤트가 발생했을 때 알아서 처리되는 것이다
지난 포스트에서 다루기도 했지만 Spring webflux에서 사용하는 reactive library가 Reactive Streams의 구현체인 Reactor이다
이 Reactor의 주요 객체가 Mono와 Flux, 둘 다 Publisher 인터페이스를 구현한 구현체
Mono 는 0~1개의 데이터를 처리한다
0~N개의 데이터를 처리한다
따라서 데이터가 하나 전달될 때마다 내부적으로 onNext 이벤트를 실행한다
Flux내의 모든 데이터 처리가 완료되면 onComplete 이벤트가 발생하며 데이터를 전달하는 과정에서 오류가 나면 onError 이벤트가 발생한다
Flux<String> flux1 = Flux.just("foo", "bar", "foobar");
Flux<String> flux2 = Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));
Flux<Integer> flux3 = Flux.range(5, 3); // 5, 6, 7
Mono<Object> mono1 = Mono.empty(); // empty여도 타입을 가진다
Mono<String> mono2 = Mono.just("foo");
위와 같이 다양하게 생성 가능하고 Flux와 Mono가 publisher 구현체이므로 내부에 데이터를 넣어주면 된다
void subscribeExamples() {
Flux<Integer> flux2 = Flux.range(1, 3);
flux2.subscribe(i -> System.out.println(i));
Flux<Integer> flux3 = Flux.range(1, 4)
.map(i -> {
if(i <= 3){
return i;
}
throw new RuntimeException("Got to 4");
});
flux3.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error));
Flux<Integer> flux4 = Flux.range(1, 4);
flux4.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
() -> System.out.println("Done")); // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)
Flux<Integer> flux5 = Flux.range(1, 15);
flux5.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
() -> System.out.println("Done"), // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)
subscription -> subscription.request(10)); // subscription에게 10개의 데이터를 요청한다
}
결과
// 2번
1
2
3
// 3번
1
2
3
Error: java.lang.RuntimeException: Got to 4
// 4번
1
2
3
4
Done
// 5번
1
2
3
4
5
6
7
8
9
10
상품을 주문하는 코드를 작성한다고 할 때
유저 아이디와 상품 아이디를 받아서
유저 정보 조회, 유저의 주소 조회, 상품 조회, 상품을 판매하는 상점을 모두 조회한 뒤
주문데이터를 저장하는 코드를 비동기적으로 작성하려면 아래와 같이 작성해야 한다
Mono<Order> orderProduct(String userId, String productId) {
return Mono.create { emitter ->
userRepository.findUserById(userId)
.subscribe { buyer ->
addressRepository.findAddressByUser(buyer)
.subscribe { address ->
checkValidRegion(address)
productRepository.findAllProductsByIds(productId)
.collectList()
.subscribe(products ->
storeRepository.findStoresByProducts(products)
.collect().asList()
.subscribe().with { store ->
orderRepository.createOrder(buyer,address,products, store).whenComplete{
order -> emitter.success(order)}
....
}
}
위의 코드를 보면서 가독성이 매우 떨어진다고 느낀다면 지극히 정상이다
위의 코드처럼 하나의 IO 작업의 결과를 받아서 그 결과로 IO 또 다른 IO작업을 연쇄적으로 실행하는 코드를 비동기적으로 프로그래밍하려면
위와 같이 끝없이 subscribe의 굴레를 만들어줘야 한다,, ㅎㅎ
이러한 코드의 엄청난 가독성,,ㅎㅎ 을 시원하게 개선해주는 것이 coroutine의 역할이다!
일단 맛보기지만 위의 코드를 코루틴으로 작성하면 아래와 같이 깔끔하게 정리된다
suspend fun orderProduct(userId: String, productId: String): Order {
val buyer = userRepository.findUserById(userId).awaitSingle()
val address = addressRepository.findAddressByUser(buyer).awaitLast()
val products = productRepository.findAllProductsByIds(productIds).asFlow().awaitAll()
val stores = storeRepository.findStoresByProducts(products).asFlow().awaitAll()
val order = orderRepository.createOrder(buyer,address,products,stores).await()
return order
}
코루틴은 근본적으로 subscribe 범벅이었던 기존의 rector 코드를 마치 동기 코드처럼 보이게 해서 코드 가독성을 엄청나게 높여준다는 장점이 있다!
그런데 이렇게 보면 정말로 코드가 동기적으로 처리되는 것처럼 보인다,, awiat라는 말이 특히,, 그래서 IO작업이 끝날때까지 기다리겠다는 의미 아닌가?
코루틴이 어떻게 비동기로 동작하는지는 다음 포스트에서 더 자세하게 다뤄볼 예정이다 👏
https://bgpark.tistory.com/162
https://www.youtube.com/watch?v=eJF60hcz3EU&t=1410s