[RxJava] Observable vs Flowable

Jay·2021년 3월 11일
0

RxJava

목록 보기
5/7
post-thumbnail

배압

  • 데이터를 발행하는 속도와 구독하는 자가 처리하는 속도의 차가 클 때 발생한다.

Observable

  • Reactive Streams 인터페이스를 구현하지 않는다.
  • Observer에서 데이터를 처리한다.
  • 데이터 갯수 제어하는 배압 기능 없다.
  • 배압 기능 없기에 데이터 갯수 제한 할 수 없다.
  • Disposable로 구독을 해지 한다.

사용 시점

  • 최대 1,000개 이하의 데이터 흐름
    • OOME가 날 확률이 적은 경우 사용
  • 마우스 이벤트나 터치 이벤트 다루는 GUI 프로그래밍

Flowable

  • Reactive Streams 인터페이스를 구현한다.
  • Subscribe가 데이터를 처리한다.
  • 데이터 개수를 제한하는 배압 기능이 있다.
  • Subscription으로 전달받은 데이터 개수 제한 가능
  • Subscription으로 구독을 해지한다.
  • Observable의 성능 향상을 위해 존재
  • Observable과 동일하게 동작.

사용 시점

  • 특정 방식으로 생성된 10,000개 이상의 데이터를 처리하는 경우
    • 이때 메소드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 합니다.
  • 디스크에서 파일을 읽어들이는 경우
    • 본질적으로 blocking I/O 방식 활용
  • DB 쿼리 결과 가져오는 경우
  • 네트워크 I/O를 실행하는 경우
    • 네트워크나 프로토콜을 통해 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때입니다.
  • 다수의 블로킹 방식을 사용하거나 가져오는 방식(pull-based)의 데이터 소스가 미래에는 논 블로킹(non-blocking) 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우

네트워크 I/O, DB 쿼링, 디스크 파일 읽는 건 Cold Observable
처리가 가능한 만큼 가져오는게 아닌 모두를 한 번에 가져와버리는 Cold Observable.

업스트림에서 발행하는 데이터의 속도와 다운스트림에서 처리하는 속도의 차이가 작다면 Observable을 활용해도 됩니다.

즉, 데이터 발행과 처리속도가 차이 나더라도, sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용하여 해결하는 것이 좋습니다.
이러한 함수로 해결하기 어려운때 Flowable 클래스로 전환하면 됩니다.


backpressure

  • publish, subscribe 하는 상황에서 미친듯이 생산해내는데 비해 처리를 하는 속도가 느리다면 뒤의 데이터는 떠밀리게 된다.
  • backpressure buffer가 생기게 된다.

Flowable의 배압 대응

  • onBackPressureBuffer() : 배압 이슈가 발생했을때 별도의 버퍼에 저장합니다 .

    • Flowable 클래스는 기본적으로 128개의 버퍼가 있습니다.
    • 버퍼에 만들고 쌓아두다가 처리한다.
  • onBackPressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시합니다.
    - 배압 이슈 이후의 데이터는 다 무시한다.

  • onBackPressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지합니다.

버퍼 전략

DROP_LATEST

  • 버퍼가 가득 찬 시점에 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 DROP 한다.
  • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
/**
 * - DROP_LATEST 전략  : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서
 * 가장 최근에 버퍼안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 그 자리에 채운다.
 */
public class BackpressureBufferExample01 {
    public static void main(String[] args) {
        System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureBuffer(
                        2,
                        () -> Logger.log("overflow!"),
                        BackpressureOverflowStrategy.DROP_LATEST)
                .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(2800L);
    }
}

DROP_OLDEST

  • 버퍼가 가득 찬 시점에 버퍼내에서 가장 오래전에(먼저) 들어온 데이터를 DROP한다.
  • DROP 된 빈 자리에는 버퍼 밖에서 대기하던 데이터를 채운다.
/**
 * - DROP_OLDEST 전략 : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서 가장 먼저(OLDEST) 버퍼
 * 안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 채운다.
 */
public class BackpressureBufferExample02 {
    public static void main(String[] args){
        System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureBuffer(
                        2,
                        () -> Logger.log("overflow!"),
                        BackpressureOverflowStrategy.DROP_OLDEST)
                .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(2500L);
    }
}

DROP 전략

  • 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP), 버퍼가 비워지는 시점에 DROP되지 않은 데이터부터 다시 버퍼에 담는다.
public class BackpressureDropExample {
    public static void main(String[] args){
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureDrop(dropData -> Logger.log(LogType.PRINT, dropData + " Drop!"))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(5500L);
    }
}
profile
developer

0개의 댓글