kotlin reactive programing

짱구·2023년 4월 10일
0

kotlin

목록 보기
8/8

비동기 프로그래밍

동기 프로그래밍

동기(Synchronous)방식의 프로그램에서 작업의 실행 흐름은 순차적으로 동작합니다.
순차적으로 동작하는 프로그램은 코드를 파악하기 쉽고 결과를 예측하기 쉬우므로 디버깅이 쉽습니다.
특정 작업을 실행하는 동안에는 다른 작업을 할 수 없다는 단점이 존재합니다.

비동기 프로그래밍

비동기(Asynchronous) 방식의 프로그램에서 작업의 실행 흐름은 기본적으로 순차적이지 않습니다.
이러한 특징으로 인해 비동기 처리 방식은 현재 실행 중인 작업이 끝나는 것을 기다리지 않고 다른 작업을 할 수 있습니다.
서버, 클라이언트 등 모든 환경에서 유용하게 사용됩니다.
UI 애플리케이션의 경우 특정 이벤트가 발생할 경우에 반응하는 동작을 구현해야 하는데 이럴 때 필수적으로 비동기 프로그래밍을 사용하게됩니다.

// 버튼을 누를때마다 카운터가 증가하는 예제
const button = document.querySelector('button');
// 'click' 옆의 두번째 인자가 비동기 콜백
  button.addEventListener('click', event => {
  button.innerHTML = `클릭 수: ${event.detail}`;
});

대부분의 프로그래밍 언어들은 각 언어의 철학에 맞는 다양한 비동기 처리 방법들을 지원합니다.
대표적으로 Callback, Promise, Future, Async-await, Coroutine 등이 있고 각각의 방법들은 장점과 단점이 존재합니다.

비동기 프로그래밍 구현

Thread

가장 기본이 되는 비동기 처리 방식입니다.
스레드는 Runnable 인터페이스를 사용해 비동기 동작을 수행합니다.
스레드가 1개인 경우 싱글 스레드(Single Thread)라고 부르고 하나 이상 존재하는 경우 멀티 스레드(Multi Thread)라고 부릅니다.
멀티 스레드를 사용하면 애플리케이션에서 여러 개의 작업을 동시에 할 수 있습니다.

fun main() {
  for (i in 0..5) {
  val thread = Thread{
  	println("current-thread-name : ${Thread.currentThread().name}")
  }
  	thread.start()
  }
  println("current-thread-name : ${Thread.currentThread().name}")
}
// 호출할때마다 출력 결과가 달라짐

멀티 스레드를 사용하면 스케쥴링 알고리즘에 의해 스레드가 전환되면서 작업을 처리하는데 이를 컨텍스트 스위칭이라고합니다.
하나의 프로세스(Process)에는 최소한 하나 이상의 스레드가 존재하고 프로세스 내의 스레드들은 동일한 메모리를 공유합니다.
스레드는 프로세스를 생성하는 것 보다 가볍습니다.
하지만 스레드가 무한정 많아지면 메모리 사용량이 높아져서 OOME(OutOfMemoryError)가 발생할 수 있고 높은 동시 처리량을 요구하는 시스템에서는 스레드를 생성하면서 발생하는 대기 시간 때문에 응답 지연이 발생합니다.
이런 문제를 해결하기 위해선 스레드 풀(Thread Pool)을 사용해야한다. 스레드 풀을 사용하면
애플리케이션 내에서 사용할 총 스레드 수를 제한할 수 있고 기존에 생성된 스레드를 재사용하므로 빠른 응답이 가능합니다.
직접 만드는 것보다 검증된 라이브러리를 사용하는게 권장되며 java.util.concurrent 패키지의
ExecutorService를 사용하면 쉽고 안전하게 스레드 풀을 사용할 수 있습니다.

    fun main() {
        val pool: ExecutorService = Executors.newFixedThreadPool(5)
        try {
            for (i in 0..5) {
                pool.execute {
                    println("current-thread-name : ${Thread.currentThread().name}")
                }
            }
        } finally {
            pool.shutdown()
        }
        println("current-thread-name : ${Thread.currentThread().name}")
    }

출력 결과를 보면 스레드 이름이 pool-1-thread-{스레드번호} 형태이므로 스레드 풀에서 관리하는 스레드라는 것을 알 수 있습니다.
출력 결과를 보면 동일한 스레드 이름으로 여러번 수행된 것을 볼 수 있다. 스레드 풀에 있는 스레드를 재사용했기 때문입니다.

