일단 해보기 : Spring 카프카 시작해보기(Windows)

Jang990·2024년 4월 12일
0

일단해보기

목록 보기
2/7

글 작성 환경
OS: Windows
Java: 17
Spring: 3.2.4

나는 지금 Spring의 ApplicationEventPublisher를 통해서 이벤트 기반 프로그래밍을 경험해보고 있고 관련 기술인 메시지 큐에 대해 알아보려 한다.

이 글은 카프카에 대한 아무런 지식이 없는 상태에서 필요한 개념만 훑으면서 빠르게 시작하는게 목적이다.

도커를 전혀 모른다면 이전에 쓴 도커글을 참고하고 추가적으로 compose 정도만 알고 오자.

최소한 이것만 알고 시작하자

카프카를 시작하기 전에 최소한의 이해를 위해서는 다음 5가지는 알고 가야한다.

토픽, 프로듀서, 컨슈머, 파티션, 컨슈머 그룹

Spring 카프카를 시작하기 앞서 아주 간단하게 살펴보고 가자.

구조 확인

내용 출처
https://www.youtube.com/watch?v=k-3vBdX2M20
https://www.youtube.com/watch?v=0Ssx7jJJADI

MSA에서 서버간에 메시지 전달에는 카프카, RabbitMQ등의 미들웨어 서버가 있다.
이 미들웨어의 구조를 알아보자.
여기서는 토픽, 프로듀서, 컨슈머를 살펴본다.

기본적으로 중간에는 메시지를 보관할 수 있는 메시지 큐가 있다.
메시지 큐가 중간에서 토픽이라는 키로 보관하고 있는 것이다.
프로듀서는 특정 토픽을 키로 메시지를 발행하고(서버 or 앱 등등),
컨슈머는 발행된 메시지를 소비한다.

컨슈머는 누가 발행한지는 모르지만 발행된 메시지를 처리하므로써 느슨한 결합을 유지할 수 있다. (옵저버 패턴 참고)


위에서 살펴본 그림의 토픽에서 컨슈머로의 화살표는 실제와 다르다.
다음 그림과 같이 컨슈머가 토픽을 바라보고 있는 것이 정확한 그림이다.

Kafka 구조 그림

내용 출처 : https://www.youtube.com/watch?v=0Ssx7jJJADI

여기서는 카프카의 파티션, 컨슈머 그룹에 대해서 알아본다.

한 개의 토픽은 한 개 이상의 파티션으로 구성된다.
그림의 topic1p0, 01 파티션으로 구성됐다.
파티션은 메시지를 저장하는 물리적인 파일이라고 생각하면 된다.


컨슈머는 컨슈머 그룹에 속한다.
컨슈머가 카프카 브로커에 연결할 때 어느 그룹에 속하는지 지정을 해야한다.
하나의 파티션은 하나의 컨슈머 그룹내에서 하나의 컨슈머만 연결가능하다.

Kraft - Zookeeper

최근에 카프카에선 Zookpeeper대신 Kraft를 사용한다고 알게됐다.

하지만 나는 그냥 빠르게 카프카를 경험해보는 것이 목적이고, Zookeeper에 대해서 참고할 만한 글들이 많다.
일단 경험해보고 Kraft로 넘어가는건, 모르는 상태에서 시작하는 것 보다 쉬울 것이기 떄문에 Zookeeper로 시작한다.


Kafka 설정하기

참고 내용
https://www.youtube.com/watch?v=k-3vBdX2M20
https://www.baeldung.com/spring-kafka

도커 Compose 작성

다음 docker-compose.yml을 생성하자.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

docker-compose up -d로 실행하자.

토픽 관리 (스킵 가능)

이 내용은 Spring에서 할 수 있으니 스킵해도 된다.
하지만 kafka 동작을 확인해보고 싶다면 직접 실행해보도록하자.

cmd에서 docker exec -it kafka /bin/bash로 들어가서 다음 명령어를 입력해준다.

