리액티브 프로그래밍, ReactiveX(Rx), 코루틴(Coroutine) 맛보기

임현주·2021년 11월 5일
0
post-thumbnail

시작

혼자 공부하면서 정리하고 싶은 부분을 작성한 글입니다 👀
함께 공부하는 사람에게는 도움이 되었으면 좋겠고,
혹시 제가 잘못 이해한 부분이 있다면 알려주시면 감사하겠습니다 💌


🔹 리액티브 프로그래밍

변화의 전파와 데이터 흐름과 관련된 선언적 프로그래밍 패러다임

  • 변화의 전파와 데이터 흐름 : 데이터가 변경될 때마다 이벤트를 발생시켜 데이터를 계속적으로 전달한다.
  • 선언적 프로그래밍 : 실행할 동작을 구체적으로 명시하는 명령형 프로그래밍과 달리 단순히 목표를 선언한다.

🎈 리액티브의 개념이 적용된 예시

Push 방식 : 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식

  • RTC(Real Time Communication) : 데이터를 직접 푸시 해서 전달하는 방식 ex) 아파치 스톰
  • 소켓 프로그래밍
  • DB Trigger
  • Spring의 ApplicationEvent
  • Angular의 데이터 바인딩
  • 스마트폰의 Push 메시지

Pull 방식 : 변경된 데이터가 있는지 요청을 보내 질의하고 변경된 데이터를 가져오는 방식

  • 클라이언트 요청 & 서버 응답 방식의 애플리케이션
  • Java와 같은 절차형 프로그래밍 언어

🔹 ReactiveX(Rx)

관찰 가능한 스트림(Observable streams)을 사용하는 비동기 프로그래밍 API
Observable pattern + Iterator pattern + Functional programming
(위에 설명한 리액티브 프로그래밍을 할 수 있게 해주는 라이브러리이다.)

순수 함수들, 즉 외부의 데이터를 변경하지 않고 받아온 값들을 내부에서 처리해서 밖으로 반환하는 함수들의 연쇄 작용. 개발자의 착오로 인한 오류 방지, 스레드들의 동시 접근에 의한 오류 또는 교착 문제들로부터 자유로운 프로그래밍 가능.

  • Observable (데이터 소스)
    지속적으로 변경이 가능한 데이터의 집합
    앞으로 변경되는 데이터를 관찰할 수 있다는 의미

  • Operators (리액티브 연산자)
    옵저버블을 처리하는 함수
    옵저버블로부터 전달받은 데이터를 가공하여 최종적인 결과 데이터를 만들어냄

  • Scheduler (스케줄러)
    스레드 관리자

  • Subscrilber (구독자)
    옵저버블이 발행한 데이터를 구독하는 구독자
    구독하지 않으면 옵저버블이 발행한 데이터를 전달받을 수 없음

  • 함수형 프로그래밍
    RxJava에서 제공하는 연산자(Operator) 함수를 사용


RxJava에서는 Observable을 구독하는 Observer가 존재하고, Observable은 순차적으로 발행하는 데이터에 대해 반응한다. Observable은 다음의 3가지 이벤트를 사용하여 동작한다.
  • OnNext() : Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행한다.
  • onComplete() : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 OnNext()를 더 호출하지 않음을 나타낸다.
  • onError() : 오류가 발생했음을 Observer에 전달한다. null은 발행하지 못한다.

위 이벤트들은 Emitter라는 인터페이스에 의해 선언된다. 중간에 오퍼레이터 연산자를 통해 필터링도 가능하다.


아래는 검색어를 입력하면 0.8초 후에 자동으로 검색되게 하는 코드이다.
// 구독하고 나서 나오는 반환형이 disposable,
// 모아서 한꺼번에 처리하고 메모리를 날림
// CompositeDisposable -> 한번에 담아서 관리
// 전역변수로 선언
private var myCompositeDisposable = CompositeDisposable()

override fun onCreateOptionsMenu(menu: Menu?): Boolean {
    val editTextChangeObservable = mySearchViewEditText.textChanges()
    // Disposable 인터페이스를 이용하여 옵저버블을 사용함으로서 발생할 수 있는 메모리 누수를 막을 수 있음
    // 옵저버블이 onComplete() 알림을 보냈을 때 자동으로 dispose()를 호출해 옵저버블과 구독자의 관계를 끊게 됨
    val searchEditTextSubscription: Disposable =
        // 옵저버블에 오퍼레이터(연산자)를 추가
        // debounce : 특정 시간이 지난 후에 마지막으로 들어온 이벤트만 받을 수 있는 오퍼레이터
        editTextChangeObservable.debounce(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io()) // IO 스레드에서 돌리겠다
            .subscribeBy( // 구독 행위
                onNext = { // 데이터를 수신
                    if (it.isNotEmpty()) { // 필터로도 가능
                        searchPhotoApiCall(it.toString())
                    }
                },
                onComplete = { 
                       // 완료가 되어 데이터 흐름이 끊김
                },
                onError = { 
                       // 데이터 흐름 끊김
                }
            )
    // 살아있는 옵저버블 compositeDisposable에 추가
    myCompositeDisposable.add(searchEditTextSubscription)
}

override fun onDestroy() {
        // 지워주지 않으면 계속 메모리에 남음
        this.myCompositeDisposable.clear()
        super.onDestroy()
}

🔹 코루틴(Coroutine)

비동기적으로 실행되는 코드를 간소화하기 위해 안드로이드에서 사용할 수 있는 동시 실행 설계 패턴
Thread/AsyncTask/Rx Background 작업을 대신할 수 있는 Asynchronous/Non-Blocking Programming 제공하는 경량 스레드(Light-Weight Threads)