Future

퓨처(Future)는 비동기 작업에 대한 결과를 얻고 싶은 경우에 사용됩니다.
예를 들어 수행 시간이 오래 걸리는 작업이나 작업에 대한 결과를 기다리면서 다른 작업을 병행해서 수행하고 싶은 경우에 유용합니다.
스레드는 Runnable을 사용해 비동기 처리를 하지만 퓨처를 사용해 처리 결과를 얻기 위해서는 Callable을 사용합니다.

    fun sum(a: Int, b: Int) = a + b
        
        fun main() {
            val pool = Executors.newSingleThreadExecutor()
            val future = pool.submit(Callable {
            sum(100, 200)
        })
            println("계산 시작")
            val futureResult = future.get() // 비동기 작업의 결과를 기다린다.
            println(futureResult)
            println("계산 종료")
        }

퓨처를 사용하면 비동기 작업을 쉽게 구현할 수 있지만 몇 가지 단점을 가지고 있습니다.
먼저 get 함수는 비동기 작업의 처리가 완료될 때까지 다음 코드로 넘어가지 않고 무한정 대기하거나 지정해둔 타임아웃 시간까지 블로킹됩니다.
또한 퓨처를 사용하면 동시에 실행되는 한 개 이상의 비동기 작업에 대한 결과를 하나로 조합하여 처리하거나 수동으로 완료 처리(completion) 할 수 있는 방법을 지원하지 않습니다.

Completable Future

JDK8 부터 퓨처의 단점을 극복하기 위해 컴플리터블 퓨처Completable Future를 제공합니다.

팩토리 함수인 supplyAsync 를 사용해 비동기 작업을 수행하는 예제

    fun main() {
        val completableFuture = CompletableFuture.supplyAsync {
            Thread.sleep(2000)
            sum(100, 200)
        }
        println("계산 시작")
        completableFuture.thenApplyAsync(::println) // 논블로킹으로 동작
        while (!completableFuture.isDone) {
            Thread.sleep(500)
            println("계산 결과를 집계 중입니다.")
        }
        println("계산 종료")
    }

thenApplyAsync 함수를 사용해 논블로킹으로 동작하고 뒤에 Async가 붙은 함수들은 supplyAsync와 별도의 스레드 풀을 지정할 수 있습니다.
isDone 은 말그대로 컴플리터블 퓨처가 수행 중인 비동기 작업이 완료된 상태인지를 체크합니다.
취소상태를 나타내는 isCancelled 그리고 비동기 작업 도중에 에러가 발생한 상태를 나타내는 isCompletedExceptionally도 제공합니다.
CompletableFuture를 쓰더라도 get 함수를 그대로 사용한다면 블로킹 코드가 됩니다.

    fun main() {
        val completableFuture = CompletableFuture.supplyAsync {
            Thread.sleep(2000)
            sum(100, 200)
        }
        println("계산 시작")
//completableFuture.thenApplyAsync(::println) // 논블로킹으로 동작
        val result = completableFuture.get()
        println(result)
        while (!completableFuture.isDone) {
            Thread.sleep(500)
            println("계산 결과를 집계 중입니다.")
        }
        println("계산 종료")
    }
    // 계산 결과를 집계 중입니다.
    // 계산 결과를 집계 중입니다.
    // 계산 결과를 집계 중입니다.
    // 계산 결과를 집계 중입니다.
    // 300
    // 계산 종료

컴플리터블 퓨처는 기존의 비동기 처리 방법에 비해 효율적이며 편리합니다.
컴플리터블 퓨처가 만능 해결사는 아니지만 대다수의 비동기 처리 시나리오에서 유용하게 사용할 수 있습니다.
예를 들면 우리가 개발한 서버에서 외부의 여러 API 서버를 호출하여 응답을 받아서 결과를 결합하고 처리해야 하는 시나리오라면 컴플리터블 퓨처는 매우 유용할 수 있습니다.

옵저버 패턴

옵저버 패턴(Observer Pattern) 이란 GoF가 소개한 디자인 패턴 중 하나로 관찰 대상이 되는
객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지(notify)하는 디자인 패턴을 말합니다.
옵저버 패턴을 사용하면 객체 간의 상호작용을 쉽게 하고 효과적으로 데이터를 전달할 수 있습니다.

옵저버 패턴의 구조

