Java에서 Kafka를 사용하기 위한 라이브러리로는 apache kafka-clients와 spring-kafka, 두 가지가 있다.
1편에서는 apache kafka-clients 를 썼지만, spring-kafka를 import하면 apache kafka-clients까지 모두 적용되므로 build.gradle에 spring-kafka를 적어주면 된다.
spring-kafka가 보다 간편한 기능을 제공하는 것 같다.
implementation 'org.springframework.kafka:spring-kafka'
apache kafka-clients, spring-kafka 모두 적용되어 있다.
application.properties
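카프카 서버(브로커) 주소들을 `spring.kafka.bootstrap-servers` 속성에 나열하여 써준다. (쉼표(,)로 구분하여 적으면 된다! 예: `spring.kafka.bootstrap-servers=localhost:9092,localhost:9093`)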
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * WebSocket message-broker configuration for the chat feature.
 *
 * <p>Registers the STOMP endpoint clients connect to and declares which
 * destination prefixes are handled by the simple broker and which are routed
 * to application message handlers.
 */
@Configuration
@EnableWebSocketMessageBroker // annotation used for STOMP messaging
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Declares the destination prefixes used by this application.
     *
     * <ul>
     *   <li>{@code /topic} : one publisher sends a message, n subscribers receive it</li>
     *   <li>{@code /queue} : information is sent back to the single publisher</li>
     *   <li>{@code /pub} : URI prefix used when publishing to the application</li>
     * </ul>
     *
     * @param registry broker registry supplied by the framework
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Prefix for messages that are routed to application handlers.
        registry.setApplicationDestinationPrefixes("/pub");
        // Both the queue style and the topic style are served by the simple broker.
        registry.enableSimpleBroker("/queue", "/topic");
    }

    /**
     * Registers the {@code /ws/chat} STOMP endpoint, accepting any origin
     * pattern and with SockJS enabled.
     *
     * @param registry endpoint registry supplied by the framework
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/chat").setAllowedOriginPatterns("*").withSockJS();
    }
}
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Controller;
import java.time.LocalDateTime;
/**
 * STOMP controller that receives chat messages from clients and hands them
 * to {@link KafkaChatService} for publishing.
 */
@Controller
public class ChatController {

    // Injected through the constructor; no method of this class currently reads it.
    private final SimpMessageSendingOperations simpMessageSendingOperations;
    private final KafkaChatService kafkaService;

    /**
     * @param simpMessageSendingOperations messaging operations injected by the container
     * @param kafkaService                 service used to publish incoming chat messages
     */
    @Autowired
    public ChatController(SimpMessageSendingOperations simpMessageSendingOperations,
                          KafkaChatService kafkaService) {
        this.simpMessageSendingOperations = simpMessageSendingOperations;
        this.kafkaService = kafkaService;
    }

    /**
     * Handles messages published to {@code /pub/message}.
     *
     * <p>For a message whose type is {@code "ENTER"} the text is replaced with
     * an entry notice built from the sender. Every message is stamped with the
     * current local date-time and then forwarded to {@link KafkaChatService#send}.
     *
     * @param messageModel the incoming chat message; mutated in place
     */
    @MessageMapping("/message")
    public void enter(MessageModel messageModel) {
        // Constant-first comparison: a message that arrives without a type is
        // treated as a regular message instead of throwing NullPointerException.
        if ("ENTER".equals(messageModel.getType())) {
            messageModel.setMessage(messageModel.getSender() + "님이 입장하였습니다.");
        }
        messageModel.setTimestamp(LocalDateTime.now());
        kafkaService.send(messageModel);
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Kafka producer and consumer wiring for {@code MessageModel} payloads.
 *
 * <p>Keys are handled as strings and values as JSON on both sides. The
 * bootstrap server list is read from {@code spring.kafka.bootstrap-servers}.
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    // ------------------------------------------------------------------
    // Producer (sender) side
    // ------------------------------------------------------------------

    /**
     * @return producer properties: bootstrap servers plus key/value serializer class names
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        // Kafka server host(s) and port(s).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Values are written as JSON.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return props;
    }

    /**
     * @return producer factory built from {@link #producerConfigs()} with explicit
     *         string-key and JSON-value serializer instances
     */
    @Bean
    public ProducerFactory<String, MessageModel> producerFactory() {
        Map<String, Object> configs = producerConfigs();
        StringSerializer keySerializer = new StringSerializer();
        JsonSerializer<MessageModel> valueSerializer = new JsonSerializer<>();
        return new DefaultKafkaProducerFactory<>(configs, keySerializer, valueSerializer);
    }

    /**
     * @return template used by the application to publish {@code MessageModel} records
     */
    @Bean
    public KafkaTemplate<String, MessageModel> kafkaTemplate() {
        ProducerFactory<String, MessageModel> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    // ------------------------------------------------------------------
    // Consumer (receiver) side
    // ------------------------------------------------------------------

    /**
     * @return consumer properties: bootstrap servers, key/value deserializer classes,
     *         and auto-commit switched off
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    /**
     * @return consumer factory built from {@link #consumerConfigs()} with explicit
     *         string-key and JSON-value deserializer instances
     */
    @Bean
    public ConsumerFactory<String, MessageModel> consumerFactory() {
        Map<String, Object> configs = consumerConfigs();
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<MessageModel> valueDeserializer = new JsonDeserializer<>(MessageModel.class);
        return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
    }

    /**
     * @return listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageModel> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
/**
 * Publishes chat messages to Kafka and consumes them again, once to push them
 * to WebSocket subscribers and once to hand them to {@code ChatService}.
 */
@Service
public class KafkaChatService {

    // Shared by every instance and referenced from annotation attributes,
    // so these are declared as class-level constants.
    private static final String TOPIC_NAME = "chat";
    private static final String CONSUMER_GROUP_ID = "localhost-1";
    private static final String DB_GROUP_ID = "DB-1";

    // Collaborators are assigned once in the constructor and never reassigned.
    private final KafkaTemplate<String, MessageModel> kafkaTemplate;
    private final SimpMessageSendingOperations simpMessageSendingOperations;
    private final ChatService chatService;

    /**
     * @param kafkaTemplate                template used to publish to the chat topic
     * @param simpMessageSendingOperations messaging operations used to push to subscribers
     * @param chatService                  service that receives each consumed chat message
     */
    @Autowired
    public KafkaChatService(KafkaTemplate<String, MessageModel> kafkaTemplate,
                            SimpMessageSendingOperations simpMessageSendingOperations,
                            ChatService chatService) {
        this.kafkaTemplate = kafkaTemplate;
        this.simpMessageSendingOperations = simpMessageSendingOperations;
        this.chatService = chatService;
    }

    /**
     * Publishes the given message to the {@code "chat"} topic.
     *
     * <p>No record key is supplied and the value returned by the template is
     * not inspected here.
     *
     * @param content the chat message to publish
     */
    public void send(MessageModel content) {
        kafkaTemplate.send(TOPIC_NAME, content);
    }

    /**
     * Consumes the chat topic under group {@code "localhost-1"} and forwards
     * each message to {@code /topic/room/<roomCode>}.
     *
     * @param messageModel the consumed chat message
     */
    @KafkaListener(groupId = CONSUMER_GROUP_ID, topics = TOPIC_NAME)
    public void receive(MessageModel messageModel) {
        simpMessageSendingOperations
                .convertAndSend("/topic/room/" + messageModel.getRoomCode(), messageModel);
    }

    /**
     * Consumes the chat topic under group {@code "DB-1"} and passes each
     * message to {@code ChatService#addChat}.
     *
     * <p>NOTE(review): the separate group id presumably lets this listener see
     * every record independently of {@link #receive} — confirm against the
     * broker/topic setup.
     *
     * @param messageModel the consumed chat message
     */
    @KafkaListener(groupId = DB_GROUP_ID, topics = TOPIC_NAME)
    public void receiveDB(MessageModel messageModel) {
        chatService.addChat(messageModel);
    }
}