springboot - kafka 연동

배세훈·2022년 5월 27일
0

kafka

목록 보기
4/5

예제

  • Springboot에서 Kafka의 특정 Topic에 메시지를 생산(Produce)하고 해당 Topic을 Listen합니다. Kafka 서버에 해당 메시지가 전달되고, Springboot에서 이를 소비(Consume)할 준비가 되면 메시지를 pull 하는 간단한 예제입니다.

개발 환경 세팅

1) 프로젝트 구조

업로드중..

2) build.gradle

  • spring-kafka 의존성 추가
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

구현하기

1) application.yml

  • consumer와 producer에 대한 설정
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • spring.kafka.consumer
    - bootstrap-servers
    - Kafka 클러스터에 대한 초기 연결에 사용할 호스트:포트쌍의 쉼표로 구분된 목록입니다.
    - 글로벌 설정이 있어도 consumer.bootstrap-servers가 존재하면 consumer 전용으로 오버라이딩 합니다.

    • group-id
      • Consumer는 Consumer Group이 존재하기 때문에 유일하게 식별 가능한 Consumer Group을 작성합니다.
    • auto-offset-reset
      • Kafka 서버에 초기 offset이 없거나 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성합니다.
        • Consumer Group의 Consumer는 메시지를 소비할 때 Topic내에 Partition에서 다음에 소비할 offset이 어디인지 공유를 하고 있습니다. 그런데 오류 등으로 인해 이러한 offset 정보가 없어졌을 때 어떻게 offset을 reset 할 것인지를 명시한다고 보시면 됩니다.
          - latest: 가장 최근에 생산된 메시지로 offset reset
          • earliest: 가장 오래된 메시지로 offset reset
          • none: offset 정보가 없으면 Exception 발생
            - 직접 Kafka Server에 접근하여 offset을 reset할 수 있지만 Spring에서 제공해주는 방식은 위와 같습니다.
    • key-deserializer / value-deserializer
      • Kafka에서 데이터를 받아올 때 key / value를 역직렬화 합니다.
        • 여기서 key와 value는 뒤에서 살펴볼 KafkaTemplate의 key, value를 의미합니다.
        • 이 글에서는 메시지가 문자열 데이터이므로 StringDeserializer를 사용했습니다. JSON 데이터를 넘겨줄 것이라면 JsonDeserializer도 가능합니다.
  • spring.kafka.producer
    - bootstrap-servers
    -consuer.bootstrap-servers와 동일한 내용이며 producer 전용으로 오버라이딩 하려면 작성합니다.

    • key-serializer / value-serializer
      • Kafka에 데이터를 보낼 때 key / value를 직렬화 합니다.
        • consumer에서 살펴본 key-deserializer, value-deserializer와 동일한 내용입니다.
  • application.yml 은 공식 문서 를 참조 하시면 됩니다.

2) KafkaController.java

package com.demo.kafka.controller;

import com.demo.kafka.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducer producer;

	// post 방식으로 message 데이터를 받아서 Producer 서비스로 전달
    @PostMapping
    public String sendMessage(@RequestParam("message") String message){
        this.producer.sendMessage(message);

        return "success";
    }

}

3) KafkaProducer.java

package com.demo.kafka.service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// KafkaTemplate에 Topic명과 Message를 전달
// KafkaTemplate.send() 메서드가 실행되면 Kafka 서버로 메시지가 전송됩니다.
@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message){
        System.out.println(String.format("Produce message: %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }

}

4) KafkaConsumer.java

package com.demo.kafka.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

// exam이라는 Topic에서 consumer의 group id가 foo로 데이터를 받아옴
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message: %s", message));
    }
}

테스트

1) Springboot & Kafka 연동 확인

  • Application이 실행되면 Kafka 서버와 커넥션이 이루어짐. Consumer의 @KafkaListener에서 설정한 exam 토픽을 자동으로 생성하는 것을 확인 할 수 있습니다.

업로드중..

해당 kafka의 topics 목록을 불러옴

2) 메시지 pub / sub

Kafka 컨테이너에서 examl 토픽에 메시지가 전송되었는지 확인

# docker exec -it {컨테이너명} bash
# kafka-console-consumer.sh - bootstrap-server localhost:9092 -- topic exam
profile
성장형 인간

0개의 댓글