confluent developer kafka internal architecture 강의 및 카프카 핵심 가이드 2E 서적을 통해 정리한 내용입니다.
멱등적 프로듀서
- m1, m2, m1, m2, m3 -> 중복 발생
- enable.idempotence=true
- producer ID 요청 후, 해당 pid와 레코드 별 seq 넘버를 추가해서 중복 메시지를 없애고 순서 보장
트랜잭션 필요 상황 가정
- '이체' 애플리케이션
- Alice -> Bob 에게 이체하는 상황
- 전송 토픽 이벤트를 가져와서 출력 토픽 2개에 레코드를 쌓는다. (Alice -10$ 인출, Bob +10$ 입금)
- 위의 트랜잭션이 완료되어야 오프셋 커밋이 찍힌다.
트랜잭션 없이 시스템 장애 발생
- Alice -> -10$ 인출 시점에 애플리케이션이 실패한다고 가정
- 애플리케이션 실패로 오프셋 커밋이 되지 않고, consumer가 전송 토픽을 다시 읽게되는 중복과정이 발생
- 결과적으로 Alice -> -10$ 이벤트가 2번 발생함.
트랜잭션 적용 후 시스템 장애
- 고유한 transactional.id 를 부여
- 이 애플리케이션 시작 시점에, 트랜잭션을 조정할 트랜잭션 코디네이터를 찾음.
- __transaction_state_topic 이라는 내부 토픽이 존재함
- 코디네이터가 결정되면, 코디네이터는 트랜잭션 애플리케이션에 대한 고유 produce ID, epoch를 생성 하여 애플리케이션에 전달
- 출력 토픽으로 이벤트를 쓰기전에 트랜잭션 애플리케이션은 코디네이터한테 대상 파티션을 알려줘야함.
- 코디네이터는 이 트랜잭션의 일부가 될 파티션을 내부 토픽에 지속적으로 저장
- 이 정보가 저장되면, 애플리케이션은 출력 토픽에 이벤트를 작성할 수 있음.
- 이 시점에 아까와 같이 이벤트가 실패했다고 가정
- 처음으로 트랜잭션 코디네이터를 통해 producer ID를 요청함.
- 코디네이터는 이전 인스턴스에서 보류중인 트랜잭션이 있음을 알고있음.
- 그리고 동일한 transactional.id를 가진 producer ID를 등록하려 했다는 것을 보고 새로운 인스턴스네! 라고 파악
- 트랜잭션 코디네이터가 먼저 할 일은 이전 인스턴스의 보류 중인 모든 트랜션을 중단해야함.
- 따라서, 내부 트랜잭션 로그에 중단 마커 (A) 를 추가
- 그리고, 트랜잭션이 데이터를 작성했던 각 파티션에도 중단 마커 (A) 를 추가
- 위 작업이 완료된 후, 코디네이터는 producer ID의 epoch를 증가 시킨다 pid e0 -> pid e1
- epoch를 증가시켜서 이전 인스턴스 epoch는 차단함
- 이제 consumer가 출력 토픽을 읽으려고 하면 중단 이벤트임을 인지하고 무시하게됨
트랜잭션 적용 후 시스템 성공
- 입력 토픽까지 읽고, 그 다음 출력 토픽에 쓰기전에 추가해야하는 파티션 세트를 코디네이터에게 알려줘야함.
- 위 경우에는 인출 파티션(P0), 입금 파티션(P1), 내부 오프셋 토픽 파티션(P7) 을 알려주게됨
- 코디네이터는 이 3개의 파티션을 트랜잭션 로그에 기록함.
- 이제 애플리케이션은 위의 3개의 파티션에 레코드를 쓸 수 있게됨.
- 레코드 쓰기 작업이 완료되면, 애플리케이션은 트랜잭션 코디네이터에게 커밋 요청을 보냄 (커밋해줘!)
- 트랜잭션 코디네이터가 내부 토픽에 커밋 마커 (C) 를 찍음
- 내부 커밋 마커가 찍힌 이후, 나머지 3개의 출력 토픽에 동일한 커밋 마커를 추가 함
- 이 시점에 커밋 정보는 read_committed 모드로 읽는 consumer 에게 노출 된다.
read_commited 모드 알아보기
- High watermark = 66
- 오프셋 64인 레코드는 아직 커밋되지 않은 상태라 보류중 (커밋 또는 중단 마커 없음)
- 브로커는 마지막 안정 오프셋 (last stable offset, LSO)를 알고있음
- LSO : 첫 번째 열려있는 보류 중인 트랜잭션의 오프셋 (64)
- LSO 이전의 오프셋은 모두 상태가 결정되었다고 할 수 있음 -> consumer에게 노출 가능함.
- 따라서, 응답을 줄 때 LSO 이전의 오프셋과 중단 마커를 무시하라는 메타데이터 정보를 추가적으로 전달
chapter 08. '정확히 한 번' 의미 구조
정확히 한 번
- 멱등적 프로듀서 : 프로듀서 재시도로 인해 발생하는 중복 방지
- 트랜젝션 : 스트림 처리 애플리케이션에서 '정확히 한 번' 처리를 보장
멱등적 프로듀서
- 멱등적 : 동일 작업을 여러 번 실행해도 한 번 실행한 것과 결과가 같은 서비스
1. UPDATE t SET x=x+1 where y=5
2. UPDATE t SET x=18 where y=5
멱등적 프로듀서의 작동 원리
- 고유한 프로듀서 ID와 시퀀스 넘버를 가짐
- 브로커가 이전에 받은 적이 있는 메시지를 받게되면, 적절한 에러를 발생
- 프로듀서에 로깅되고 지표에도 반영되지만, 예외가 발생한 것은 아니라 사용자에게 경고를 보내지는 않음
- RequestMetrics 유형의 ErrorsPerSec 지표값에 기록됨
작동 실패 시, 멱등적 프로듀서 처리
- 프로듀서 재시작
- 멱등적 프로듀서 기능이 켜져있다면, 프로듀서 초기화 과정에서 브로커로부터 프로듀서 ID를 생성받음
- 트랜젝션 기능이 꺼져있다면, 프로듀서 초기화할 때마다 완전히 새로운 ID가 생성됨
- 따라서 새 프로듀서가 기존 프로듀서가 이미 전송한 메시지를 다시 전송할 경우, 브로커는 중복이 발생한지 모름.
- 브로커 장애
- 리더는 새 메시지가 쓰여질 때마다 인-메모리 프로듀서 상태에 최근 5개의 시퀀스 넘버를 업데이트함.
- 리더 브로커 장애로 팔로워가 리더가 된 시점에 메모리에 최근 5개의 시퀀스 넘버를 가지고 있음.
- 따라서 아무 이슈나 지연 없이 새로운 메시지 유효성 검증 재개
- 예전 리더가 다시 돌아오면?
- 스냅샷 파일에서 최신 상태를 읽어오고, 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트함.
멱등적 프로듀서의 한계
- 프로듀서 내부 로직으로 인한 재시도(프로듀서, 네트워크, 브로커 에러)가 발생할 경우 생기는 중복만 방지함.
- producer.send() 두 번 호출하면 중복이 발생함.
멱등적 프로듀서 사용법
enable.idempotence=true
위의 기능을 활성화 하면 다음처럼 동작함.
- 프로듀서 ID를 받아오기 위해 프로듀서 시동 시 API 호출
- 레코드 배치에 프로듀서 ID와, 첫 메시지의 시퀀스 포함
- 레코드 배치의 시퀀스 넘버를 검증해서 메시지 중복을 방지
- 장애가 발생하더라도 파티션에 쓰여지는 메시지들의 순서는 보장됨.
트랜잭션
트랜잭션이 해결하는 문제
- 상황 가정
- 원본 토픽으로부터 이벤트를 읽어서 처리 후, 결과를 다른 토픽에 쓴다.
- 애플리케이션 크래시로 인한 재처리
- 결과를 다른 토픽에 썼는데, 원본 토픽 입력 오프셋이 커밋되기 전에 애플리케이션이 크래시 발생!
- 컨슈머 리밸런스가 발생하고, 컨슈머가 읽고있던 파티션들은 다른 컨슈머로 재할당 됨.
- 할당받은 컨슈머가 마지막 커밋 오프셋부터 레코드를 읽기 시작함.
- 중복 처리 발생.
- 좀비 애플리케이션에 의해 발생하는 재처리
- 애플리케이션이 레코드 배치를 읽어온 직후 바로 연결이 끊어진 상황.
- 1번과 동일하게 새로운 컨슈머가 할당받아 처리 후
- 멈췄던 애플리케이션이 다시 살아남.
- 마지막으로 읽어왔던 레코드 배치를 처리하느라 중복 발생.
- 새로 카프카를 폴링하거나, 하트비트로 자기가 죽었다는걸 판정받기 전까지 실행 가능.
트랜잭션은 어떻게 '정확히 한 번'을 보장하는가?


