실시간 스트리밍 데이터 처리에서 한 번만 정확하게(exactly-once) 처리하는 것은 중복 데이터 처리와 데이터 손실을 방지하고 분석 결과가 왜곡되지 않도록 하는 데 중요하다.
정의 자체는 이렇다.
for each received record, its processed results will be reflected once, even under failures.
processing.guarantee=exactly_once
설정만 하면 Kafka Streams 는 자동으로 정확히 한 번 처리하는게 가능해진다. default value 는 at_least_once
이다.
Kafka Streams 의 message 처리 방식은 다음과 같다.
1) Kafka Topic 으로 부터 message A 를 읽는다.
2) processing function 이 트리거 되서 기존 상태 S
를 S'
으로 업데이트 시킨다.
3) output message B1~Bn 까지 만들어서 output kafka topic 에다가 쓴다.
4) Kafka Broker 로 부터 send 요청에 대한 응답을 기다린다.
5) 처리된 message A 에 commit 한다. 메시지 A 는 완전히 처리된 것.
여기서 (4) 과 (5) 쪽에 문제가 생긴다면 exactly-once 로 처리하기 어려워진다. 다음 예시를 보자.
(4)
까지의 작업이 완료되고 source Topic 에 commit 을 하려고 할 때 App 이 갑자기 죽는다면 어떻게 될까?
commit 을 하지 못하고 죽었기 때문에 다시 해당 메시지를 처리한다. 그래서 processing 도 두 번 일어나서 상태도 두 번 업데이트 되고 output topic 에다가도 두 번 쓰게되는 문제가 생긴다.
결국에 exactly-once 를 보장하려면 다음 3가지 step 을 트랜잭션으로 보장되면 된다.
이 세가지 일을 다 성공시키거나 하나라도 실패하면 다 롤백 시키면 된다.
어떻게 그게 가능할까? 위 세가지 작업은 다 토픽으로 데이터를 전송하는 과정으로 변환시켜 볼 수 있다.
Update the application state from S to S
-> Kafka changelog topic
Write result messages B1, … Bn to output Kafka topic(s) TB.
-> Kafka sink topic
Commit offset of the processed record A on the input Kafka topic TA
-> Kafka offset topic
Kafka changelog 에 기록하는 작업은 모든 state 들을 capture 해서 changelog 에 보관하는 작업이다.
새롭게 상태가 업데이트 될 때마다 그것들을 기록해놓는 것. 이 세가지 topic partition 에다가 offset 을 쓰는 것은 Kafka 의 Transactional API
를 쓰면 가능하다.
Transactional API
를 쓰려면 프로듀서를 트랜잭션 프로듀서로 올려서 설정해야한다. (이런 이유로는좀비 어플리케이션이 생겨서 중복 데이터를 만드는 문제를 막기 위함과 트랜잭션 메시지 처리는 일반 메시지 처리와는 다르기 때문이다.)
Kafka Streams 에서 processing.guarantee=exactly_once
이 설정을 키면 내부적인 embedded producer client
가 transaction.id
를 이용해서 atomic 하게 모든 토픽 파티션에 쓰게 보장해준다.
idempotent producer
로 인해서 duplicate message 는 버려지게 된다.자바 코드로 보면 이렇다.
/** when commit() is called */
try {
// send the offsets to commit as part of the txn
producer.sendOffsets(“inputTopic”, offsets);
// commit the txn
producer.commitTxn();
} catch (KafkaException e) {
producer.abortTxn();
}
/** in the normal processing loop */
try {
recs = consumer.poll();
for (Record rec <- recs) {
// process ..
producer.send(“outputTopic”, ..);
producer.send(“changelogTopic”, ..);
}
} catch (KafkaException e) {
producer.abortTxn();
}
Kafka 의 트랜잭션 API 가 어떻게 동작하는지 보자.
A) the producer and transaction coordinator interaction
B) the coordinator and transaction log interaction
C) the producer writing data to target topic-partitions
D) the coordinator to topic-partition interaction
prepare_commit
으로 업데이트 한다. 일단 이게 되면 트랜잭션은 완료될 수 있다.two-phase-commit 을 하는 이유로는 트랜잭션 처리의 내구성을 보장하기 위함이다. 트랜잭션 코디네이터는 commit marker 를 남기던 중에 언제든지 죽을 수 있다. 이런 경우에 복구했을 때 트랜잭션 처리를 완료하기 위함이다.
카프카에서 트랜잭션을 위해선 이런 멱등성 프로듀서를 통해서 데이터를 정확하게 한번 토픽에다가 메시지를 보내놓는 것으로부터 시작된다. 토픽에 데이터가 들어있다면 이후부터는 Transaction API 를 통해서 트랜잭션 처리를 하면 되니까.
idempotent operation 은 producer 레벨에서 데이터를 한 번만 정확하게 보낼 수 있도록 하는 기능이다. 만약 producer 가 topic-partition 에 데이터를 보냈는데 broker 로 부터 응답을 늦게 받았다고 가정해보자.
그럼 producer 는 retry 를 재개할 것이고 topic paritition 에 데이터가 중복으로 쌓일 수 있다. 이를 막기 위한 기능으로 idempotent 기능이 추가되었다. producer 에 설정으로 `enable.idempotence = true` 를 설정하면 해당 기능이 자동으로 써진다.
어떻게 그럼 중복 Write 를 막을 수 있을까? 이는 TCP 의 처리방식괴 유사하다. Broker 로 보내는 각각의 batch message 에 sequence Number 를 붙혀서 중복인 메시지는 버리는 방식으로 해결한다.
TCP 와의 차이점은 해당 sequence Number 는 replication 되기 때문에 broker 가 죽어도 이 number 는 계속해서 유지할 수 있다.