옵저버 패턴은 관찰 대상인 서브젝트(Subject)와 Subject를 관찰하는 옵저버(Objeserver)로 이뤄져 있습니다.
하나의 서브젝트에는 1개 또는 여러 개의 옵저버를 등록할 수 있습니다.
서브젝트의 상태가 변경되면 자신을 관찰하는 옵저버들에게 변경사항을 통지합니다.
서브젝트로 변경사항을 통지 받은 옵저버는 부가적인 처리를 합니다.

옵저버 패턴은 서브젝트와 옵저버를 상속하는 구체화(Concrete) 클래스가 존재합니다.
구체화 클래스는 서브젝트와 옵저버에 대한 상세 구현을 작성합니다.

서브젝트의 함수

옵저버의 함수

class Coffee(val name: String)

// Subject
class Barista : Observable() {
private lateinit var coffeeName: String
    fun orderCoffee(name: String) {
        this.coffeeName = name
    }
        
    fun makeCoffee() {
        setChanged()
        notifyObservers(Coffee(this.coffeeName))
    }
}
        
// Observer
class Customer(val name: String) : Observer {
    override fun update(o: Observable?, arg: Any?) {
        val coffee = arg as Coffee
        println("${name}${coffee.name}을 받았습니다")
    }
}

fun main() {
    val barista = Barista()
    barista.orderCoffee("아이스 아메리카노")
    val customer = Customer("고객1")barista.addObserver(customer)
    barista.makeCoffee()
}
// 고객1이 아이스 아메리카노을 받았습니다

Customer 클래스는 Observer 인터페이스를 구현하여 Barista 클래스가 커피를 완성하면 통지를 받아서 update 함수에서 처리합니다.
Barista 클래스는 Observable 클래스를 상속하여 고객이 주문한 커피가 만들어지면 notifyObservers로 고객에게 만들어진 Coffee 객체를 전달한다.
이때 setChanged를 먼저 호출하여 변경 여부를 내부에 저장합니다.
Customer 클래스가 Barista 클래스를 관찰하기 위해 addObserver로 등록합니다.

옵저버(고객)를 추가로 등록한 경우

fun main() {
    val barista = Barista()
    barista.orderCoffee("아이스 아메리카노")
    val customer = Customer("고객1")
    val customer2 = Customer("고객2")
    val customer3 = Customer("고객3")
    barista.addObserver(customer)
    barista.addObserver(customer2)
    Chapter 08.리액티브 프로그래밍 기초 - 02.옵저버 패턴 5
    barista.addObserver(customer3)
    barista.makeCoffee()
}
// 고객3이 아이스 아메리카노을 받았습니다
// 고객2이 아이스 아메리카노을 받았습니다
// 고객1이 아이스 아메리카노을 받았습니다

옵저버 패턴의 장점

  • 옵저버 패턴을 사용하지 않았다면 고객은 일정 간격으로 커피가 완성됐는지 바리스타에게 확인하는 처리가 있어야합니다.
  • 고객은 일정 간격으로 커피가 완성됐는지 바리스타에게 확인하는 처리 간격이 너무 짧으면 변경된 상태를 빠르게 확인할 수 있지만 매번 불필요한 호출이 발생하므로 성능상 문제가 발생할 수 있습니다.
  • 간격이 너무 길면 변경된 상태를 즉시 확인할 수 없으므로 실시간성이 떨어질 수 있습니다.
  • 옵저버 패턴은 관찰자인 옵저버가 서브젝트의 변화를 신경 쓰지 않고 상태 변경의 주체인 서브젝트가 변경사항을 옵저버에게 알려줌으로써 앞서 언급한 문제를 해결할 수 있습니다.
  • 옵저버 패턴은 데이터를 제공하는 측에서 데이터를 소비하는 측에 통지하는 푸시(Push Based)하는 방식입니다.
  • 옵저버 패턴에서 서브젝트와 옵저버는 관심사에 따라 역할과 책임이 분리되어 있습니다.
  • 서브젝트는 옵저버가 어떤 작업을 하는지 옵저버의 상태가 어떤지에 대해 관심을 가질 필요가 없고 오직 변경 사항을 통지하는 역할만 수행하며 하나 혹은 다수의 옵저버는 각각 맡은 작업을 스스로 하기 때문에 옵저버가 하는 일이 서브젝트에 영향을 끼치지 않고 옵저버는 단순한 데이터의 소비자로서 존재하게 됩니다.

이터레이터 패턴

