[Spring-Kafka] 이론은 잠시 미루고, 실행이나 시켜봅시다.

식빵·2023년 5월 7일
0

Spring Lab

목록 보기
22/35
post-thumbnail

이 글에서 작성된 대부분의 세팅 방법은 https://youtu.be/QngHCFFsa00 을 기반으로 작성되었음을 알립니다. 다만 저의 경우에는 영상대로 따라하면 안되는 부분도 약간 있어서 영상과 조금씩 내용이 다를 수 있습니다. 그리고 영상에서는 Spring Boot 3.x.x 버전을 사용하지만, 저는 Spring boot 2.7.11 버전을 사용한다는 약간의 차이점이 있습니다.




이론 공부도 중요하지만, 일단 한번 실행을 해보고 나서 공부하면 저는 효과가 좋더군요.
그래서 일단 Kafka 가 정확히 뭔지는 몰라도 Spring boot 로 Kafka 와 통신을 하는
초간단 애플리케이션을 생성해보려 합니다. 지금부터 시작해보겠습니다.

development tools and environment:

  • OS : Window 10 Home Edition
  • IDE : Intellij Ultimate
  • framework : Spring boot v2.7.11
  • jdk vendor(version) : azul-17
  • CLI : PowerShell 7

CLI 가 Powershell 이다 보니 콘솔창 명령 입력 시 줄바꿈을 위해서 ` (백틱) 을 사용합니다. Bash 계열의 CLI 개발자 분들은 해당 문자를 \ 로 대체해서 작성하시기 바랍니다.



spring boot 프로젝트 생성

  • Lombok
  • Spring Configuartion Processor
  • Spring for Apache Kafka



pom.xml 추가 설정

프로젝트 생성 후에 pom.xml 에 가서 추가적인 설정을 하나만 더 해줍니다.
이렇게 안하면 PowerShell 에서 \mvnw spring-boot:run 명령어를 입력하면 에러가 발생합니다.

<properties>
  <java.version>17</java.version>
  <!-- 아래 설정 추가, 이렇게 안하면 가끔 인코딩 안맞아서 실행이 안되는 경우가 있음 -->
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>



docker compose

1. docker-compose.yml 생성

1-1. 생성위치


1-2. 작성법

출처: https://developer.confluent.io/quickstart/kafka-docker/

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

여기서 9092:9092 포트를 잘 봐두시기 바랍니다.
저희가 이후에 spring-boot 애플리케이션에서 저 포트로 kafka container instance 와 통신을 할 겁니다.



2. docker-compose 실행

Powershell(콘솔창)을 열고 docker-compose.yml 파일이 있는 경로에서 가서 아래 명령어를 입력하여 docker container 가 실행됩니다.

docker compose up

실행하면 대충 아래와 같은 화면이 나오면서 정상적으로 docker-compose.yml 에 작성했던 2개의 container 가 실행되는 것을 확인할 수 있습니다.


정말 제대로 실행이 됐는지 다시 한번 확인하기 위해서 새로운 Powershell(콘솔창) 을 실행하여 docker ps -a 를 통해서 확인합니다. 저는 아래처럼 나오네요.

CONTAINER ID   IMAGE                             COMMAND                   CREATED          STATUS                      PORTS                          NAMES
65bb18ab9cfd   confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   41 seconds ago   Up 40 seconds               0.0.0.0:9092->9092/tcp         broker
09d649a97939   confluentinc/cp-zookeeper:7.3.2   "/etc/confluent/dock…"   42 seconds ago   Up 40 seconds               2181/tcp, 2888/tcp, 3888/tcp   zookeeper
  • broker, zookeeper 라는 명칭의 컨테이너가 실행 중임을 확인할 수 있습니다.

3. kafka topic 생성

새로운 Powershell 콘솔창을 열고 아래 명령어를 쳐줍니다.

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic quickstart

4. Kafka 메세지 받기위한 consumer 실행

이후, Docker 로 띄운 Kafak container 에 아래 명령어를 전달합니다.
자세히 보면 kafka consumer 콘솔에 명령어를 내리는 걸 확인할 수 있고,
이때 topic 을 quickstart 로 지정하는 것을 확인 할 수 있습니다.

docker exec --interactive --tty broker `
kafka-console-consumer --bootstrap-server broker:9092 `
                       --topic quickstart `
                       --from-beginning

