[TIL]Combine Buffer,flatMap

rbw·2023년 11월 9일
0

TIL

목록 보기
95/99

Combine 연산자인 Buffer, FlatMap에 대해서 조큼 알아보겠슴니다.

Buffer

https://www.apeth.com/UnderstandingCombine/operators/operatorsTimers/operatorsbuffer.html
요기 설명 매우매우앰우ㅜㅐㅁ우우매우 잘 되어 이씀

먼저 밑에는 쥐피티님의 설명임니다

Combine 프레임워크의 buffer(size:prefetch:whenFull:) 함수는 Publisher에서 오는 이벤트를 버퍼링하여 관리하는 오퍼레이터입니다. 이 함수는 세 가지 중요한 매개변수를 가지고 있습니다

  • size: 버퍼 크기

버퍼에 저장할 이벤트의 최대 갯수를 나타냅니다. 예를 들어, size: 3으로 설정하면 최대 3개의 이벤트를 버퍼에 저장할 수 있습니다.

  • prefetch: 프리페치 전략

버퍼에 어떤 시점에서 이벤트를 저장할 것인지를 제어합니다.

  • .immediate: 가능한 한 빨리 버퍼에 저장합니다.

  • .none: 버퍼를 채우지 않습니다. Subscriber가 요청할 때까지 저장하지 않습니다.

  • .byRequest: Subscriber가 요청할 때 버퍼에 저장합니다.

  • whenFull: 버퍼가 가득 찼을 때의 동작

클로저 형태로 제공되며, 버퍼가 가득 찼을 때 어떻게 동작할지를 정의합니다. 이 클로저는 버퍼에 저장된 이벤트 배열을 받아서 다음 이벤트로 방출할지를 결정합니다. buffer 오퍼레이터는 주어진 size 및 prefetch 설정에 따라 이벤트를 버퍼에 저장하고, whenFull 클로저를 사용하여 버퍼가 가득 찼을 때 어떻게 처리할지를 정의합니다. 주로 일정 수의 이벤트를 모아서 한 번에 처리하거나 특정 조건이 충족될 때까지 이벤트를 버퍼링하는 데 사용됩니다.

예를 들어, size: 3 및 .byRequest 프리페치 전략을 사용하여 버퍼가 3개의 이벤트를 모았을 때 다음 이벤트로 방출하도록 설정할 수 있습니다. "whenFull" 클로저를 사용하여 버퍼에 저장된 이벤트 배열을 가지고 특정 조건을 판단하고 이벤트를 방출하도록 설정할 수 있습니다.


설명 잘 보았구요 잘 설명해주시네여, 위의 링크에 있는 예제 코드로 좀 살펴보믄

let pub = Timer.publish(every: 0.2, on: .main, in: .common).autoconnect()
    .scan(0) {i,_ in i+1}
    .buffer(size: 4, prefetch: .byRequest, whenFull: .dropNewest)
    .flatMap(maxPublishers:.max(1)) {
        Just($0).delay(for: 2, scheduler: DispatchQueue.main)
    }

출력은 1,2,3,4,5,12,23,34 ... 으로 나옴니다. 1이 먼저 방출되고 2,3,4,5가 버퍼에 담기고 1이 방출된 2초 후에 2가 방출되면서 2가 나간 자리에 값이 들어오는데 이미 시간이 2초(10이 더해지겠져 ?)가 지났기 때문에 들어올 값은 12가 됩니다. 이게 반복되니까 3이 나가고 10초뒤 23 이 들어오고 4가 나가고 34가 들어오고 ~ 가 반복댐니다.

.dropNewest.dropOldest로 변경해도 동작 자체는 크게 다르진 않슴니다. 들어오는 값을 버리지 않고 이미 버퍼에 있는걸 버리는 정도의 차이임니다.


요약해보자면 방출하는 flatMap을 .max(5) 이런식으로 몇개만 방출하고, 이벤트를 계속 받는건 버퍼에 담아둘건데 그 버퍼가 꽉차면 새로운 값을 버릴지 기존에 들어간 값을 버릴지 정하는 프로퍼티도 있고,

다시 방출할 수 있는 조건이 되면 버퍼에 있는 값들로 방출하고 버퍼를 비우고 새로 또 채우는 식임. 보통 딜레이를 사용하는 경우나, 시간이 정해져있지 않고 데이터들이 들어오는 경우에? 사용하면 괜찮을듯함.

버려지는 값들을 최소화하겠다는 의도도 있지만 버리려는 의도도 있는 기분임

