[스프링인액션5] 리액티브 프로그래밍 - 1

hyeokjin·2022년 12월 8일
0

리액티브 프로그래밍

리액티브 프로그래밍은 본질적으로 함수적이면서 선언적이다. 즉, 순차적으로 수행되는 작업 단계를 나타낸 것이 아니라 데이터가 흘러가는 파이프라인이나 스트림을 포함한다. 그리고 이런 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.

리액터 시작하기

명령형 프로그래밍과 매우 다른 방식으로 접근해야 한다.
데이터가 전달될 파이프라인을 구성해야한다.

예를 들어, 사람의 이름을 가져와서 모두 대문자로 변경한 후 이것으로 인사말 메시지를 만들어 출력한다고 해보자.

String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello," + capitalName + "!";
System.out.println(greeting)

이 경우 각 줄의 코드가 같은 스레드에서 한 단계씩 차례대로 실행된다. 그리고 각 단계가 완료될 때까지 다음 단계로 이동하지 못하게 실행중인 스레드를 막는다.

이와 다르게 리액티브 코드에서는 다음과 같이 할 수 있다.

Mono.just("Craig")
	.map(n -> n.toUpperCase())
	.map(cn -> "Hello, " + cn + "!")
	.subscribe(System.out::println)

여기 있는 just(), map(), subscribe() 오퍼레이션은 잠시 후 알아보자
일단 지금 코드는 데이터가 전달되는 파이프라인을 구성한 것이다. 그리고 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다.
또한, 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.

Mono는 리액터의 두 가지 핵심 타입 중 하나이며, 다른 하나로는 Flux가 있다.
두개 모두 리액티브 스트림의 Publisher 인터페이스를 구현한 것이다. Flux는 0,1 또는 다수의 데이터를 갖는 파이프라인을 나타내는 반면,
Mono는 하나의 데이터 항목만 갖는 데이터셋에 최적화된 리액티브 타입이다.

리액터 의존성 추가하기

리액터 의존성 추가하기

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
		</dependency>

리액터 코드의 테스트를 작성하고자한다면 다음 의존성도 추가

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>

리액티브 오퍼레이션 적용

Flux와 Mono가 제공하는 오퍼레이션들은 두 타임을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.
Flux와 Mono에는 500개 이상의 오퍼레이션이 있으며, 각 오퍼레이션은 다음과 같이 분류될 수 있다.

  • 생성오퍼레이션
  • 조합오퍼레이션
  • 변환오퍼레이션
  • 로직오퍼레이션

여기서는 가장 유용한 몇 가지 오퍼레이션을 살펴보기로 한다.

객체로부터 생성하기

Flux나 Mono의 just() 메서드(static 메서드임)를 사용하여 리액티브 타입을 생성할 수 있다.

	@Test
	public void createAFlux_just() {
		Flux<String> fruitFlux = Flux
			.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

구독자를 추가할 때는 Flux의 subscribe() 메서드를 호출한다.

	fruitFlux.subscribe(
		f -> System.out.println("Here's some fruit: " + f)
	);

여기서 리액터를 테스트하는 더 좋은 방법으로 StepVerifier가 있다.
Flux나 Mono가 지정되면 StepVerifier는 해당 리액티브 타입을 구독한다음에 스트림을 통해 전달되는 데이터에 대해 어서션을 적용한다.
그리고 해당 스트림이 기대한 대로 완전하게 동작하는지 검사한다.

다음과 같이 테스트를 작성할 수 있다.

    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
	}

이 경우 StepVerifier가 fruitFlux를 구독한 후 각 데이터 항목이 기대한 과일 이름과 일치하는지 어서션을 적용한다.
그리고 마지막으로 fruitFlux가 완전한지 검사한다.

컬랙션으로부터 생성하기

배열로부터 Flux를 생성하려면 static 메서드인 fromArray()를 호출하며, 이때 소스 배열을 인자로 전달한다.

	@Test
	public void createAFlux_fromArray() {
	  String[] fruits = new String[] {
	      "Apple", "Orange", "Grape", "Banana", "Strawberry" };
	  
    Flux<String> fruitFlux = Flux.fromArray(fruits);
    
    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
	}

List, Set, Iterable 의 다른 구현 컬렉션으로부터 Flux를 생성해야 한다면 해당 컬렉션을 인자로 전달하여 static 메서드인 fromIterable()을 호출하면 된다.

	@Test
	public void createAFlux_fromIterable() {
	  List<String> fruitList = new ArrayList<>();
	  fruitList.add("Apple");
	  fruitList.add("Orange");
	  fruitList.add("Grape");
    fruitList.add("Banana");
	  fruitList.add("Strawberry");
	  
	  Flux<String> fruitFlux = Flux.fromIterable(fruitList);
	  
    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
	}