이러면 지금부터 새로운 메세지가 quickstart 라는 토픽에 들어오면 로그가 콘솔 창에 찍히게 됩니다. 참고로 반대로 produce 를 하고 싶으면 아래처럼 명령어를 입력하면 됩니다.

docker exec --interactive --tty broker `
kafka-console-producer  --bootstrap-server broker:9092 `
						 --topic quickstart

저희는 spring boot app 을 통해서 메세지를 kafka 에 전송할 예정입니다.




spring boot app config

전체적인 spring boot app 코드는 github 에 올려놨습니다.
https://github.com/CodingToastBread/kafka-setting
이 글에서는 핵심적인 세팅에 대한 것만 기록하겠습니다.


1. application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=events



2. Spring boot EntryPoint Class

package toast.bread.springkafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import toast.bread.springkafka.config.KafkaConfigProps;
import toast.bread.springkafka.domain.CustomerVisitEvent;

import java.time.LocalDateTime;
import java.util.UUID;

@SpringBootApplication
@EnableConfigurationProperties
public class SpringKafkaApplication {
	
	@Autowired
	private ObjectMapper objectMapper;
	
	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaApplication.class, args);
	}
	
	@Bean
	public ApplicationRunner runner(final KafkaTemplate<String, String> kafkaTemplate, final KafkaConfigProps kafkaConfigProps) throws JsonProcessingException {
		final CustomerVisitEvent event = CustomerVisitEvent.builder()
			.customerId(UUID.randomUUID().toString())
			.dateTime(LocalDateTime.now())
			.build();
		final String payload = objectMapper.writeValueAsString(event);
		return args -> kafkaTemplate.send(kafkaConfigProps.getTopic(), payload);
	}
	
	@KafkaListener(topics = "quickstart")
	public String listens(final String in) {
		System.out.println(in);
		return in;
	}
	
}
  • runner 메소드는 Spring ApplicationContext 가 완전히 초기화되면 자동으로 return 한 Lambda 식이 실행됩니다. 이때 KafkaTemplate 을 통해서 Json 문자열을 Kafka 에 전송하도록 합니다.

  • 반대로 @KafkaListener 는 Kafka 에 quickstart 라는 토픽에 Event 가 오면
    해당 메세지를 받는 메소드입니다. 참고로 이걸 하나라도 생성하면 spring application 실행 후 자동 종료되지 않고, 계속해서 Kafka 에서 오는 메세지를 Listening 하기 위해 대기를 합니다.




spring boot 실행

spring boot 프로젝트 root 에 가서 새로운 콘솔창(PowerShell) 창을 열고
아래처럼 명령어를 입력합니다.

.\mvnw spring-boot:run



참고: spring-boot:run 실행 에러난 경우

pom.xml 에서 plugin 태그를 하나 추가하고, mvn update 를 해준 합니다.
그리고 나서 maven clean 하고 다시 mvn spring-boot:run 명령어를 실행해보세요.


<build>
  <plugins>
    <!-- 이 아래 plugin 하나를 추가한다. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-resources-plugin</artifactId>
      <version>3.1.0</version>
    </plugin>
    
    <!-- 얘는 기존에 있던것임. 건들지 말것 -->
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <excludes>
          <exclude>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </exclude>
        </excludes>
      </configuration>
    </plugin>
  </plugins>
</build>




참고

profile
백엔드를 계속 배우고 있는 개발자입니다 😊

0개의 댓글