맨 처음 정의 설명에서는 역압력(backpressure)을 사용하여 업스트림의 값 생산과 다운스트림의 값에 대한 수요사이의 타이밍을 조정하기 위한 연산자다. 이는 업스트림에서 손실될 수 있는 값을 전달할 수 있을 때까지 축적함으로써 다운스트림의 역압에 대응하는 개념이다. 버퍼 자체는 큐처럼 작동함. FIFO ~

FlatMap

https://www.apeth.com/UnderstandingCombine/operators/operatorsTransformersBlockers/operatorsflatmap.html
요기 좋네요

먼저 또 지핕선생님이 말하시기를 flatMap은 퍼블리셔에서 방출된 각각의 이벤트를 새로운 퍼블리셔로 매핑하고 그 결과로 생성된 여러 퍼블리셔를 하나의 퍼블리셔로 평탄화 하는데 사용된다고 함니다.

내부적으로 각 이벤트에 대해 새 퍼블리셔를 생성하고, 이를 단일한 퍼블리셔로 평탄화함니다. 이를 통해 중첩된 퍼블리셔를 처리하거나 각각의 이벤트에 대해 비동기적인 작업을 수행하고 결과를 하나의 Publisher로 합칠 수 있다고 합니다.


설명 잘 해주시네여 이제 위 링크의 예제 코드 등을 보면서 확인해보겠씀니다

map, compactMap과 유사해보이지만, flatMap이 생성하는 것은 퍼블리셔여야 합니다. 퍼블리셔가 전달되면 퍼블리싱이 시작됩니다. 퍼블리셔 자체가 아니라 퍼블리셔가 생성하는 값이 파이프라인을 따라 내려감니다 따라서 다운스트림에서 해당퍼블리셔가 생성한 값을 볼 수 있습니다

또 동일한 출력 및 실패 유형을 가져야 요 연산의 다운스트림에서 예상되는 유형이 일관성을 유지함니다.

얘로 인해 생성된 퍼블리셔를 중첩 퍼블리셔 또는 내부 퍼블리셔(nexted publisher or inner publisher)라고 함니다.

여기 예제에서는 버튼 퍼블리셔를 예시로 들었슴니다

self.myButton.publisher()
    .flatMap { _ in
        Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
    }

버튼을 탭하면 타이머 퍼블리셔가 존재하게 되고, 퍼블리싱이 시작되면서 매초마다 날짜 값이 파이프라인을 통해 방출됩니다.

업스트림에서 .flatMap으로 도착할 때마다 해당 함수는 해당 값을 원하는대로 사용하고(여기서는 무시함) 퍼블리셔를 생성합니다.

요 퍼블리셔는 다운스트림 객체로 전송되는 값을 퍼블리싱하기 시작하는 방식으로 구독하민다.

이전에 생성된 퍼블리셔가 여전히 유지되어 값을 게시하는 동안 퍼블리셔를 생성한다면, 동시에 수행됩니다. 버튼을 두 번 탭하면 두 개의 타이머가 동시에 실행댐니다.

퍼블리셔가 완료가 된다면, -> 값이 이제 없다면 완료 메시지는 삼켜져서 다운스트림에 전달되지 않는다고 함니다. 내부 퍼블리셔가 종료되었다고 전체 파이프라인이 종료되는것은 원치 않기 때문에 합리적임니다


이 친구가 실제로 생성하는 것은 새로운 파이프라인의 시작임니다. 나머지(외부) 파이프라인(flatMap 연산자의 다운스트림에 있는 모든것)은 이 새로운(내부) 파이프라인에 구독됩니다.

eraseToAnyPublisher 사용

Scan<Publishers.Autoconnect<Timer.TimerPublisher>, Int> 요런 타입의 경우 복잡한 유형이기 때문에 명확함을 주기 위해 eraseToAnyPublisher를 사용해서 축소 시키는게 좋슴니다.

self.myButton.publisher()
    .flatMap { _ in // compile error
        let p = Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
            .eraseToAnyPublisher()
        return p
    }

위 친구는 추론하기 쉬워진 AnyPublisher<Int, Never>를 생성하겠죠. 또 이 유형으로 반환이 된다면, 조건부로 다른 퍼블리셔도 생성이 가능합니다. 유형만 동일하면 다른 유형의 퍼블리셔가 될 수 있기 때문에 더 쉽게 생성이 가능하겠져

