Reactive x Observable 생성하기

HEETAE HEO·2022년 6월 30일
0
post-thumbnail

Observable 생성하기

RxJava에서는 연산자(operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있습니다.

create() 연산자

Observable.create()를 통해 Emitter를 이용하여 직접 어떤 데이터를 순차적으로 발행할 수 있고, 데이터 발행 완료 및 오류 이벤트를 직접 설정해줄 수 있습니다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("HeeTae");
    emitter.onComplete();
});
source.subscribe(System.out::println);
Hello
HeeTae

onNext()

  • 데이터가 하나 발행됐음을 알리는 이벤트입니다.
  • 연속된 데이터의 경우, 데이터가 하나씩 차례대로 onNext()로 발행됨

onComplete()

  • 데이터 발행이 모두 완료되었음을 알리는 이벤트
  • onComplete()가 호출된 이후네는 더 이상 onNext() 호출이 안됩니다.

그렇기에 위의 코드에서

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onComplete();
    emitter.onNext("HeeTae");
});
source.subscribe(System.out::println);
Hello

결과는 다음과 같습니다.

onComplete 이후에는 아이템이 더 발행되더라도 구독자는 데이터를 받지 못합니다.

onError()

  • 오류가 발생했음을 알리는 이벤트
  • 마찬가지로 onError()가 호출된 이후에는 더 이상 onNext() 호출이 안됩니다.
Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onError(new Throwable());
    emitter.onNext("HeeTae");
});
source.subscribe(System.out::println,
	throwable -> System.out.println("onError"));
Hello
onError

실제로 create() 연산자는 개발자가 직접 Emiiter를 제어하기 때문에 주의해서 사용해야합니다. Observable을 더 이상 사용하지 않을 때에는 등록된 Callback을 모두 해제하지 않으면 메모리 릭이 발생하고 BackPressure를 직접 처리해야합니다.

just() 연산자

just()는 데이터를 있는 그대로 발행하는 Observable을 생성합니다.

  val stream: Observable<String> =
        Observable.just("Hello/", "Velog", "Kotlin")
    
    stream.subscribe(System.out::println)

RxJava는 null을 허용하지 않기 때문에 just()의 데이터에도 null을 담아선 안됩니다.

toObservable()

RxKotlin에서만 지원합니다. (RxJava 기준 fromArray, fromIterable 등)

기존의 Iterable한 것들을 Observable로 바꿔줍니다.

fromArray() 연산자

배열의 아이템을 Observable로 바꿀 때에는 fromArray() 연산자를 이용하여 아이템을 순차적으로 발행합니다.

String[] itemArray = new String[]{"Hello", "Goodbye", "NiceMeetYou"};
Observable source = Observable.fromArray(itemArray);
source.subscribe(System.out::println);
Hello
Goodbye
NiceMeetYou

fromIterable() 연산자

ArrayList, HashSet과 같은 Iterable 자료 구조 클래스는 fromIterable()연산자를 사용해 변환합니다.

ArrayList itemList = new ArrayList<String>();
itemList.add("Hello");
itemList.add("Goodbye");
itemList.add("NiceMeetYou");
Observable source = Observable.fromIterable(itemList);
source.subscribe(System.out::println);
Hello
Goodbye
NiceMeetYou

fromFuture() 메서드

Future 인터페이스는 자바5에 추가된 API로 연산 결과를 얻을 때 사용합니다.

fromFuture 메서드는 Future 인터페이스를 지원하는 모든 객체를 Observable Source로 변환하고 Future.get() 메서드를 통해 호출한 반환합니다. 그럼 이 Future 인터페이스는 비동기적인 작업의 결과를 구할 때 사용하는 것이입니다.

RxJava에서는 Executor를 직접 다루기보다는 스케줄러를 사용하는 것을 권장합니다.

Future<String> future = Executors.newSingleThreadExecutor()
    .submit(() -> {
        Thread.sleep(5000);
        return "This is the future";
    });
Observable source = Observable.fromFuture(future);
source.subscribe(System.out::println); //블로킹되어 기다림
This is the future

fromPublisher() 연산자

Publisher는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로 부터 요청을 받아 아이템을 발행합니다. fromPublisher() 연산자는 Publisher를 Observable로 변환해줍니다.

Publisher<String> publisher = Subscriber -> {
    subscriber.onNext("Hello");
    subscriber.onNext("Goodbye");
    subscriber.onNext("NiceMeetYou");
    subscriber.onComplete();
};
Observable<String> source = Observabler.fromPublisher(publisher);
source.subscribe(System.out::println);
Hello
Goodbye
NiceMeetYou

fromCallable() 메서드

Callable 인터페이스는 Runnable 인터페이스처럼 메서드가 하나고 인자가 없다는 점에서 비슷하지만 실행결과를 반환한다는 점이 Runnable과 조금 다릅니다. 또한 Executor 인터페이스의 인자로 활용되기 때문에 다른 스레드에서 실행되는 것을 암시합니다.

fun main() {
    val callable: Callable<String> = Callable<String> { "HeeTae" }
    val source: Observable<String> = Observable.fromCallable(callable)
    source.subscribe(System.out::println)
}
HeeTae

references

http://reactivex.io/documentation/operators
https://velog.io/@haero_kim/RxJava-Observable-생성하기
https://blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html

profile
Android 개발 잘하고 싶어요!!!

0개의 댓글