카프카 4

mohadang·2023년 9월 17일
0

Road to Backend

목록 보기
12/21
post-thumbnail

Simple Producer

package com.example.kafkatester;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;

@SpringBootApplication
public class KafkaTesterApplication {
    private static String TOPIC_NAME = "hello-kafka";
    private static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

}
This is record 0
This is record 1
This is record 2
This is record 3

Key, Value Producer

KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

for (int index = 0; index < 10; index++) {
    String data = "This is record " + index;
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
    try {
        producer.send(record);
        System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
        Thread.sleep(1000);
    } catch (Exception e) {
        System.out.println(e);
    }
}

key, value가 제대로 들어 오는지 확인하기 위해서 콘솔 컨슈머를 다음 옵션과 함께 실행

$ kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello-kafka --property print.key=true --property key.separator="-"

0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5

key 값을 넣지 않는 방식으로 레코드 전달 할 경우

null-This is record 0
null-This is record 1
null-This is record 2

레코드 키

카프카가 전달하는 메시지는 레코드라고 한다.
레코드 키는 메시지를 구분하는 구분자 역할을 한다.

  • 동일 키는 동일 파티션에 적재한다.
    • 디폴트 파티셔너의 경우 키를 해쉬값으로 하여 동일한 파티션으로 저장 하도록 수행.
  • 순서를 보장하므로, 상태 머신으로 사용 가능하다.
  • 역할에 따라 컨슈머를 할당 및 적용할 수 있다.
  • 레코드 값(value)을 정의 하는 구분자.
    • 키에 레코드 값 해쉬값을 넣음으로서 중복처리 방지 가능.
    • 키값에 버전 정보를 넣어서 해당 버전에 맞는 비즈니스 로직을 실행 하도록 처리 가능.
  • 키를 사용할 경우가 있을까 ?

레코드 값

레코드 값은 실질적으로 전달하고 싶은 데이터이다.

String, ByteeArray, Int, CSV, TSV, JSON, Object등 사실상 제한 없음

JSON 사용시 key/value 형태로서 확장성이 뛰어남. 컬럼 정보(key) 포함. 또한 디버그시 유리.

CSV 사용시 콤마 기준으로 데이터 구분. 용량 이득.

포맷을 관리하는 다른 방법

  • 컨플루언트 스키마 레지스트리

파티션 지정 Producer

@SpringBootApplication
public class KafkaTesterApplication {
    ...
    private static int PARTITION_NUMBER = 1;
    public static void main(String[] args) {
        ...
        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

파티션을 지정하기에 순서를 보장할 수 있다.

프로듀서 acks

중요 !

프로듀서에서 데이터 보낼때 얼마나 빠르게 보내고 유실을 허용할 것인지 설정. 레플리카와 연관.

  • acks = 0 : 가장 속도가 빠름, 유실 가능성이 높음.
  • acks = 1(default) : 속도 보통, 유실 가능성이 있음.
  • acks = all 또는 -1 : 속도가 가장 느림. 메시지 전달 손실 가능성 없음.

acks = 0

프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주한다.(ack를 받지 않는 것으로 보인다, UDP).
브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없다. 팔로워 파티션에도 저장됬는지 알 수 없음.

  • EX) 센서 데이터 같이 1초에 같은 데이터가 수십번 전달

전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용

acks = 1

프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 확인. 팔로워 파티션에 저장됬는지는 모름. 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실.

acks = 0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음.

acks = all or -1

프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인.
리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림. 복제가 완료되기 까지 기다림으로 인해 속도가 느림.

유실 가능성이 없지만, 속도가 느림.

프로듀서 옵션

profile
mohadang

0개의 댓글