Kafka 설치, Spring 추가

S_H_H·2024년 7월 9일
0

Kafka 미니 플젝

목록 보기
3/7

Kafka 설치

Apache Kakfa에서 버전에 맞게 다운로드
2.~ 버전과 3.~ 버전의 차이만 확인해보시고 다운로드 받으면 됩니다.

전 2.5.0 버전과 3.7.1버전을 다운받았습니다.
-> 에러 찾을려고 2개 다운 받은 것이니 한개만 다운 받으셔도 됩니다.

window 11 사용 중입니다.

전 바로 C:\kafka_2.12-2.5.0 아래로 경로 설치 했습니다.

Kafka 설정

수정 한 파일입니다.

  • zookeeper.properties
  • server.properties

zookeeper.properties
기본 설정값 사용해도 됩니다만은 모든 경로를 절대 경로로 지정했는데,
만약 window powershell에서 서버 시작을 C: , C:\kafka, C:\kafka\bin 등 실행위치에 따른 상대경로로 파일이 계속 생성되어 수정했습니다.

dataDir=/kafka_2.12-2.5.0/zookeeper

server.properties
경로 수정은 위와 같은 이유로 수정
서버 포트 변경을 한 이유는 H2 DB를 사용하다 보니 포트 충돌로인해 9092 -> 9093으로 수정했습니다.

listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093

log.dirs=/kafka_2.12-2.5.0/data

C:\kafka_2.12-2.5.0\data 아닌 /kafka_2.12-2.5.0/data 설정했습니다.

Kafka 실행

zookeeper 실행

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

kafka 실행

.\bin\windows\kafka-server-start.bat .\config\server.properties

Topic

Topic 생성

  • 많은 옵션이 있으니 찾아보기를 권장
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9093 --topic management_car_exception_topic --create

Topic 확인

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9093 --list

Producer

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9093 --topic management_car_exception_topic
  • key,value로 전송 시 아래 옵션 추가
--property "parse.key=true" --property "key.separator=:"

consumer

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9093 --topic test_elasticsearch_sink --from-beginning

Kafka Producer 추가하기

gradle 설정 추가

implementation 'org.springframework.kafka:spring-kafka'

Producer 설정

KafkaConstants.KAFKA_BOOTSTRAP_SERVERS = localhost:9093

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

Producer 전송
보내는 value는 type은 자유롭게 전달하면 됩니다.

KafkaConstants.MANAGE_CAR_PRODUCER = topic 전송 시 key
KafkaConstants.SEND_TELEGRAM_TOPIC = topic 명

@Component
@RequiredArgsConstructor
public class KafkaSendMessage {

    public final KafkaTemplate kafkaTemplate;

    public void kafkaSendAlarmMessage(TopicExceptionDto exceptionTopic){
        kafkaTemplate.send(KafkaConstants.SEND_TELEGRAM_TOPIC, KafkaConstants.MANAGE_CAR_PRODUCER , exceptionTopic);
    }
}

Kafka Consumer 추가하기

Consumer 설정

  • value에 대한 값은 보통 JsonDeserializer.class 를 많이 사용한다.
  • value의 값을 String으로 받는 이유는 다음글에 있습니다.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultKafkaConsumerFactory consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        return props;
    }
}

Consumer

  • acknowledgment.acknowledge();은 수동 커밋으로 Config > AckMode 설정입니다.
@Component
@RequiredArgsConstructor
public class KafkaConsumer implements AcknowledgingMessageListener<String, String> {

    public final TelegramSend telegramSend;

    @Override
    @KafkaListener(topics = KafkaConstants.SEND_TELEGRAM_TOPIC,
            groupId = "management_car_consumer_1",
            containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> consumerRecord,
                          Acknowledgment acknowledgment) {

        telegramSend.send(consumerRecord.value());
        acknowledgment.acknowledge();

    }
}

이렇게 보낸값을 Consumer에서 받아
이후에 처리해야할 일들을 작성하면 되겠습니다

profile
LEVEL UP

0개의 댓글