🎈 Co(협동) + routine(루틴)
메인 스레드가 블로킹될 수 있는 부분에 대해서 도움을 준다.
이전에 자신의 실행이 마지막으로 중단되었던 지점 다음의 장소에서 실행을 재개한다.
비동기 처리를 순차적인 코드로 만들 수 있게 해준다.

  • CoroutineScope
    코루틴의 범위, 코루틴 블록을 묶음으로 제어할 수 있는 단위

  • GlobalScope
    CoroutineScope의 한 종류로, 미리 정의된 방식으로 프로그램 전반에 걸쳐 백그라운드에서 동작함

  • CoroutineContext
    코루틴을 어떻게 처리할 것인지에 대한 여러 가지 정보의 집합
    주요 요소로는 Job과 dispatcher가 있음

  • Dispatcher
    Dispatchers.Default : CPU 사용량이 많은 작업에 사용. 주 스레드에서 작업하기 너무 긴 작업들에게 알맞음.
    Dispatchers.IO : 네트워크, 디스크 사용할 때. 파일 읽기쓰기/소켓 읽기쓰기 작업에 최적화.
    Dispatchers.Main : 안드로이드의 경우 UI 스레드를 사용.

🙋🏻‍♀️ 코루틴 사용하기
1. 사용할 Dispatcher를 결정하고
2. Dispatcher를 이용해서 CoroutineScope 만들고
3. CoroutineScope의 launch 또는 async에 수행할 코드 블록을 넘기면 된다.

launch와 async는 CoroutineScope의 확장 함수이며, 넘겨받은 코드 블록으로 코루틴을 만들고 실행해 주는 코루틴 빌더이다.

launch ➡ Job 객체 반환

  • join() : 코루틴 블록이 완료될 때까지 다음 코드 수행을 대기
  • cancel() : 코루틴 블록의 작업 취소

async ➡ Deferred 객체 반환
코루틴 블록의 결과 값을 반환받고 싶을 때 생성

  • await() : 코루틴 블록이 완료될 때까지 다음 코드 수행을 대기, 완료되면 결과 값 반환
    (단, 여러 개의 async 코루틴 블록에 같은 Deferred 객체를 사용할 경우, 최종 결과 값은 첫 번째 async 코루틴 블록의 결과 값이 전달됨)
  • cancel() : 코루틴 블록의 작업 취소

또 다른 일시 중단 함수 suspend
원래대로라면 main 함수가 1000초 안에 끝나버려서 doWorld() 안의 World!를 출력하지 못할 것이다. 하지만 fun 앞에 suspend를 붙여 선언해 주면 왼쪽에 흐름이 끊긴듯한 화살표처럼 잠시 일시 중단하고 suspend 함수의 일처리가 마무리되면 main 함수도 종료된다.


위에 다룬 Rx를 이용한 코드와 마찬가지로 검색어를 입력하고 0.8초 후에 자동으로 검색되게 하는 코드이다. 아직 알아야 할 것투성이라 부족한 개념들은 주석으로 메모했다 (@_@;)

// 에딧텍스트 텍스트변경을 flow로 받기
// Flow : 옵저버블와 비슷
@ExperimentalCoroutinesApi
fun EditText.textChangesToFlow(): Flow<CharSequence?> {
    // flow 콜백 받기 -> 콜백 자체가 suspend 제공
    return callbackFlow<CharSequence?> {
        val listener = object : TextWatcher {
            override fun beforeTextChanged(p0: CharSequence?, p1: Int, p2: Int, p3: Int) = Unit

            override fun afterTextChanged(p0: Editable?) = Unit

            override fun onTextChanged(text: CharSequence?, p1: Int, p2: Int, p3: Int) {
                Log.d(TAG, "onTextChanged() / textChangesToFlow() 에 달려있는 텍스트 와쳐 / text : $text")
                // 값 내보내기
                offer(text)
            }
        }

        // 위에서 설정한 리스너 달아주기
        addTextChangedListener(listener)

        // 콜백이 사라질때 실행됨
        awaitClose {
            Log.d(TAG, "textChangesToFlow() awaitClose 실행")
            removeTextChangedListener(listener)
        }
    }.onStart { // 콜백이 시작될 때 안의 내용을 발동 시켜라
        Log.d(TAG, "textChangesToFlow() / onStart 발동")
        // emit으로 이벤트 전달, Rx에서 onNext와 동일
        emit(text)
    }
}
// 전역으로 선언
// Job : 백그라운드에서 실행되는 작업
private var myCoroutineJob: Job = Job()
// get() : 연산자(operator) 함수로써 주어진 key에 해당하는 컨텍스트 요소를 반환
private val myCoroutineContext: CoroutineContext
   get() = Dispatchers.IO + myCoroutineJob
   
override fun onCreateOptionsMenu(menu: Menu?): Boolean {
    // Rx의 스케줄러와 비슷
    // IO 스레드에서 돌리겠다고 설정
    // launch 코루틴 빌더 : 내부적으로 코루틴을 만들어서 Job 객체를 반환
    // job.join() : 코루틴이 완료될 때까지 기다렸다가 해당 메소드를 종료
    GlobalScope.launch(context = myCoroutineContext) {
        // editText가 변경되었을 때 Flow 형태로 받음 = 옵저버블
        val editTextFlow = mySearchViewEditText.textChangesToFlow()
        editTextFlow
            .debounce(800)
            .filter { // true, false로 적용됨
                it?.length!! > 0
            }
            .onEach { // 흘러들어온 데이터(flow)를 받음
                searchPhotoApiCall(it.toString())
            }
            .launchIn(this)
    }
}

override fun onDestroy() {
        myCoroutineContext.cancel()
        super.onDestroy()
}
profile
🐰 피드백은 언제나 환영합니다

0개의 댓글