- 컨슈머에 격리수준이 올바르게 설정되어 있지 않은 경우, 기대하는 '정확히 한 번' 보장이 이루어지지 않음.
- 컨슈머에 isolation.level 설정하기
- read_committed
- read_uncommitted (default)
트랜잭션으로 해결할 수 없는 문제들
- 스트림 처리에 있어서의 부수 효과
- 이메일 발송, REST API 호출, 파일 쓰기 등의 외부 작업의 중복은 카프카 트랜잭션이 처리할 수 없음.
- 카프카 토픽에서 읽어서 DB에 쓰는 경우
- 하나의 트랜잭션에서 외부 디비에 결과를 쓰고 카프카에는 오프셋을 커밋할 수 있도록 하는 매커니즘은 없음.
- DB에서 읽고, 카프카에 쓰고, 다시 다른 DB에 쓰기
- 한 클러스터에서 다른 클러스터로 복제
- 발행/구독 패턴
- 오프셋 커밋 로직에 따라 컨슈머들이 메시지를 한 번 이상 처리하게 되는 경우
- 따라서, read_commited 설정이 반드시 되어야 함.
트랜잭션 사용법
-
카프카 스트림즈
- processing.guarantee
- exactly_once 이나 exactly_once_beta
-
카프카
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalld);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");'
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
트랜잭션 ID와 펜싱
- 트랜잭션 ID는 동일 애플리케이션 인스턴스가 재시작 했을 때는 일관적으로 유지되어야 하고,
서로 다른 애플리케이션 인스턴스에 대해서는 서로 달라야 함.
- 카프카 2.5에서 트랙잭션 ID와 컨슈머 그룹 메타데이터를 함께 사용하는 펜싱을 도입