Flux를 생성하는 소스로 자바 Stream 객체를 사용해야 한다면 static 메서드인 fromStream()을 호출하면 된다.

	 @Test
	 public void createAFlux_fromStream() {
	   Stream<String> fruitStream = 
	        Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
	    
	   Flux<String> fruitFlux = Flux.fromStream(fruitStream);
	    
	   StepVerifier.create(fruitFlux)
	       .expectNext("Apple")
	       .expectNext("Orange")
	       .expectNext("Grape")
	       .expectNext("Banana")
	       .expectNext("Strawberry")
	       .verifyComplete();
	 }

Flux 데이터 생성하기

일정 범위의 값을 포함하는 카운터 Flux를 생성하는 방법이다
1부터 5까지 값을 포함하는 카운터 Flux가 생성된다
그리고 다섯 개 항목을 Flux가 발행하는지 검사한다

   @Test
   public void createAFlux_range() {
     Flux<Integer> intervalFlux = 
         Flux.range(1, 5);
     
     StepVerifier.create(intervalFlux)
         .expectNext(1)
         .expectNext(2)
         .expectNext(3)
         .expectNext(4)
         .expectNext(5)
         .verifyComplete();
   }

매초마다 값을 방출하는 Flux를 생성하려면 static 메서드인 interval()을 사용하면 된다.
take() 오퍼레이션을 사용해서 첫 번째 5개의 항목으로 결과를 제한할 수 있다.

	 @Test
	 public void createAFlux_interval() {
	   Flux<Long> intervalFlux = 
	       Flux.interval(Duration.ofSeconds(1))
	           .take(5);
	   
     StepVerifier.create(intervalFlux)
         .expectNext(0L)
         .expectNext(1L)
         .expectNext(2L)
         .expectNext(3L)
         .expectNext(4L)
         .verifyComplete();
	 }

리액티브 타입 결합하기

두 개의 Flux 스트림이 있는데 이것을 하나의 결과 Flux로 생성해야 한다고 해보자.
하나의 Flux를 다른 것과 결합하려면 mergeWith() 오퍼레이션을 사용하면 된다.

예를 들어, TV나 영화의 캐릭터 이름을 값으로 갖는 Flux가 하나 있고, 이 캐릭터들이 즐겨먹는 식품 이름을 값으로 갖는 또 다른 Flux가 있다고 해보자.
delayElements() 오퍼레이션을 사용해서 조금 느리게 방출되도록 하고,
delaySubscription() 오퍼레이션을 적용하여 250밀리초가 지난 후 구독 및 데이터를 방출하도록 했다.

 @Test
  public void mergeFluxes() {

    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250))
        .delayElements(Duration.ofMillis(500));
    
    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

    StepVerifier.create(mergedFlux)
        .expectNext("Garfield")
        .expectNext("Lasagna")
        .expectNext("Kojak")
        .expectNext("Lollipops")
        .expectNext("Barbossa")
        .expectNext("Apples")
        .verifyComplete();
  }

zip() 오퍼레이션은 정적인 생성 오퍼레이션이다. 따라서 여기서 생성되는 Flux는 캐릭터와 이 캐릭터가 좋아하는 식품을 완벽하게 조합한다.
zippedFlux로부터 방출되는 각 항목은 Tuple2(두개의 다른 객체를 전달하는 컨테이너)이며, 각 소스 Flux가 순서대로 방출하는 항목을 포함한다.

  @Test
  public void zipFluxes() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");
    
    Flux<Tuple2<String, String>> zippedFlux = 
        Flux.zip(characterFlux, foodFlux);
    
    StepVerifier.create(zippedFlux)
          .expectNextMatches(p -> 
              p.getT1().equals("Garfield") && 
              p.getT2().equals("Lasagna"))
          .expectNextMatches(p -> 
              p.getT1().equals("Kojak") && 
              p.getT2().equals("Lollipops"))
          .expectNextMatches(p -> 
              p.getT1().equals("Barbossa") && 
              p.getT2().equals("Apples"))
          .verifyComplete();
  }
  

Tuple2 가 아닌 다른 타입을 사용하고 싶다면 우리가 원하는 객체를 생성하는 함수를 zip()에 제공하면 된다
다음은 String 객체의 Flux를 생성하는 방법을 보여준다.

  @Test
  public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");
    
    Flux<String> zippedFlux = 
        Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
    
    StepVerifier.create(zippedFlux)
          .expectNext("Garfield eats Lasagna")
          .expectNext("Kojak eats Lollipops")
          .expectNext("Barbossa eats Apples")
          .verifyComplete();
  }

