[RxJava] 결합 연산자

Jay·2021년 3월 23일
0

RxJava

목록 보기
7/7
post-thumbnail

결합 연산자

  • 다수의 Observable을 하나로 합치는 방법을 제공
  • flatMap(), groupBy() 함수 등은 1개의 Observable을 확장해주는 반면 결합 연산자들은 여러 개의 Observable을 내가 원하는 Observable로 결합해준다.

zip()

  • 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합한다.
  • ex) A,B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있다.
    그 전까지는 발행을 기다린다.

  • 위 다이어그램과 같이 2개의 Observable이 짝을 이룰 때까지 발행이 늦어지고 각각 짝을 이루었을 때 둘의 데이터를 결합해서 하나의 Observable로 방출한다.
  • 최대 9개의 Observable을 결합할 수 있다.
Observable<Integer> source = Observable.zip(
	Observable.just(100,200,300),
    Observable.just(10,20,30),
    Observable.just(1,2,3),
    (a,b,c) -> a+b+c);
    
source.subscribe(Log::i);

combineLatest()

  • 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경 되었을 때 갱신해주는 함수.
  • ex) 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 할 때 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해준다.

첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행되지 않는다.

하지만, 두 Observable 모두 값을 발행하면 그때는 결과 값이 나오고 그 이후부터 둘 중 어떤 것이 갱신되던지 최신 결과값만 보여준다.
이것이 zip()하고의 차이점
이다.

String[] data = {PUPPLE, ORANGE, SKY, YELLOW};
String[] data2 = {DIAMOND, STAR, PENTAGON};

Observable<String> source = Observable.combineLatest(
Observable.fromArray(data)
	.zipWith( //zipWith()로 깔끔하게 코드 정리
		Observable.interval(100L, TimeUnit.MILLISECONDS), 
        		(shape, notUsed) -> Shape.getColor(shape)),	  

Observable.fromArray(data2)
	.zipWith(
		Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),	  
			(shape, notUsed) -> Shape.getSuffix(shape)),
				(v1, v2) -> v1 + v2);
		
source.subscribe(Log::i);
CommonUtils.sleep(1000);
CommonUtils.exampleComplete();

2개의 Observable 데이터 개수가 다르다.
첫 번째는 Observable에 색상을 4개 얻어오고, 두 번째 Observable은 모양 3개를 얻어온다.
첫 번째 Observable이 100ms간격으로 발행하고 두 번째 Observable이 150ms를 쉬고 200ms 간격으로 값을 발행한다.
zip()과 다르게 두 개의 값이 한 개씩 발행 된 이후 부터는 어느 1개의 값만 변경되어도 결과가 발행된다.


merge()

  • zip()함수나 combineLatest()함수와 비교하면 가장 단순한 결합 함수.
  • 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행한다.

    아주 직관적으로 마블 다이어그램에 나타나 있다.
  • merge()는 단순하게 2개 이상의 Observable이 데이터가 오는 대로 하나의 Observable을 통해 데이터를 방출해준다.
  • 그리고 이것이 concat()과의 차이점이다. (concat()은 Observable들의 순서를 보장해준다.)
  • concat()과 merge()를 비교하는 글

concat()

  • 2개 이상의 Observable과 이어 붙여주는 함수
  • 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다.
  • 첫 번째 Observable에 onComplete 이벤트가 발생하지 않으면 두 번째 Observable은 영원히 대기하기에 잠재적인 메모리 누수 위험이 있다.

Reference

profile
developer

0개의 댓글