카프카 5

mohadang·2023년 9월 17일
0

Road to Backend

목록 보기
13/21
post-thumbnail

컨슈머

데이터를 가져가는 polling 주체.
커밋을 통해 읽은 컨슈머 offset을 카프카에 기록.
데이터 저장 위치.

  • FileSystem(.csv, .log, .tsv)
  • Object Storage(S3, Minio)
  • Hadoop(Hdfs, Hive)
  • RDBMS(Oracle, Mysql)
  • NoSql(MongoDB, CouchDB)
  • 기타 다양한 저장소들(Elasticsearch, influxDB)

Simple Consumer

package com.example.kafkatester;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@SpringBootApplication
public class KafkaTesterApplication {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

프로듀서에서 레코드 주입

$ kafka-console-producer.bat --bootstrap-server localhost:9092 --topic hello-kafka
>Hello
>What is
>123qawe
>

컨슈머에서 레코드를 가져옴

...
This is record 0
This is record 2
This is record 3
Hello
What is
123qawe
This is record 1
This is record 5
This is record 7

Auto commit Consumer

...
/*
ENABLE_AUTO_COMMIT_CONFIG: 기본 옵션 true. polling 할때 어디까지 읽었는지 브로커에 알려준다. 자동 커밋이 되기 전에 컨슈머가 강제 종료 되면 컴슈머는 다음 실행시 커밋 이전까지 오프셋부터 레코드를 읽어온다(중복 처리)
*/
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 자동 커밋 여부
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);// 자동 커밋 주기

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

enable.auto.commit=true : 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit

commit 관련 코드를 작성할 필요 없기에 편리함.

속도가 가장 빠름. 그러나 중복 또는 유실이 발생할 수 있음(중복/유실을 허용하지 않는 상황에서는 사용하면 안됨)
일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용

컨슈머 오토 커밋

중복 처리 되는 메시지들

유실되는 메시지들

리밸런싱 뿐만 아니라 컨슈머가 갑자기 종료 되어도 문제 발생

데이터 중복을 막는 방법

오토 커밋을 사용하되, 컨슈머가 죽지 않도록 기도 ?

  • 불가능 : 서버, 어플리케이션은 언젠가 죽을 수 있다(ex : 배포)

오토 커밋을 사용하지 않는다

  • Kafka consumer의 commitSync(), commitAsync() 사용

commitSync(), commitAsync()

enable.auto.commit=false

commitSync() : 동기 커밋

  • ConsumerRecord 처리 순서를 보장.
  • 가장 느림(커밋이 완료될 때 까지 block).
  • poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋.
  • 배치 처리 가능. 예를 들어 100개 offset 처리 될때마다 commitSync 호출하여 커밋.
  • CommitFailedException e 는 반드시 처리해야함. 커밋 재시도나 폴링 재시도로 처리 필요.
  • Map<TopicPartition, OffsetAndMetadata>을 통해 오프셋 지정 커밋 가능.
  • 처리 완료한 record를 Map 형태로 수집하여 한번에 커밋 가능

commitAsync() :

  • 동기 커밋보다 빠름.
  • 중복이 발생할 수 있음.
    • 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때.
  • ConsumerRecord 처리 순서를 보장하지 못함.
    • 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한.

Ex) commitSync

...
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// sync commit 위해 필요

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        consumer.commitSync();// 커밋
        record.offset();
    }
}
profile
mohadang

0개의 댓글