msa를 경험해보기 위해 토이 프로젝트를 시작했다.
구성은 사용자 서버, 관리자 서버, 알림 서버, 알림 작업서버 4개로 구성했다.
아래는 컨테이너로 띄워진 서버 구성도이다.
현재 관리자 서버에서 Kafka의 메시지 전송 실패에 대한 대시보드 작업을 진행중에 테스트 코드를 짜야겠다는 마음이 들었다. notification-server 에서 message를 생산해서 worker-server에 보내는 일련의 작업이 불필요한 행위라는 생각이 들어서 테스트 코드를 작성하면서 진행해야겠다는 마음이 들었다.
TDD 책을 읽었을때 테스트코드를 작성하면 코드에 설득력이 생기고 힘이 생긴다는 말을 본거 같은데 나는 주니어라서 그 말이 아직 이해가 안되긴 하지만 현재 상황에서 테스트 코드를 짜야만 내 코드가 잘 작동되는지 알 수 있기에 짜기 시작했다.
Consumer 에서 메세지를 받는 경우 실패한 상황이 발생하면 어떻게 될까? 에서 시작된 나의 테스트 계획은 이렇다.
우선, notification-server에서 메시지를 생산해서 worker-server로 보내는 흐름을 테스트하기 위해 Kafka Producer를 생성했다.
@KafkaListener(topics = "notification-topic", groupId = "notification-group", containerFactory = "kafkaListenerContainerFactory")
public void consumeNewNotifications(String message) {
try {
log.info("새로운 알림 처리");
throw new RuntimeException("강제 오류 발생! Kafka 메시지 처리 실패");
// processNotification(message);
} catch (Exception e) {
// 에러 발생시 notification-dlq으로 메시지 전송
kafkaTemplate.send("notification-dlq", LocalDateTime.now().toString(), message);
}
}
@ExtendWith(SpringExtension.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"notification-topic", "notification-dlq"})
public class KafkaDLQIntegratiohnTest {
private static final String TOPIC = "notification-topic";
private static final String DLQ_TOPIC = "notification-dlq";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "notification-group";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
@DisplayName("DLQ 통합 테스트")
void testDLQMechanism() throws InterruptedException {
// Producer가 notification-topic으로 메시지를 전송
kafkaTemplate.send(TOPIC, "Test message for DLQ");
System.out.println("Producer: 메시지 전송 완료 - " + TOPIC);
TimeUnit.SECONDS.sleep(5);
// DLQ Consumer 설정 후 메시지 확인
Properties dlqProps = new Properties();
dlqProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
dlqProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-group");
dlqProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
dlqProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
dlqProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> dlqConsumer = new KafkaConsumer<>(dlqProps);
dlqConsumer.subscribe(Collections.singletonList(DLQ_TOPIC));
ConsumerRecords<String, String> dlqRecords = dlqConsumer.poll(Duration.ofSeconds(5));
// DLQ에 메시지가 존재하는지 검증
assertThat(dlqRecords.count()).isGreaterThan(0);
dlqRecords.forEach(dlqRecord -> {
System.out.println("DLQ Consumer: 메시지 수신 - " + dlqRecord.value());
});
dlqConsumer.close();
}
}

