Rx

정삼빈·2021년 8월 19일
0

Android

목록 보기
4/4

RxJava

Reactive Extensions for the JVM
넷플릭스가 만든 Reactive Programming 개념의 자바 구현체

Reactive Programing

Reactive Programming is programming with asynchronous data streams.
데이터 흐름과 변화의 전달에 관한 프로그래밍 패러다임
데이터 스트림을 처리하여 코드의 변경 내용을 전파하는 비동기 프로그래밍 패러다임
환경이 변하면 이벤트를 받아 동작

pull 방식 → push 방식
pull 방식 : 변수를 사용하는 쪽으로 상태를 보내주는 방식
push 방식 : 변수를 사용하는 쪽에서 변수에게 상태를 요청하는 방식
데이터 흐름을 정의 후 데이터가 변경되었을 때 연관된 함수나 수식이 업데이트 되는 방식
⇒ 일종의 observer 패턴

넷플릭스가 RxJava를 만든 이유

  • Embrace Concurrency
    ⇒ 다수의 비동기 실행 흐름 생성 후 결과를 취합하여 최종리턴하는 방식
  • Java Futures are Expensive to Compose
    → 자바 Future(미래 시점 수행 결과값을 구하기 위해 사용)을 조합하기 어려움
    ⇒ 비동기 흐름을 조합할 수 있는 리액티브 연산자 제공
  • Callbacks Have Their Own Problems
    → 콜백이 콜백을 불러 코드의 가독성을 떨어트리고 디버깅을 어렵게 만드는 상황 해결
    ⇒ RxJava에선 callback 사용 X 스케줄러 활용? observable? 몰겟당

Rx (ReactiveX, Reactive Extenstions)

observable 시퀀스를 사용하여 비동기와 이벤트 기반 프로그램을 구성하기 위한 라이브러리

Observer 패턴 확장

Gang of four의 Observer 패턴에 누락된 두가지 의미 추가

  • 생산자가 더 이상 사용가능한 데이터가 없음을 소비자에게 알리는 기능 추가
    → Observable이 관찰자의 onCompleted 메서드 호출
  • 생산자가 소비자에게 오류가 발생했음을 알리는 기능 추가
    → Observable이 관찰자의 onError 메서드 호출

Observable

Rx에서는 Reactive Programming의 비동기 데이터 스트림을 Observable이라는 용어로 표현한다. 이 Observable은 변화를 지속적으로 관찰하고 이에 따른 동작을 수행하는 데 그치지 않고 필터링, select, 변환, 결합 등 여러 함수(Operators)를 이용할 수 있다.

Observer는 Observable을 구독한다.
Observable에 이벤트가 발생하면 Observer는 이벤트를 감지하여 연산 실행 후 결과 리턴한다.
Observable은 아이템을 발행(emit)하거나 옵저버의 함수를 호출함으로써 옵저버에게 알림을 보낸다.

Observable의 종류 5가지

Observable

rx의 기본 단위
발생 이벤트 onNext, onError, onComplete
지속적으로 발생하는 이벤트 뿐 아니라 비동기적인 네트워크 요청, 리스트의 스트림 처리 등 다양한 상황에서 활용 가능하다. 하지만 다양한 스트림을 활용하기에 부족하다는 의견

val observer = object : Observer<Int>{
            lateinit var disposable:Disposable
            override fun onSubscribe(d: Disposable) {
                println("Subscribe")
            }

            override fun onNext(t: Int) {
                println("$t Next")
            }

            override fun onError(e: Throwable) {
                println("Error : $e")
            }

            override fun onComplete() {
                println("Complete")
            }
        }

Observable.range(0, 5).subscribe(observer)

//결과값
0 Next
1 Next
2 Next
3 Next
4 Next
Complete

Flowable

데이터를 생산하는 속도를 처리하는 속도가 따라잡지 못하는 경우 데이터 누락 또는 데이터 부족이 발생
그래서 기존 Observable에는 Backpressure buffer를 두어 넘치는 데이터를 복롼하고 버퍼가 가득 찼을 경우 새로운 데이터를 발행?(publish)하지 않았다.
하지만 이 버퍼가 초보자들에게는 의동하지 않은 동작을 야기할 수 있다 생각되어 Observable에서 이를 없애고 Flowable 추가
배압(Back Pressure) - 방행하는 속도와 구독자가 처리하는 속도의 차이로 발생

Flowable의 BackpressureStrategy 5가지
BUFFER 넘치는 데이터 별도의 버퍼에 저장
DROP 넘치는 데이터 무시
LATEST 넘치는 데이터 버퍼에 저장, 버퍼가 찰 경우 최의 데이터만 유지
ERROR 버퍼 크기 초과시 MissingBackPressureException 에러
NONE 특정 처리 수행 X

여전히 다루기 까다로움

val subscriber = object : DisposableSubscriber<Int>() {
            override fun onNext(t: Int?) {
                println("$t Next")
            }

            override fun onError(t: Throwable?) {
                println("Error : $t")
            }

            override fun onComplete() {
                println("Complete")
            }
        }

