(사진 출처 : https://javarevisited.blogspot.com/2021/12/5-free-courses-to-learn-apache-kafka.html#axzz8AEttvI8W)
처음으로 producer, consumer 를 개발하게 되었습니다.
문제 -> 테스트때문에 개발시간을 까먹고 있었습니다.
위 문제를 해결하기 위한 과정을 적어봅니다.
kafka download - 로컬에서 카프카를 구성하여 최대한 비슷한 설정으로 개발하도록 하려고 했습니다.
정확한 원인은 모르지만 kafka가 정상적으로 동작하지 못했습니다. 아래의 이유로 local kafka 방법은 포기했습니다.
두번째 방법은 k8s port forwarding 이였습니다.
이런 문제들이 있었습니다.
localhost:9092로 포트포워딩 실행했습니다. 우선 로컬에 다운로드받은 kafka command 를 통해서 테스틑 해봤습니다.
🖌️참고 kafka commands
하지만 호스트명을 찾지 못한다는 에러를 확인했습니다. 그런데 이상한 점
service name을 [temporarary-bitnami-kafka-headless-svc] 라고 가정하겠습니다.
그렇다면 DNS는 temporarary-bitnami-kafka-headless-svc.my-namespace.svc.cluster-domain.example 가 되어야 하지만
처럼 되어 있었습니다. headless service 의 경우는 deployment replicas 만큼 DNS를 따로 생성하여 호스팅하는 것으로 보입니다.
위 문제 해결 방법은 현재 찾지 못하는 호스트이름을 port forwading 으로 향하게 하면됩니다.
host 변경 방법은 왼쪽 이외에도 다양한 곳에서 쉽게 찾을 수 있습니다. 결론적으로 호스트 파일에
127.0.0.1 temporarary-bitnami-kafka-headless-svc01.my-namespace.svc.cluster-domain.example
127.0.0.1 temporarary-bitnami-kafka-headless-svc02.my-namespace.svc.cluster-domain.example
127.0.0.1 temporarary-bitnami-kafka-headless-svc03.my-namespace.svc.cluster-domain.example
와 같은 설정 추가해주시면됩니다.
😵😵😵😵 그러나 ..
에러 명이 정확히 기억나진 않습니다만, 'join group 불가능' , 'coordination ?! 조건이 적절하지 않다' 대충 이런 에러들이였던 것 같습니다.
역시나 .. 삽질 끝에 시간이 아까워서 ... 다른 길을 가기로 했습니다.
클러스터에 코드들을 상상코딩하여 직접 올리는 것입니다. 이런 이유로 포기했습니다.
(😭😭😭😭😭😭😭 저는 kafka test가 .... local kafka가 필요하다는 블로그의 글 하나만 보고 속았습니다...... 글을 하나만 더 볼걸....)
다시 돌아와서 spring for apache kafka 는 embedded로 하여 테스트가 가능합니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
boot 를 통해 개발하신다면 version 은 잘 맞추세요. ✔️ spring for apache kafka 에서 boot, spring kafka, kafka client 적절한 버전 조합을 확인할 수 있습니다.
@Configuration
@Slf4j
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, DemoViewDTO> factory() {
// 참고 : 저는 VO 를 따로 만들어서 테스트 했습니다.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, DemoViewDTO> kafkaTemplate(){
return new KafkaTemplate<>(factory());
}
}
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, DemoViewDTO> consumerFactory() {
// 저는 VO로 작업했습니다. String으로 하시면 더 빠르게 테스트 하실 수 있습니다.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
JsonDeserializer<DemoViewDTO> deserializer = new JsonDeserializer<>(DemoViewDTO.class, false);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, DemoViewDTO> kafkaTemplate;
public void send(String topic, DemoViewDTO payload) {
log.info("sending payloa={} to topic={}", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
@Component
@Slf4j
@Data
public class KafkaConsumer {
private CountDownLatch latch = new CountDownLatch(10);
private List<DemoViewDTO> payloads = new ArrayList<>(); // 여러 메시지를 저장하기 위한 리스트
private DemoViewDTO payload;
@KafkaListener(topics = "baeldung")
public void receive(ConsumerRecord<String, DemoViewDTO> consumerRecord) {
payload = consumerRecord.value();
log.info("received payload = {}", payload.toString());
payloads.add(payload);
latch.countDown();
}
public List<DemoViewDTO> getPayloads() {
return payloads;
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
}
@SpringBootTest
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
ports = { 9092 }
)
class KafkaConsumerTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Test
public void giveEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
throws Exception {
String topic = "baeldung";
DemoViewDTO payload = DemoViewDTO.builder()
.name("my_name")
.age("27")
.id("temporary_id_001")
.department("demo_department")
.build();
DemoViewDTO payload2 = DemoViewDTO.builder()
.name("your_name")
.age("32")
.id("temporary_id_001")
.department("demo_department")
.build();
int testCnt = 0;
for (int i = 0; i < 10; i++) {
if (testCnt % 2 == 0) {
producer.send(topic, payload);
} else {
producer.send(topic, payload2);
}
testCnt++;
};
// 모든 메시지를 수신할 때까지 기다립니다 , consumer latch 로 관리할 수 있습니다.
consumer.getLatch().await(10, TimeUnit.SECONDS);
System.out.println("============================================================");
System.out.println("============================================================");
System.out.println(consumer.getPayloads().size());
System.out.println(consumer.getPayloads());
}
}
============================================================
============================================================
5
[DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}]
(저는 단순히 프린트하여 보여드렸습니다. 확인하는 코드는 따로 만들지 않겠습니다.)
spring kafka 통해서 간단한 테스트 가능하다는 것을 알려드렸습니다.
embedded kafka 모르셨던 분들한테 도움되길 바랍니다.
아래는 참고했던 내용들 표시하도록 하겠습니다.
즐겁게 읽었습니다. 유용한 정보 감사합니다.