먼저 방출하는 리액티브 타입 선택하기

두 개의 Flux 객체가 있는데, 결합하는 대신 먼저 값을 방출하는 소스 Flux의 값을 발행하는 새로운 Flux를 생성하고 싶다고 해보자.
frist() 오퍼레이션은 먼저 값을 방출하는 Flux의 값을 선택해서 이 값을 발행한다.

아래 firstFlux는 느린 slowFlux를 무시하고 빠른 fastFlux 값만 발행한다.

  @Test
  public void firstFlux() {
    Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
          .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
    
    Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
    
    StepVerifier.create(firstFlux)
        .expectNext("hare")
        .expectNext("cheetah")
        .expectNext("squirrel")
        .verifyComplete();
  }

리액티브 스트림의 변환과 필터링

skip() 오퍼레이션은 소스 Flux의 항목에서 지정된 수만큼 건너뛴 후 나머지 항목을 방출하는 새로운 Flux를 생성한다.

  @Test
  public void skipAFew() {
    Flux<String> countFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);
   
    StepVerifier.create(countFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
  }

다음은 skip()을 사용해서 4초동안 기다렸다가 값을 방출하는 결과 Flux를 생성한다.

  @Test
  public void skipAFewSeconds() {
    Flux<String> countFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1))
        .skip(Duration.ofSeconds(4));
   
    StepVerifier.create(countFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
  }

skip() 오퍼레이션의 반대기능이 필요할때는 take()를 고려할 수 있다. take()는 지정된 수의 항목만 방출한다.

  @Test
  public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
        .take(3);
   
    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
  }

take()를 사용해서 처음 3.5초 동안만 항목을 방출한다.

  @Test
  public void takeForAwhile() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));
   
    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
  }

Flux값의 더 범용적인 필터링을 할 때는 filter() 오퍼레이션이 매우 유용하다.
filter() 오퍼레이션에서 우리가 원하는 조건을 기반으로 선택적인 발행을 할 수 있다.

filter()에는 람다로 조건식이 지정되었으며, 공백이 없는 문자열 값만 받도록 한것이다.

  @Test
  public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));
   
    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Zion")
        .verifyComplete();
  }

distinct() 오퍼레이션을 사용하면 발행된 적이 없는 소스(중복되지않는소스) Flux의 항목만을 발행하는 결과 Flux를 생성한다.

  @Test
  public void distinct() {
    Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();
   
    StepVerifier.create(animalFlux)
        .expectNext("dog", "cat", "bird", "anteater")
        .verifyComplete();
  }

리액티브 데이터 매핑하기

가장 많이 사용하는 오퍼레이션 중 하나는 발행된 항목을 다른 형태나 타입으로 매핑하는 것이다.
map()과 flatMap() 오퍼레이션을 제공한다.

map() 오퍼레이션은 입력메시지의 변환을 수행하여 결과 스트림의 새로운 메시지로 발행한다.

  @Test
  public void map() {
    Flux<Player> playerFlux = Flux
      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
      .map(n -> {
        String[] split = n.split("\\s");
        return new Player(split[0], split[1]);
      });
    
    StepVerifier.create(playerFlux)
        .expectNext(new Player("Michael", "Jordan"))
        .expectNext(new Player("Scottie", "Pippen"))
        .expectNext(new Player("Steve", "Kerr"))
        .verifyComplete();
  }

map()은 Flux로부터 발행될 때 동기적으로 매핑이 수행되며, 비동기적으로 매핑을 수행하고 싶다면 flatMap() 오퍼레이션을 사용해야 한다.

flatMap()에서는 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나 Flux들의 결과는 하나의 새로운 Flux가 된다.
flatMap()을 subscribeOn()(병렬스레드로 수행)과 함께 사용하면 리액터 타입의 변환을 비동기적으로 수행할 수 있다.
subscribeOn()인자로 Schedulers는 동시성 모델을 지원하며 parallel()는 풀에서 가져온 작업 스레드에서 구독을 실행하며 CPU코어의 개수가 크기가된다. 이 밖에도 immediate(), single(), newSingle(), elastic() 이 있다.

  @Test
  public void flatMap() {
    Flux<Player> playerFlux = Flux
      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
      .flatMap(n -> Mono.just(n)
          .map(p -> {
              String[] split = p.split("\\s");
              return new Player(split[0], split[1]);
            })
          .subscribeOn(Schedulers.parallel())
        );
    
    List<Player> playerList = Arrays.asList(
        new Player("Michael", "Jordan"), 
        new Player("Scottie", "Pippen"), 
        new Player("Steve", "Kerr"));

    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
  }
  
  @Data
  private static class Player {
    private final String firstName;
    private final String lastName;
  }

