spring kafka test (local kafka 필요 없음)

gyeongseon·2023년 8월 13일
0
post-thumbnail

(사진 출처 : https://javarevisited.blogspot.com/2021/12/5-free-courses-to-learn-apache-kafka.html#axzz8AEttvI8W)

개요

처음으로 producer, consumer 를 개발하게 되었습니다.
문제 -> 테스트때문에 개발시간을 까먹고 있었습니다.

위 문제를 해결하기 위한 과정을 적어봅니다.

환경

스펙

  • spring boot : 3.1.2
  • spring kafka : 3.0.9
  • kafka client : 3.4.1 (bitnami helm chart 3.4.1 or 3.5.1 예상)
  • open jdk : 17.x
  • IDE : STS
    (... 이외 궁금한 스펙이 있으시다면 댓글달아주세요)

인프라

  • 망(network) 격리 되어 있음.
  • 클러스터 k8s 사용 -> 커맨드를 통해 클러스터에 접근은 가능했습니다.

어떤 방법들을 사용했었나 ?!

local kafka 설치

kafka download - 로컬에서 카프카를 구성하여 최대한 비슷한 설정으로 개발하도록 하려고 했습니다.

정확한 원인은 모르지만 kafka가 정상적으로 동작하지 못했습니다. 아래의 이유로 local kafka 방법은 포기했습니다.

  • 설정을 변경하지 않았고 기본 설정을 사용했습니다. 정상적으로 기동하지 못했습니다.
    이건 ... 뭔가 pc 환경과 관련되어 있을 것 같다는 추측이 있었습니다.
  • 리소스에 제약이 많았습니다. java 로 개발하면 ... 리소스를 정말 많이 차지합니다. 추가로 제공받은 환경에서는 메모리, cpu 를 적게 할당받았습니다.

port forwarding

두번째 방법은 k8s port forwarding 이였습니다.

이런 문제들이 있었습니다.

headless service 의 port forward

localhost:9092로 포트포워딩 실행했습니다. 우선 로컬에 다운로드받은 kafka command 를 통해서 테스틑 해봤습니다.

🖌️참고 kafka commands

하지만 호스트명을 찾지 못한다는 에러를 확인했습니다. 그런데 이상한 점
service name을 [temporarary-bitnami-kafka-headless-svc] 라고 가정하겠습니다.
그렇다면 DNS는 temporarary-bitnami-kafka-headless-svc.my-namespace.svc.cluster-domain.example 가 되어야 하지만

  • temporarary-bitnami-kafka-headless-svc01.my-namespace.svc.cluster-domain.example
  • temporarary-bitnami-kafka-headless-svc02.my-namespace.svc.cluster-domain.example
  • temporarary-bitnami-kafka-headless-svc03.my-namespace.svc.cluster-domain.example

처럼 되어 있었습니다. headless service 의 경우는 deployment replicas 만큼 DNS를 따로 생성하여 호스팅하는 것으로 보입니다.

위 문제 해결 방법은 현재 찾지 못하는 호스트이름을 port forwading 으로 향하게 하면됩니다.
host 변경 방법은 왼쪽 이외에도 다양한 곳에서 쉽게 찾을 수 있습니다. 결론적으로 호스트 파일에

127.0.0.1	temporarary-bitnami-kafka-headless-svc01.my-namespace.svc.cluster-domain.example
127.0.0.1	temporarary-bitnami-kafka-headless-svc02.my-namespace.svc.cluster-domain.example
127.0.0.1	temporarary-bitnami-kafka-headless-svc03.my-namespace.svc.cluster-domain.example

와 같은 설정 추가해주시면됩니다.

😵😵😵😵 그러나 ..

  • kafka command 는 잘 연결되었습니다.
  • producer code 정상연결되었습니다.
  • 😵 ... 하지만 consumer code 연결이 불가능했습니다.

에러 명이 정확히 기억나진 않습니다만, 'join group 불가능' , 'coordination ?! 조건이 적절하지 않다' 대충 이런 에러들이였던 것 같습니다.

역시나 .. 삽질 끝에 시간이 아까워서 ... 다른 길을 가기로 했습니다.

직접 배포

클러스터에 코드들을 상상코딩하여 직접 올리는 것입니다. 이런 이유로 포기했습니다.

  • 배포시간
  • 카프카 영향 -> 부작용이 분명 있을 것 같았습니다.
  • 테스트 노드 혹은 테스트 네임스페이스를 통한 테스트 환경이 없습니다. 정확하게는 접근권한이 없습니다.

결국 ... spring kafka test 로 왔습니다.

(😭😭😭😭😭😭😭 저는 kafka test가 .... local kafka가 필요하다는 블로그의 글 하나만 보고 속았습니다...... 글을 하나만 더 볼걸....)

다시 돌아와서 spring for apache kafka 는 embedded로 하여 테스트가 가능합니다.

1. injection sprig kafka test dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>

boot 를 통해 개발하신다면 version 은 잘 맞추세요. ✔️ spring for apache kafka 에서 boot, spring kafka, kafka client 적절한 버전 조합을 확인할 수 있습니다.

2. configuration producer, consumer

@Configuration
@Slf4j
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, DemoViewDTO> factory() {
    // 참고 : 저는 VO 를 따로 만들어서 테스트 했습니다. 
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, DemoViewDTO> kafkaTemplate(){
        return new KafkaTemplate<>(factory());
    }
}
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, DemoViewDTO> consumerFactory() {
		// 저는 VO로 작업했습니다. String으로 하시면 더 빠르게 테스트 하실 수 있습니다.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        JsonDeserializer<DemoViewDTO> deserializer = new JsonDeserializer<>(DemoViewDTO.class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

3. Component producer, consumer

@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, DemoViewDTO> kafkaTemplate;

    public void send(String topic, DemoViewDTO payload) {
        log.info("sending payloa={} to topic={}", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
@Component
@Slf4j
@Data
public class KafkaConsumer {

    private CountDownLatch latch = new CountDownLatch(10); 
    private List<DemoViewDTO> payloads = new ArrayList<>();  // 여러 메시지를 저장하기 위한 리스트
    private DemoViewDTO payload;

    @KafkaListener(topics = "baeldung")
    public void receive(ConsumerRecord<String, DemoViewDTO> consumerRecord) {
        payload = consumerRecord.value();
        log.info("received payload = {}", payload.toString());
        payloads.add(payload);
        latch.countDown();
    }

    public List<DemoViewDTO> getPayloads() {
        return payloads;
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }
}

4. test

@SpringBootTest
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
        ports = { 9092 }
)
class KafkaConsumerTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Test
    public void giveEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
            throws Exception {

        String topic = "baeldung";

        DemoViewDTO payload = DemoViewDTO.builder()
                .name("my_name")
                .age("27")
                .id("temporary_id_001")
                .department("demo_department")
                .build();
        DemoViewDTO payload2 = DemoViewDTO.builder()
                .name("your_name")
                .age("32")
                .id("temporary_id_001")
                .department("demo_department")
                .build();
        int testCnt = 0;
        for (int i = 0; i < 10; i++) {
            if (testCnt % 2 == 0) {
                producer.send(topic, payload);
            } else {
                producer.send(topic, payload2);
            }
            testCnt++;
        };

        // 모든 메시지를 수신할 때까지 기다립니다 , consumer latch 로 관리할 수 있습니다. 
        consumer.getLatch().await(10, TimeUnit.SECONDS);

        System.out.println("============================================================");
        System.out.println("============================================================");
        System.out.println(consumer.getPayloads().size());
        System.out.println(consumer.getPayloads());
    }

}

5. test result

============================================================
============================================================
5
[DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}, DemoViewDTO{id='temporary_id_001', name='my_name', age='27', department='demo_department'}]

(저는 단순히 프린트하여 보여드렸습니다. 확인하는 코드는 따로 만들지 않겠습니다.)

마루으리 🏆

spring kafka 통해서 간단한 테스트 가능하다는 것을 알려드렸습니다.
embedded kafka 모르셨던 분들한테 도움되길 바랍니다.

아래는 참고했던 내용들 표시하도록 하겠습니다.

profile
경선 :)

1개의 댓글

comment-user-thumbnail
2023년 8월 13일

즐겁게 읽었습니다. 유용한 정보 감사합니다.

답글 달기