1) Project structure
2) build.gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
1) application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
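spring.kafka.consumer
- bootstrap-servers
  - A comma-separated list of host:port pairs used to establish the initial connection to the Kafka cluster.
  - If consumer.bootstrap-servers is set, it overrides the global spring.kafka.bootstrap-servers setting for consumers only.
spring.kafka.producer
- bootstrap-servers
  - Same meaning as consumer.bootstrap-servers; set it only when the producer needs to override the global setting.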
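The override rule and the comma-separated format described above can be illustrated in plain Java. This is only a sketch of the behavior, not Spring Boot's actual implementation; the class name `BootstrapServers` and the methods `resolve` and `split` are hypothetical and exist only for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that mimics how a client-specific bootstrap-servers
// value takes precedence over the global one. Not part of Spring Boot.
class BootstrapServers {

    // Returns the client-specific value when it is set, otherwise the global value.
    static String resolve(String global, String clientSpecific) {
        if (clientSpecific != null && !clientSpecific.isBlank()) {
            return clientSpecific;
        }
        return global;
    }

    // Splits a comma-separated "host:port,host:port" string into its entries.
    static List<String> split(String servers) {
        return Arrays.stream(servers.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    }
}
```

With a global value of "broker-a:9092" and a consumer value of "broker-b:9092", `resolve` returns the consumer value; when the consumer value is missing, the global value is used.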
2) KafkaController.java
package com.demo.kafka.controller;

import com.demo.kafka.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducer producer;

    // Accepts the "message" parameter via POST and passes it to the producer service
    @PostMapping
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }
}
3) KafkaProducer.java
package com.demo.kafka.service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Passes the topic name and the message to KafkaTemplate.
// Calling KafkaTemplate.send() publishes the message to the Kafka broker.
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private static final String TOPIC = "exam";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message: %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}
4) KafkaConsumer.java
package com.demo.kafka.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

// Receives messages from the "exam" topic as a member of consumer group "foo"
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message: %s", message));
    }
}
1) Verifying the Spring Boot and Kafka integration
List the topics on the Kafka broker.
2) Message pub / sub
In the Kafka container, check that the message was delivered to the exam topic.
# docker exec -it {container-name} bash
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam
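To publish a test message to the /kafka endpoint without a GUI client, a small Java client along the following lines could be used. This is a minimal sketch that assumes the application is running locally on Spring Boot's default port 8080; the class `KafkaEndpointClient` and its methods `buildRequest` and `send` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for the POST /kafka endpoint.
class KafkaEndpointClient {

    // Assumption: the application listens on localhost:8080 (Spring Boot's default port).
    static final String BASE_URL = "http://localhost:8080";

    // Builds a POST request that carries the message as the "message" query parameter.
    static HttpRequest buildRequest(String message) {
        String query = "message=" + URLEncoder.encode(message, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/kafka?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request and returns the response body from the controller.
    static String send(String message) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(message), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the application and the broker running, calling `KafkaEndpointClient.send("hello")` should return the controller's "success" response, and the message should then show up in the console consumer.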