여기서 EmbeddedKafka
를 사용했는데 이 어노테이션은 내가 현재 docker로 실행중인 kafka를 사용하는게 아니였다...
내장 카프카를 사용해도 지금 테스트랑은 상관은 없지만 실제 어떻게 작동되는지 궁금해서 내가 사용하는 카프카를 가지고 테스트를 진행하기로 했다.
DLQ Consumer도 @Autowired
를 통해서 가져와서 사용하면 되지 않을까? 라는 생각에 깔끔하게 DLQ Consumer 설정 부분을 지워준다.
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class KafkaDLQIntegratiohnTest {
@Autowired
private DLQConsumer dlqConsumer;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
@DisplayName("DLQ 통합 테스트")
void testDLQMechanism() throws InterruptedException {
// Producer가 notification-topic으로 메시지를 전송
kafkaTemplate.send("notification-topic", "Test message for DLQ");
System.out.println("Producer: 메시지 전송 완료 - notification-topic");
// DLQ 메시지가 정상적으로 도착했는지 확인
boolean messageReceived = dlqConsumer.getDLQLatch().await(10, TimeUnit.SECONDS);
assertThat(messageReceived).isTrue(); // DLQ 메시지가 존재해야 테스트 성공
assertThat(dlqConsumer.getDLQPayload()).isEqualTo("Test message for DLQ");
System.out.println("DLQ 메시지 확인 완료: " + dlqConsumer.getDLQPayload());
}
}
이렇게 코드를 작성하니 처음 조건이 생각났다.. Kafka에 DLQ라는 기능은 존재하지 않기 때문에 직접 구현을 해줘야 한다는것을.
어떻게 DLQ 조건을 만들어야 하나 고민하다 DefaultErrorHandler라는 것을 알게되었다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// DeadLetterPublishingRecoverer를 KafkaTemplate을 이용해서 생성
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) -> {
log.warn("DLQ로 이동: {}, 예외: {}", record.value(), exception.getMessage());
return new TopicPartition("notification-dlq", record.partition());
}
);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(3000L, 3) // 3초 간격, 최대 3회 재시도 후 DLQ 이동
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
kafka의 ErrorHandler는 DefaultErrorHandler에서 진행이 된다고 한다. KafkaTemplate.send() 방식이 아닌 DeadLetterPublishingRecover를 사용해서 DLQ를 처리하는 이유는 kafkaListener에서 메시지를 받는 경우 또 다시 예외발생할 경우 조치를 하지 못하기 때문이다.
그래서 기존에 작성된 에러 발생시 catch로 error를 잡아서 kafkaTemplate을 통해서 send를 하는것보다는 DefaultErrorHandler를 사용하는 것이 더 안전하다. 그래서 기존에 catch 하는 부분을 삭제해줘도 상관없다.
근데 3회 재시도를 확인할 방법이 없는걸까? debugging 코드를 넣어줘서 눈으로 확인하면서 따라갔다.
errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
log.warn("메시지 재시도 중: {} (Attempt {}/{})", record.value(), deliveryAttempt, 3);
});
2025-03-13 01:46:22 - [com.notification.worker.config.kafka.KafkaConsumerConfig] [WARN] 메시지 재시도 중: Test message for DLQ (Attempt 1/3)
2025-03-13 01:46:25 - [com.notification.worker.config.kafka.KafkaConsumerConfig] [WARN] 메시지 재시도 중: Test message for DLQ (Attempt 2/3)
2025-03-13 01:46:28 - [com.notification.worker.config.kafka.KafkaConsumerConfig] [WARN] 메시지 재시도 중: Test message for DLQ (Attempt 3/3)
2025-03-13 01:46:31 - [org.springframework.kafka.listener.KafkaMessageListenerContainer] [DEBUG] Committing: {notification-dlq-0=OffsetAndMetadata{offset=52, leaderEpoch=null, metadata=''}}
DLQ 메시지 확인 완료: Test message for DLQ
다음과 같이 정상 처리 되는걸 확인할 수 있었다.
그리고 수동으로 throw를 처리하던 것을 테스트 코드에서 @SpyBean
을 통해서 가상으로 throw를 던지게 해주고 코드는 정상으로 되돌려 줬다.
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class KafkaDLQIntegratiohnTest {
@Autowired
private DLQConsumer dlqConsumer;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@SpyBean
private NotificationConsumer notificationConsumer;
@Test
@DisplayName("DLQ 통합 테스트 - KafkaConsumer 예외 유발")
void testDLQMechanism() throws InterruptedException {
doThrow(new RuntimeException("강제 오류 발생! Kafka 메시지 처리 실패"))
.when(notificationConsumer)
.consumeNewNotifications(anyString());
kafkaTemplate.send("notification-topic", "Test message for DLQ");
System.out.println("Producer: 메시지 전송 완료 - notification-topic");
// DLQ 메시지가 정상적으로 도착했는지 확인
boolean messageReceived = dlqConsumer.getDLQLatch().await(20, TimeUnit.SECONDS);
assertThat(messageReceived).isTrue();
assertThat(dlqConsumer.getDLQPayload()).isEqualTo("Test message for DLQ");
System.out.println(" DLQ 메시지 확인 완료: " + dlqConsumer.getDLQPayload());
}
}
참고 : https://docs.confluent.io/kafka/introduction.html#terminology