이전 CQRS 도입기 - 5
에서는 Apache Kafka 패키지를 다운받은 후 , 메세지를 주고 받는 실습을 해보았다.
이번 챕터에서는Spring Boot
에서 Apache Kafka 의존성을 설치 이후 topic을 생성하여 Application에서 MSK Cluster
로 데이터를 보낸 후 consumer
를 서비스에서 임의로 지정하여 데이터를 log를 통해 읽어볼 예정이다.
시작하기 앞서 Spring Boot Application 은 3.0.5v
을 사용하였다.
plugins {
id 'java'
id 'org.springframework.boot' version '3.0.5'
id 'io.spring.dependency-management' version '1.1.0'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testCompileOnly 'org.projectlombok:lombok:1.18.20'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation 'org.apache.kafka:kafka-clients'
}
tasks.named('test') {
useJUnitPlatform()
}
이후 application.yaml 설정하기
kafka:
bootstrapAddress: msk 부트스트랩주소
topic:
kid:
name: topic-kid
replicationFactor: 2
numPartitions: 2
consumer:
consumer:
group-id: 그룹id
auto-offset-reset: earliest
producer:
retry: 3
enable-idempotence: true
max-in-flight-requests-per-connection: 3
msk 클러스터에서 생성된 bootstrap주소를 넣어주고 ,topic 및 컨슈머, producer는 우리가 임의로 지정해주자 !
@Configuration
public class KafkaTopicConfiguration {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${kafka.topic.kid.name}")
private String topicName;
@Value("${kafka.topic.kid.numPartitions}")
private String numPartitions;
@Value("${kafka.topic.kid.replicationFactor}")
private String replicationFactor;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
/**
* broker 를 두개만 설정하였으므로 최소 Replication Factor로 2를 설정하고
* Partition의 경우 Event 의 Consumer인 WAS를 2대까지만 실행되도록 해두었기 때문에 2로 설정함.
* 이보다 Partition을 크게 설정한다고 해서 Consume 속도가 빨라지지 않기 때문이다.
*
* @return
*/
@Bean
public NewTopic newTopic() {
return new NewTopic(topicName, Integer.parseInt(numPartitions), Short.parseShort(replicationFactor));
}
}
Bean
으로 등록해주자 . Producer Factory는 Kafka Topic에 데이터를 전송하는 역할
@Configuration
public class KafkaProducer {
@Value("${kafka.bootstrapAddress}")
private String bootstrapServers;
/**
* ack: all
* In-Sync-Replica에 모두 event가 저장되었음이 확인 되어야 ack 신호를 보냄 가장 성능은 떨어지지만
* event produce를 보장할 수 있음.
*/
@Value("${spring.kafka.producer.acks}")
private String acksConfig;
@Value("${kafka.producer.retry}")
private Integer retry;
@Value("${kafka.producer.enable-idempotence}")
private Boolean enableIdempotence;
@Value("${kafka.producer.max-in-flight-requests-per-connection}")
private Integer maxInFlightRequestsPerConnection;
/**
* enable.idempotence true를 위해서는 retry가 0이상,
* max.in.flight.requests.per.connection 은 5이하여야한다.
*
* @return
*/
@Bean
public ProducerFactory<String, ResponseMan> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.ACKS_CONFIG, acksConfig);
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.RETRIES_CONFIG, retry);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, ResponseMan> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Consumer Factory는 Kafka Topic으로부터 데이터를 수신해와서 만드는 역할
@EnableKafka
@Configuration
public class KafkaConsumer {
@Value("${kafka.bootstrapAddress}")
private String bootstrapServers;
@Value("${kafka.consumer.consumer.auto-offset-reset}")
private String autoOffsetResetConfig;
@Value("${kafka.consumer.consumer.group-id}")
private String rdbGroupId;
@Bean
public ConsumerFactory<String, ResponseMan> eventRDBConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, rdbGroupId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(ResponseMan.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ResponseMan> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ResponseMan> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(eventRDBConsumerFactory()); // ConsumerFactory 설정 추가
return factory;
}
}
@Service
@Slf4j
public class ConsumerService {
@KafkaListener(topics = "topic-kid", groupId = "group-id",
properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"},
containerFactory = "kafkaListenerContainerFactory")
public void consume(ResponseMan message) {
log.warn("메시지: " + message.toString());
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class ProducerService {
@Value("topic-kid")
private String topicName;
private final KafkaTemplate<String, ResponseMan> kafkaTemplate;
private final ManRepository manRepository;
public ResponseMan sendMessage(RequestDtoKid dto) {
Kid kid = Kid.builder().
name(dto.getName())
.age(dto.getAge())
.job(dto.getJob())
.githubUrl(dto.getGithubUrl())
.hobby(dto.getHobby())
.height(dto.getHeight())
.tech(dto.getTech())
.build();
Kid saveMan = manRepository.save(kid);
ResponseMan responseMan = ResponseMan.builder().
name(saveMan.getName())
.age(saveMan.getAge())
.job(saveMan.getJob())
.githubUrl(saveMan.getGithubUrl())
.hobby(saveMan.getHobby())
.height(saveMan.getHeight())
.tech(saveMan.getTech())
.build();
//이벤트 발생
kafkaTemplate.send(topicName, responseMan);
return responseMan;
}
}
여기까지 완성이 되었다면 Dockerfile을 생성하자 !
# Base image
FROM --platform=linux/amd64 ubuntu:20.04
# Update package lists
RUN apt-get update
# Install Java
RUN apt-get install -y openjdk-17-jdk
# Copy the Spring Boot application jar file to the container
COPY ./build/libs/kafka-spring-boot-0.0.1-SNAPSHOT.jar /app.jar
ENTRYPOINT ["java", "-jar", "/app.jar"]
FROM --platform=linux/amd64 ubuntu:20.04 ubuntu:20.04
이부분은 M1 유저에 해당되는 명령어이다.
docker image를 생성후에 dockerhub에 푸쉬!
docker pull 이미지이름
docker run -p 8080:8080 이미지이름
정상적으로 Consumer 로그를 통해 데이터를 읽어볼수있다.!