맵 함수는 모든 조건에서 무언가를 생성해야 합니다. 만약 아무것도 하지 않으려면 "아무것도 안 함"을 생성하면 되겠쬬 이럴 땐 Empty를 사용하믄 댐니다.

실패하는 퍼블리셔 경우

.flatMap을 사용할 때 발생하는 또 다른 문제는 실패할 수 있는 퍼블리셔를 생성하는 경우임니다. 실패가 발생된다면, 요 실패가 파이프라인으로 전달되어 생성된 모든 퍼블리셔, 업스트림에 있는 전체파이프라인이 취소되어 중지됩니다.

또 실패 유형은 위와 아래 전부 일치해야합니다. 변경 x. 만약 변경하려면 업스트림에서 flatMap호출 전에 업스트림 개체의 실패 유형을 변경해야만 합니다. 이미 내려온 친구를 변경할순 x

이 때 .replaceError(with:) 을 사용해서 출력 유형을 바꾸믄 댐니다. 반대로 에러 유형이 없지만 실패 할 수 있는 게시자를 생성하려고 할 때는 .setFailureType(to:)를 사용하시믄 댐다

비동기성 직렬화

.flatMap의 경우 이전 비동기 연산이 완료될 때까지 다음 연산을 시작할 수 없도록 하는 것이 핵심입니다. 위의 타이머 예시에서도, 사용자가 버튼을 탭하여 파이프라인을 실행을 시작할 때까지 값을 생성하지않고 실제로 존재하지도 않습니다.

아래 코드는 연락처에 접근이 가능한지 안한지에 대한 예시 코드임니다. 비동기적으로 이루어지며, 아래 처럼 .flatMap을 이어서 깔끔하게 수행이 가능합니다.

self.myButton.publisher()
    .setFailureType(to: Error.self)
    .flatMap { _ in
        checkAccess().publisher // 비동기 작업
    }.flatMap { gotAccess -> AnyPublisher<Bool,Error> in
        if gotAccess {
            return Just(true)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        } else {
            return requestAccessFuture()
                .eraseToAnyPublisher()
        }
    }

종료없는 실패

이 친구의 또 다른 용도는 전체 파이프라인을 중지하지 않고 파이프라인의 일부를 실패할 수 있게 하는 용도입니다. 아이디어는 맵 함수에 의해 생성된 내부 파이프라인 내에서 실패하고 내부에서 실패를 catch하는 것입니다. 이렇게 한다면 외부는 알지 못하고 파이프라인은 그대로 진행합니다.

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
            .replaceError(with: (data: Data(), response: URLResponse()))
    }

내부에 넣어준다면 전체 파이프라인이 중지되는일은 업슴다

역압력(Backpressure) 행사

이제 flatMap의 옵셔널 매개변수인 maxPublishers:에 대한 내용임니다. 이를 생략하면 기본값으론 .unlimited가 됨니다. 하지만 생략 안한다면, 한 번에 생성하려는 최대 발행 수를 제한할 수 있습니다. 따라서 flatMap을 통해 업스트림 퍼블리셔에게 압력을 가할 수 있습니다 -> 역압력 이 점 때문에 많이 중요하다고 하네요 요 연산자가.

역압력을 가하면서 파이프라인을 효과적으로 조절할 수 있습니다. 요 기능도 잘 활용하면 비동기성을 직렬화 할 수 있슴니다.

위에서 본 URL예시를 보겠슴니다.

let urls = [
    "https://www.apeth.com/pep/manny.jpg",
    "https://www.apethh.com/pep/moe.jpg",
    "https://www.apeth.com/pep/jack.jpg"
]
urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
        .replaceError(with: (data: Data(), response: URLResponse()))
    }

URL은 순차적으로 도착할 수 있지만 데이터 작업은 모두 동시에 수행되며, 모두 즉시 시작 되고 각 네트워크 요청이 완료되는데 걸리는 시간에 따라 결과가 임의의 순서로 도착할 수 있습니다.

이럴때 maxPublisher:를 사용하면 순차적으로 받는것도 가능하겠져 !

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap(maxPublishers:.max(1)) { url in
    ...
    } 

이제 이전 네트워크 요청이 완료될 때까지 다음 넽웤 요청이 시작되지 않습니다. 하지만 이 방식으로 역압력을 가하는건 좋지만, 잘못사용한다면 값이 손실될수도 있습니다. 이를 방지하기 위해서는 위에서 살펴본 buffer를 사용하면 좋겠죵

profile
hi there 👋

0개의 댓글