Flowable.range(0, 5).subscribe(subscriber)

//결과값
0 Next
1 Next
2 Next
3 Next
4 Next
Complete
//flowable에 BackpressureStrategy 설정 
Flowable.create({ emitter: FlowableEmitter<Int?> ->
            for (i in 0..999) {
                emitter.onNext(i)
            }
            emitter.onComplete()
        }, BackpressureStrategy.BUFFER).subscribe(subscriber)

//  Observable을 Flowable로 변환해주고 BackpressureStrategy 설정
val observable = Observable.range(0, 999)
        observable.toFlowable(BackpressureStrategy.BUFFER)
                .subscribe(System.out::println)

궁금한 것
create말고 다른 걸로 flowable생성하면 설정 못하는지

Flowable vs Observable
Observable
Out Of Memory Error(OOME)가 일어날 가능성이 거의 없는 최대 1000개 이하의 요소 흐름
GUI 이벤트 (빈번하게 일어나지 않고 backpressure의 확률이 낮음)
→ 1000Hz 미만의 빈도수를 다루는 이벤트의 경우 sample(), debounce() 활용 고려
Flowable
10000개 이상의 데이터 처리, 메서드 체인에서 데이터 소스에 데이터 개수 제한 요청
디스크에서 파일을 읽어 들일 경우
JDBC(Java Database Connectivity)를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우
네트워크 IO 실행
⇒ 디스크, 네트워크 IO, DB쿼리 결과 가져오기 == Cold Observable
데이터를 조금씩이 아닌 한번에 가져온다. 그렇지만 배압이슈가 작은 경우 Observable을 사용해도 된다.

blocking 방식????????

일반적으로 Observable은 Flowable보다 오버헤드가 낮다.

Observable은 0-N개의 데이터를 발생시킨다. 하지만 복잡하지 않은 대부분은 비동기 작업들은 1개의 데이터만 발생시킨다. 이를 좀 더 편리하게 다루기 위해 Single과 Completable 등장

Single

무한대의 값을 배출시키는 Observable과 다르게 2개의 데이터만을 발생시킬 수 있다.
작업이 성공했을 시 결과값을 배출시키는 onSuccess와 실패 또는 에러 발생 시 에러를 배출시키는 onError 두가지 메소드를 사용가능하다.
비동기 작업을 요청 후 결과값을 가져오는 작업에 많이 사용 됨
Ex) Http 요청 후 결과 값

Single.just("Hello").subscribe(System.out::println)

val source = Observable.just("Single")

Single.fromObservable(source).subscribe(System.out::println)

source.single("defaultItem").subscribe(System.out::println)

Observable.empty<String>().single("default").subscribe(System.out::println)

Observable.just("R","G","B").take(1).single("Default").subscribe(System.out::println)

single(), singleOrError(), first(), firstOrError(), last(), lastOrError

Completable

별도의 데이터를 발생시키지 않고 작업의 성공/실패 여부만 전파한다.
성공 시 onComplete 실패 시 onError
ignoreElements()

Maybe

데이터가 배출될 수도 있고 아닐 수도 있을 때 사용한다. Single과 Completable이 합쳐진 느낌
성공 시 onSuccess 성공하였지만 값이 없을 때 onComplete 실패 시 onError
보통 Observable의 특정 연산자를 통해 생성할 때가 많음
elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등

더이상 null을 보낼 수 없기 때문에 completable이나 maybe를 활용해야 한다.

RxJava RxAndroid RxKotlin

RxJava: Java(JVM)를 위한 ReactiveX ExtensionsReactive programming(리액티브 프로그래밍) 패러다임을 자바에서 구현한 프로그래밍 라이브러리

RxKotlin: Kotlin을 위한 ReactiveX ExtensionsRxJava 라이브러리를 기반으로 포팅하여 코틀린을 위한 리액티브 프로그래밍의 특정 부분을 함수형 프로그래밍으로써 구현한 라이브러리
RxJava에 코틀린 래퍼를 입힌 라이브러리이다.

RxAndroid: Android를 위한 ReactiveX ExtensionsRxJava에 최소한의 클래스를 추가하여 안드로이드 앱에서 리액티브 구성요소를 쉽고 간편하게 사용하게 만드는 라이브러리

  • RxAndroid: Android를 위한 ReactiveX ExtensionsRxJava에 최소한의 클래스를 추가하여 안드로이드 앱에서 리액티브 구성요소를 쉽고 간편하게 사용하게 만드는 라이브러리

추가 라이브러리 → https://github.com/Reactivex/Rxandroid/wiki

RxKotlin이나 RxAndroid를 쓸 때 RxJava가 필수사항인가? 권장사항인가?

RxKotlin과 RxAndroid는 RxJava2를 내부적으로 참조하고 있어 반드시 RxJava2의 의존성을 추가 할 필요는 없다. 하지만 최신버전의 RxJava사용을 위해 명시해주는것이 좋다.

profile
studying android ..

0개의 댓글