이터레이터 패턴(Iterator Pattern)은 데이터의 집합에서 데이터를 순차적으로 꺼내기 위해 만들어진 디자인 패턴을 말합니다.
이터레이터 패턴을 사용하면 컬렉션이 변경되더라도 동일한 인터페이스를 사용해 데이터를 꺼내올 수 있기 때문에 변경사항 없이 사용할 수 있습니다.
데이터의 집합이 얼만큼의 크기를 가진지 알 수 없는 경우 이터레이터 패턴을 사용하면 순차적으로 데이터를 꺼내올 수 있습니다.

애그리게잇(Aggregate) 은 요소들의 집합체를 나타냅니다.
이터레이터는 집합체 내부에 구현된 iterator를 이용해 생성합니다.
이터레이터를 사용하는 클라이언트는 생성된 이터레이터의 hasNext 함수를 사용해 데이터가 존재하는지 검사하고 next 함수를 사용해 데이터를 꺼냅니다.

data class Car(val brand: String)

class CarIterable(val cars: List<Car> = listOf()) : Iterable<Car> {
    override fun iterator() = CarIterator(cars)
}

class CarIterator(val cars: List<Car> = listOf(), var index: Int = 0)
    : Iterator<Car> {
    override fun hasNext() = cars.size > index
    override fun next() = cars[index++]
}

fun main() {
    val carIterable = CarIterable(listOf(Car("람보르기니"), Car("페라리")))
    val iterator = carIterable.iterator()
    while (iterator.hasNext()) {
        println("브랜드 : ${iterator.next()}")
    }
}

CarIterable 클래스는 Iterable 인터페이스를 구현하여 CarsIterator 를 생성하는 iterator 함수를 오버라이드합니다.
CarIterator 클래스는 Iterator 인터페이스를 구현하여 데이터가 존재하는지 확인하는 hasNext를 실행합니다.
데이터가 존재하면 데이터를 가져오는 next 함수를 오버라이드합니다.
while문 내부에선 hasNext를 사용하여 데이터를 모두 가져올때까지 반복하고 데이터를 출력합니다.

옵저버 패턴과 차이점

데이터를 제공한다는 관점에서 이터레이터 패턴과 옵저버 패턴은 유사합니다.
이터레이터 패턴은 에그리게잇이 내부에 데이터를 저장하고 이터레이터를 사용해 데이터를 순차적으로 당겨오는 방식이기 때문에 풀 기반(Pull-Based)입니다.
이에 반해 옵저버 패턴은 데이터 제공자가 소비하는 측에 데이터를 통지하는 푸시 기반이라는 점이 차이점입니다.

리액티브 프로그래밍

리액티브 프로그래밍(Reactive Programming)은 데이터 또는 이벤트의 변경이 발생하면 이에 반응해 처리하는 프로그래밍 기법을 말합니다.
리액티브 프로그래밍은 비동기 프로그래밍을 처리하는 새로운 접근 방식입니다.
리액티브 프로그래밍은 2010년 에릭 마이어에 의해 마이크로소프트 .NET 에코 시스템으로 정의되었으며 데이터의 통지, 완료, 에러에 대한 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용했습니다.

리액티브 프로그래밍이 나오기 전 비동기 프로그래밍은 대부분 콜백 기반의 비동기 처리 방식을 사용

fetch("/api/users/me") { user->
    fetch("/api/users/${user.id}/followers") { followers ->
        fetch("/api/users/${user.id}/likes") { likes ->
            fetch("/api/users/${user.id}/contacts") { contacts ->
        // 콜백 헬
            }
        }
    }
}

간단한 콜백은 이해하기 쉬울 수 있지만 콜백이 많아져서 발생하는 콜백 헬(Callback Hell)로 인해 코드의 복잡도가 늘어납니다.

리액티브 프로그래밍을 적용한 사례

리액티브 프로그래밍을 사용하면 콜백 헬 문제를 함수형 프로그래밍 관점으로 해결할 수 있습니다.

fetchReactive("/api/users/me")
    .zip { user -> fetchReactive("/api/users/${user.id}/followers") }
    .zip { user -> fetchReactive("/api/users/${user.id}/likes") }
    .zip { user -> fetchReactive("/api/users/${user.id}/contacts") }
    .flatMap { followers, likes, contacts ->
    // 로직 구현
}

콜백 헬 없이 비동기 코드를 쉽게 작성할 수 있기 때문에 서버나 UI 애플리케이션 개발시 리액티브 프로그래밍이 유용하게 사용되고 있습니다.

