CQRS 도입기 - 6 MSK Cluster & Spring Boot Application

정훈·2023년 5월 8일
0

CQRS

목록 보기
6/8

이전 CQRS 도입기 - 5 에서는 Apache Kafka 패키지를 다운받은 후 , 메세지를 주고 받는 실습을 해보았다.

이번 챕터에서는Spring Boot 에서 Apache Kafka 의존성을 설치 이후 topic을 생성하여 Application에서 MSK Cluster로 데이터를 보낸 후 consumer를 서비스에서 임의로 지정하여 데이터를 log를 통해 읽어볼 예정이다.

시작하기 앞서 Spring Boot Application 은 3.0.5v을 사용하였다.

Spring Boot Build gradle setting

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.0.5'
    id 'io.spring.dependency-management' version '1.1.0'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'

    compileOnly 'org.projectlombok:lombok:1.18.20'
    annotationProcessor 'org.projectlombok:lombok:1.18.20'
    testCompileOnly 'org.projectlombok:lombok:1.18.20'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
    implementation 'org.apache.kafka:kafka-clients'
}

tasks.named('test') {
    useJUnitPlatform()
}

이후 application.yaml 설정하기


kafka:
  bootstrapAddress: msk 부트스트랩주소 
  topic:
    kid:
      name: topic-kid
      replicationFactor: 2
      numPartitions: 2
  consumer:
    consumer:
      group-id: 그룹id 
      auto-offset-reset: earliest
  producer:
    retry: 3
    enable-idempotence: true
    max-in-flight-requests-per-connection: 3
  

msk 클러스터에서 생성된 bootstrap주소를 넣어주고 ,topic 및 컨슈머, producer는 우리가 임의로 지정해주자 !

Kafka Topic Config

@Configuration
public class KafkaTopicConfiguration {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value("${kafka.topic.kid.name}")
    private String topicName;
    @Value("${kafka.topic.kid.numPartitions}")
    private String numPartitions;
    @Value("${kafka.topic.kid.replicationFactor}")
    private String replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    /**
     * broker 를 두개만 설정하였으므로 최소 Replication Factor로 2를 설정하고
     * Partition의 경우 Event 의 Consumer인 WAS를 2대까지만 실행되도록 해두었기 때문에 2로 설정함.
     * 이보다 Partition을 크게 설정한다고 해서 Consume 속도가 빨라지지 않기 때문이다.
     *
     * @return
     */
    @Bean
    public NewTopic newTopic() {
        return new NewTopic(topicName, Integer.parseInt(numPartitions), Short.parseShort(replicationFactor));
    }
}
  • 토픽을 Bean 으로 등록해주자 .

Kafka Producer Config

Producer Factory는 Kafka Topic에 데이터를 전송하는 역할

@Configuration
public class KafkaProducer {
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapServers;

    /**
     * ack: all
     * In-Sync-Replica에 모두 event가 저장되었음이 확인 되어야 ack 신호를 보냄 가장 성능은 떨어지지만
     * event produce를 보장할 수 있음.
     */
    @Value("${spring.kafka.producer.acks}")
    private String acksConfig;

    @Value("${kafka.producer.retry}")
    private Integer retry;

    @Value("${kafka.producer.enable-idempotence}")
    private Boolean enableIdempotence;
    @Value("${kafka.producer.max-in-flight-requests-per-connection}")
    private Integer maxInFlightRequestsPerConnection;

    /**
     * enable.idempotence true를 위해서는 retry가 0이상,
     * max.in.flight.requests.per.connection 은 5이하여야한다.
     *
     * @return
     */
    @Bean
    public ProducerFactory<String, ResponseMan> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.ACKS_CONFIG, acksConfig);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retry);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, ResponseMan> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Kafka Consumer Config

Consumer Factory는 Kafka Topic으로부터 데이터를 수신해와서 만드는 역할


@EnableKafka
@Configuration
public class KafkaConsumer {
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapServers;
    @Value("${kafka.consumer.consumer.auto-offset-reset}")
    private String autoOffsetResetConfig;
    @Value("${kafka.consumer.consumer.group-id}")
    private String rdbGroupId;

    @Bean
    public ConsumerFactory<String, ResponseMan> eventRDBConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, rdbGroupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(ResponseMan.class));

    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ResponseMan> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ResponseMan> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConsumerFactory(eventRDBConsumerFactory()); // ConsumerFactory 설정 추가
        return factory;
    }

}

Consumer Service, Producer Service 생성

@Service
@Slf4j
public class ConsumerService {

    @KafkaListener(topics = "topic-kid", groupId = "group-id",
            properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"},
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(ResponseMan message) {
        log.warn("메시지: " + message.toString());
    }
}
@Service
@RequiredArgsConstructor
@Slf4j
public class ProducerService {
    @Value("topic-kid")
    private String topicName;
    private final KafkaTemplate<String, ResponseMan> kafkaTemplate;
    private final ManRepository manRepository;

    public ResponseMan sendMessage(RequestDtoKid dto) {

        Kid kid = Kid.builder().
            name(dto.getName())
            .age(dto.getAge())
            .job(dto.getJob())
            .githubUrl(dto.getGithubUrl())
            .hobby(dto.getHobby())
            .height(dto.getHeight())
            .tech(dto.getTech())
            .build();

        Kid saveMan = manRepository.save(kid);

        ResponseMan responseMan = ResponseMan.builder().
            name(saveMan.getName())
            .age(saveMan.getAge())
            .job(saveMan.getJob())
            .githubUrl(saveMan.getGithubUrl())
            .hobby(saveMan.getHobby())
            .height(saveMan.getHeight())
            .tech(saveMan.getTech())
            .build();

        //이벤트 발생
        kafkaTemplate.send(topicName, responseMan);
        return responseMan;
    }
}

여기까지 완성이 되었다면 Dockerfile을 생성하자 !

# Base image
FROM --platform=linux/amd64 ubuntu:20.04 

# Update package lists
RUN apt-get update

# Install Java
RUN apt-get install -y openjdk-17-jdk

# Copy the Spring Boot application jar file to the container
COPY ./build/libs/kafka-spring-boot-0.0.1-SNAPSHOT.jar /app.jar

ENTRYPOINT ["java", "-jar", "/app.jar"]

FROM --platform=linux/amd64 ubuntu:20.04 ubuntu:20.04 이부분은 M1 유저에 해당되는 명령어이다.

docker image를 생성후에 dockerhub에 푸쉬!

EC2에서 CUD Application

docker pull 이미지이름

docker run -p 8080:8080 이미지이름

API 테스트를 통해 Consumer에서 데이터를 읽어보자!

정상적으로 Consumer 로그를 통해 데이터를 읽어볼수있다.!

0개의 댓글