카프카 2 - stomp 기반 채팅 브로커 만들기

조윤호·2023년 2월 3일
0

kafka

목록 보기
2/3
post-thumbnail

프로젝트 구조

자바 프로젝트에 카프카 적용하기

Java 에서 kafka를 사용하기 위한 라이브러리로 두 가지가 있다

  • org.apache.kafka:kafka-clients:3.3.2
  • org.springframework.kafka:spring-kafka:3.0.2

1편에서는 apache kafka-clients 를 썼지만, spring-kafka를 import하면 apache kafka-clients까지 모두 적용되므로 build.gradle에 spring-kafka를 적어주면 된다.
spring-kafka가 보다 간편한 기능을 제공하는 것 같다.

implementation 'org.springframework.kafka:spring-kafka'

apache kafka-clients, spring-kafka 모두 적용되어 있다.

application.properties

카프카 서버들을 나열하여 써준다. (, 로 구분하여 적으면 된다!)

코드 작성하기


WebSocketMessageBrokerConfig

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker // stomp에서 사용하는 annotaion
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint("/ws/chat")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    /**
     * /topic : 1명 msg 발행 - n명 구독 <br>
     * /queue : 발행한 1명에게 다시 정보를 전송 <br>
     * /put : 발행 시의 uri prefix
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // topic 방식과 queue 방식이 있다.
        registry.enableSimpleBroker("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/pub");
    }
}

ChatController

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Controller;

import java.time.LocalDateTime;

@Controller
public class ChatController {
    private final SimpMessageSendingOperations simpMessageSendingOperations;
    private final KafkaChatService kafkaService;

    @Autowired
    public ChatController(SimpMessageSendingOperations simpMessageSendingOperations, KafkaChatService kafkaService) {
        this.simpMessageSendingOperations = simpMessageSendingOperations;
        this.kafkaService = kafkaService;
    }

    /**
     * /pub/message <br>
     * 메시지 전송시에는 controller에서 처리
     */
    @MessageMapping("/message")
    public void enter(MessageModel messageModel) {
        if (messageModel.getType().equals("ENTER")) {
            messageModel.setMessage(messageModel.getSender()+"님이 입장하였습니다.");
        }
        messageModel.setTimestamp(LocalDateTime.now());
        kafkaService.send(messageModel);
    }
}

KafkaConfig

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@EnableKafka
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    //Sender config
    @Bean
    public ProducerFactory<String, MessageModel> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<MessageModel>());
    }

    @Bean
    public KafkaTemplate<String, MessageModel> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> map = new ConcurrentHashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl); //kafka server ip & port
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); //Object json parser

        return map;
    }

    //Receiver config
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    @Bean
    public ConsumerFactory<String, MessageModel> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageModel.class));
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> map = new ConcurrentHashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return map;
    }
}

KafkaChatService

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;

@Service
public class KafkaChatService {
    private KafkaTemplate<String, MessageModel> kafkaTemplate;
    private SimpMessageSendingOperations simpMessageSendingOperations;
    private final ChatService chatService;
    private final String TOPIC_NAME = "chat";
    private final String CONSUMER_GROUP_ID = "localhost-1";
    private final String DB_GROUP_ID = "DB-1";

    @Autowired
    public KafkaChatService(KafkaTemplate<String, MessageModel> kafkaTemplate, SimpMessageSendingOperations simpMessageSendingOperations, ChatService chatService) {
        this.kafkaTemplate = kafkaTemplate;
        this.simpMessageSendingOperations = simpMessageSendingOperations;
        this.chatService = chatService;
    }

    public void send(MessageModel content) {
        kafkaTemplate.send(TOPIC_NAME, content);
    }

    @KafkaListener(groupId = CONSUMER_GROUP_ID,topics = TOPIC_NAME)
    public void receive(MessageModel messageModel) {
        simpMessageSendingOperations
                .convertAndSend("/topic/room/"+ messageModel.getRoomCode(), messageModel);

    }

    @KafkaListener(groupId = DB_GROUP_ID ,topics = TOPIC_NAME)
    public void receiveDB(MessageModel messageModel) {
        chatService.addChat(messageModel);
    }
}

📚참고문헌

https://velog.io/@ha0kim/스프링-인-액션-8.비동기-메시지-전송하기

https://log-laboratory.tistory.com/216

profile
한걸음씩 성실히

0개의 댓글