-- 토픽 생성 명령어
kafka-topics.sh --create \
  --topic mytopic \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1

kafka-topics.sh : 스크립트를 사용하여 토픽을 생성.
--create : 새로운 토픽을 생성하는 옵션
--topic mytopic : 생성할 토픽의 이름을 myTopic으로 지정
--bootstrap-server localhost:9092 : 카프카 브로커의 주소를 지정 9092로 지정
--replication-factor 1 : 이 옵션은 토픽의 데이터를 복제할 수 있는 브로커 수를 1개로 지정
--partitions: 이 옵션은 토픽이 나뉘어져 있는 파티션의 수를 1개로 지정

-- 생성된 토픽 확인 명령어
kafka-topics.sh --describe --topic mytopic --bootstrap-server kafka:9092

Spring Kafka 시작하기

참고 내용 : https://www.baeldung.com/spring-kafka

start.spring.io에 가서 kafka, spring-web정도만 추가하고 시작하자.

시작하면서 application.properties에 다음 설정만 추가해주자.

spring.kafka.bootstrap-servers=localhost:9092
message.topic.name=jang

토픽 설정

앞서 Kafka 설정하기>토픽 관리 (스킵가능)에서 cmd창으로 입력한 명령어를 스프링 환경에서 하는 것이다.

지금은 프로퍼티 파일에 따라서 jang이라는 토픽을 만들어냈다.

@Configuration
public class KafkaTopicConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    @Value(value = "${message.topic.name}")
    private String topicName;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(topicName, 1, (short) 1);
    }
}

프로듀서 설정

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

프로듀서 서비스 구현

@RestController
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value(value = "${message.topic.name}")
    private String topicName;

    /* 카프카는 빠른 스트림 처리 플랫폼 */
    /** get을 호출하면 결과를 얻으려 대기해서 생산자 속도 저하 */
    @GetMapping("/kafka/sync")
    public void sendSync(String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
        System.out.println(result);
    }

    /** 비동기 처리가 적절 */
    @GetMapping("/kafka/async")
    public void sendAsync(String message) {
        kafkaTemplate.send(topicName, message);
    }

    /** 비동기 처리 후처리를 원한다면 whenComplete를 활용하기 */
    @GetMapping("/kafka/async/result/console")
    public void sendAsyncWithLog(String message) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        future.whenComplete((result, ex) -> {
            if(ex == null) // 오류 없이 처리
                System.out.println("Sent message=[" + message + "] " +
                        "with offset=[" + result.getRecordMetadata().offset() + "]");
            else // 오류 발생
                System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
        });
    }
}

컨슈머 설정

앞서 컨슈머가 카프카 브로커에 연결할 때 어느 그룹에 속하는지 지정을 해야한다고 했다.
설정에서 컨슈머 그룹을 foo로 지정했다.

// @KafkaListener 어노테이션을 감지하려면 설정 클래스에 @EnableKafka가 있어야 한다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }
}

컨슈머 서비스 구현

컨슈머 서비스에서는 프로듀서에서 설정한 토픽과 groupId를 조합하여

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "jang", groupId = "foo")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group foo: " + message);
    }
}

실행해보자.

이제 서버를 동작하고 브라우저로 Get 요청을 보내보자.

동기 처리
http://localhost:8080/kafka/sync?message=hi!%20Jang990%20kafka%20sync

비동기 처리
http://localhost:8080/kafka/async?message=hi!%20Jang990%20kafka%20async

비동기 후처리
http://localhost:8080/kafka/async/result/console?message=hi!%20Jang990%20kafka%20asyn%20console


이 글에서는 빠른 시작을 위해서 문자열을 전송하는 아주 간단한 예제로 실행해봤다.
다음 글에서는 객체를 보내는 방법에 대해서 다루며 더 자세한 설정들을 알아보겠다.

profile
공부한 내용을 적지 말고 이해한 내용을 설명하자

0개의 댓글