리액티브 스트림

리액티브 스트림(Reactive Stream)은 리액티브 프로그램의 표준 API 사양을 말합니다.
비동기 데이터 스트림 과 논-블로킹 백프레셔(Back-Pressure)에 대한 사양을 제공합니다.
리액티브 스트림 이전의 비동기식 애플리케이션에서는 CPU의 멀티 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요했습니다.
기존의 방식은 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck) 이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있습니다.
Netflix, Vmware, Red Hat, Twitter, Lightbend 등과 같은 유명 회사들이 표준화에 참여중입니다.

리액티브 스트림 사양

리액티브 스트림 사양(specification)은 핵심 인터페이스 와 프로토콜 로 구성됩니다.

인터페이스 이름역할
Publisher데이터를 생성하고 구독자에게 통지
Subscriber데이터를 구독하고 통지 받은 데이터를 처리
SubscriptionPublisher와 Subscriber 간의 데이터를 교환하도록 연결하며 전달받을 데이터의 개수를 설정하거나 구독을 해지할 수 있다
ProcessorPublisher와 Subscriber을 모두 상속받은 인터페이스

발행자(Publisher) 는 데이터를 생성하고 구독자(Subscriber) 에게 데이터를 통지하고 구독자는 자신이 처리할 수 있는 만큼의 데이터를 요청하고 처리합니다.
이때 발행자가 제공할 수 있는 데이터의 양은 무한(unbounded) 하고 순차적(sequential) 처리를 보장합니다.
서브스크립션(Subscription) 은 발행자와 구독자를 연결 하는 매개체이며 구독자가 데이터를 요청하거나 구독을 해지하는 등 데이터 조절에 관련된 역할을 담당합니다.
프로세서(Processor) 는 발행자와 구독자의 기능을 모두 포함하는 인터페이스이며 데이터를 가공하는 중간 단계에서 사용합니다.

리액티브 스트림의 데이터 처리 프로토콜

리액티브 스트림은 발행자 구독자간의 데이터 전달에 사용되는 규칙을 규약(Protocal) 로 정의하고 있습니다.
구독자는 4개의 추상 메서드를 프로토콜을 가지고 있습니다.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
메서드 명설명
onSubscribe구독시 최초에 한번만 호출
onNext구독자가 요구하는 데이터의 수 만큼 호출 (최대 java.lang.Long.MAX_VALUE)
onError에러 또는 더이상 처리할 수 없는 경우
onComplete모든 처리가 정상적으로 완료된 경우

리액티브 스트림 데이터 처리 프로토콜의 흐름

  • 각 메서드의 호출을 시그널(Signal) 이라고 부르고 각 시그널은 호출되는 순서가 다릅니다.
  • onSubscribe 는 최초 구독에 대한 초기화를 담당하므로 구독 시 최초 한 번만 호출되기 때문에 onSubscribe 내부에서 초기화 로직을 구현할 수 있습니다.
  • onNext 는 발행자로부터 통지받을 데이터가 있는 경우 구독자가 요청하는 만큼 계속 호출됩니다.
  • 이때 발행자가 통지하는 데이터의 수는 구독자가 요구하는 수와 같거나 적어야 합니다.
  • 이런 사양이 있는 이유는 발행자가 너무 많은 데이터를 통지해서 구독자가 처리할 수 있는 양보다 많아지면 시스템에 문제가 발생할 수 있기 때문에 적절하게 처리량을 조절하기 위함입니다.
  • 발행자 측에서 처리 중 에러가 발생하면 onError 를 구독자에게 통지하고 onError 시그널이 발생하면 더 이상 데이터를 통지하지 않습니다.
  • 구독자는 onError 시그널을 받으면 이에 대한 에러 처리를 할 수 있습니다.
  • onComplete 는 모든 데이터를 통지한 시점에 마지막에 호출되어 데이터 통지가 성공적으로 완료되었음을 통지합니다.
  • onError와 onComplete는 반드시 둘중 하나만 호출되야하며 이후에는 어떠한 시그널도 발생해선 안되는데 그 이유는 만약 onError가 발생하고 onComplete가 발생한다면 에러가 발생한 것인지 정상적으로 완료되었는지 판단할 수 없기 때문입니다.
profile
코드를 거의 아트의 경지로 끌어올려서 내가 코드고 코드가 나인 물아일체의 경지

0개의 댓글