리액티브 스트림의 데이터 버퍼링하기

Flux를 통해 전달되는 데이터를 처리하는 동안 데이터 스트림을 작은 덩어리로 분할하면 도움이 될 수 있다.
buffer() 오퍼레이션을 사용할 수 있다.

다음과 같이 5개의 String 값을 방출하는 원래의 Flux는 두 개의 컬랙션을 방출하는 Flux로 변환된다.
(buffer(3)이므로 세 개의 과일을 포함하는 것과, 두 개의 과일을 포함하는것으로 나뉜다)

  @Test
  public void buffer() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
    
    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
    
    StepVerifier
        .create(bufferedFlux)
        .expectNext(Arrays.asList("apple", "orange", "banana"))
        .expectNext(Arrays.asList("kiwi", "strawberry"))
        .verifyComplete();
  }

buffer()를 flatMap()과 같이 사용하면 각 List 컬렉션을 병행으로 처리할 수 있다.

다음과 같이 flatMap()에는 각 List 버퍼를 가져와서 해당 List의 요소로부터 새로운 Flux를 생성하고 map() 오퍼레이션을 적용한다.
따라서 버퍼링된 각 List는 별도의 스레드에서 병행으로 계속 처리될 수 있다.

log() 오퍼레이션은 모든 리액티브 스트림 이벤트를 로깅하므로 실제 어떻게되는지 파악할수 있다.

  @Test
  public void bufferAndFlatMap() throws Exception {
    Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry")
        .buffer(3)
        .flatMap(x -> 
          Flux.fromIterable(x)
            .map(y -> y.toUpperCase())
            .subscribeOn(Schedulers.parallel())   
            .log()
        ).subscribe();
  }
  

log에는 첫번째 버퍼의 과일(apple, orange, banana) parallel-1 스레드에서 처리되고
두번째 버퍼의 과일(kiwi, strawberry)들은 parallel-2 스레드가 처리할 것이다. 두 개의 버퍼가 서로 다른 스레드에서 병행 처리된다

Flux가 발행한 모든 항목을 포함하는 List를 방출하려면 collectList() 오퍼레이션을 사용하면 된다.

  @Test
  public void collectList() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
    
    Mono<List<String>> fruitListMono = fruitFlux.collectList();
    
    StepVerifier
        .create(fruitListMono)
        .expectNext(Arrays.asList(
            "apple", "orange", "banana", "kiwi", "strawberry"))
        .verifyComplete();
  }

collectMap() 오퍼레이션은 Map을 포함하는 Mono를 생성한다. 이때 해당 Map에는 지정된 함수로 산출된 키를 갖는 항목이 저장된다.

  @Test
  public void collectMap() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    
    Mono<Map<Character, String>> animalMapMono = 
        animalFlux.collectMap(a -> a.charAt(0));
    
    StepVerifier
        .create(animalMapMono)
        .expectNextMatches(map -> {
          return
              map.size() == 3 &&
              map.get('a').equals("aardvark") &&
              map.get('e').equals("eagle") &&
              map.get('k').equals("kangaroo");
        })
        .verifyComplete();
  }

리액티브 타입에 로직 오퍼레이션 수행하기

Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지 알아야할 경우 all() 이나 any() 오퍼레이션이 그런 로직을 수행한다.

다음과 같이 문자 a가 이름에 모두 포함되어 있으므로 true가 방출된다. 그러나 문자 k는 모든 동물이름에 포함이 안되서 false이다.

  @Test
  public void all() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    
    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
    StepVerifier.create(hasAMono)
      .expectNext(true)
      .verifyComplete();
    
    Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
    StepVerifier.create(hasKMono)
      .expectNext(false)
      .verifyComplete();
  }

첫번째 문자 t가 최소한 하나라도 포함되기때문에 true이지만 두번째 문자 z는 포함 동물이 없으르며 false 이다

  @Test
  public void any() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    
    Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
    
    StepVerifier.create(hasAMono)
      .expectNext(true)
      .verifyComplete();
    
    Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
    StepVerifier.create(hasZMono)
      .expectNext(false)
      .verifyComplete();
  }
profile
노옵스를향해

0개의 댓글