- 프로듀서 B가 다음 세대의 컨슈머 그룹에서 온 것을 확인 후 문제없이 작업을 처리함
트랜잭션의 작동 원리
- 트랜잭션 2단계 커밋 (two-phase commit)
- 현재 진행중인 트랜잭션이 존재함을 로그에 기록한댜. 연관된 파티션들 역시 함께 기록한다.
- 로그에 커밋 혹은 중단 시도를 기록한다. (일단 로그에 기록이 남으면 최종적으로는 커밋되거나 중단되어야 한다.)
- 모든 파티션에 트랜잭션 마커를 쓴다.
- 트랜책션이 종료되었음을 로그에 쓴다.
- 프로듀서 -> initTransaction()을 호출해서 자신이 트랜잭션 프로듀서임을 등록
- initTransaction() API는 코디네이터에 새 트랜잭션 ID를 등록하거나, 기존 트랜잭션 ID의 에포크 값을 증가시킴
- beginTransaction() 프로듀서에 현재 진행중인 트랜잭션이 있음을 알려줌.
- 프로듀서가 새로운 파티션으로 레코드를 전송할 때 브로커에 AddPartitionsToTxn 요청을 보냄으로써
현재 이 프로듀서에 진행중인 트랜잭션이 있고, 해당 레코드가 트랜잭션의 일부임을 알림
해당 정보는 트랜잭션 로그에 기록됨.
- 쓰기 작업이 완료되고 커밋할 준비가 되면, 트랜잭션에서 처리한 레코드들의 오프셋부터 커밋
- sendOffsetsTOTransaction() 을 호출하면 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 ID가 포함된 요청이 전송
- commitTransaction() 이나 abortTransaction() 을 호출 하면 트랜잭션 코디네이터에 EndTxn 요청이 전송
- 트랜잭션 코디네이터는 트랜잭션 로그에 커밋 혹은 중단 시도를 기록함
- transaction.timeout.ms에 설정된 시간 내에 커밋, 중단 둘 다 안되면 코디네이터가 자동으로 트랜잭션을 중단
트랜잭션 성능
-
트랜잭션 ID 등록 요청은 한 번만 있음.
-
파티션 등록은 각 트랜잭션마다 파티션별로 한 번씩만 이루어짐.
-
트랜잭션 커밋 요청이 전송되면 각 파티션에 커밋 마커가 추가됨.
-
이 모든 과정은 동기적으로 진행되어 트랜잭션이 완료되거나 실패할 때까지 데이터는 전송되지 않음.
-
따라서 많은 메시지를 트랜잭션에 포함시킬수록 오버헤드는 줄어들며 전체 처리량이 증가함.
-
컨슈머는 커밋 마커를 읽어오는 작업에 약간의 오버헤드가 있음.
-
read_committed 모드에서는 아직 커밋되지 않은 트랜잭션의 메시지가 반환되지 않아 종단 지연이 길어질 수 있음.
-
하지만, 컨슈머는 완료되지 않은 트랜잭션의 메시지를 버퍼링할 필요가 없어 추가적인 작업은 없음.
Ref.
- https://developer.confluent.io/courses/architecture/transactions/?utm_source=youtube&utm_medium=video&utm_campaign=tm.devx_ch.cd-apache-kafka-internals-101_content.apache-kafka
- 카프카 핵심가이드 2E [그웬 샤피라, 토드팔리노, 라지니 시바람, 크리트 페티 지음, 이동진 옮김]
- 그웬 샤피라, 토드팔리노, 라지니 시바람, 크리트 페티 지음, 이동진 옮김, 『카프카 핵심가이드 2E』, 제이펍(2023)