네트워크 I/O, DB 쿼링, 디스크 파일 읽는 건 Cold Observable
처리가 가능한 만큼 가져오는게 아닌 모두를 한 번에 가져와버리는 Cold Observable.
업스트림에서 발행하는 데이터의 속도와 다운스트림에서 처리하는 속도의 차이가 작다면 Observable을 활용해도 됩니다.
즉, 데이터 발행과 처리속도가 차이 나더라도, sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용하여 해결하는 것이 좋습니다.
이러한 함수로 해결하기 어려운때 Flowable 클래스로 전환하면 됩니다.
onBackPressureBuffer() : 배압 이슈가 발생했을때 별도의 버퍼에 저장합니다 .
onBackPressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시합니다.
- 배압 이슈 이후의 데이터는 다 무시한다.
onBackPressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지합니다.
/**
* - DROP_LATEST 전략 : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서
* 가장 최근에 버퍼안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 그 자리에 채운다.
*/
public class BackpressureBufferExample01 {
public static void main(String[] args) {
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#inverval doOnNext()", data))
.onBackpressureBuffer(
2,
() -> Logger.log("overflow!"),
BackpressureOverflowStrategy.DROP_LATEST)
.doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(2800L);
}
}
/**
* - DROP_OLDEST 전략 : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서 가장 먼저(OLDEST) 버퍼
* 안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 채운다.
*/
public class BackpressureBufferExample02 {
public static void main(String[] args){
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#inverval doOnNext()", data))
.onBackpressureBuffer(
2,
() -> Logger.log("overflow!"),
BackpressureOverflowStrategy.DROP_OLDEST)
.doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(2500L);
}
}
public class BackpressureDropExample {
public static void main(String[] args){
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#inverval doOnNext()", data))
.onBackpressureDrop(dropData -> Logger.log(LogType.PRINT, dropData + " Drop!"))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(5500L);
}
}