Kafka for Spring Boot

Web 개발러 Velog!·2022년 5월 11일
0

본 포스팅은 Spring boot 환경에서 Kafka를 연동시키는 방법 및 절차를 기술한다.

1. 개요

1.1 Kafka란?

대용량의 실시간 로그 처리에 특화되어 있는 솔루션이며 데이터를 유실없이 안전하게 전달하는 것이 주목적인 메세지 시스템에서 안정적인 아키텍처와 빠른 퍼포먼스로 데이터를 처리할 수 있는 분산 스트리밍 플랫폼이다.

(zookeeper_2)

  • 프로듀서 : 카프카로 메시지를 보내는 역할
  • 카프카 : 프로듀서가 보낸 메시지를 저장하는 역할
  • 컨슈머 : 카프카에 저장되어 있는 메시지를 가져오는 역할
  • 주키퍼 : 카프카가 분산 코디네이터인 주키퍼와 연결하여 메타 데이터 및 클러스터 노드 관리

1.2 Kafka의 특징

  • 카프카는 파티션이란 개념을 도입하여 여러개의 파티션을 서버들에 분산시켜 나누어 처리할 수 있다. Producer가 메시지를 어떤 파티션으로 전송할지는 사용자가 구현한 분배 알고리즘에 따른다.
  • 각 파티션에 균등하게 저장하거나, 메시지의 키를 활용하여 특정 문자로 시작하는 메시지는 특정 파티션에 할당할 수 있다. => 파티션을 나누었을 때, 메시지의 순서는 보장해주지 않는다.
    한번 늘린 파티션은 절대로 줄일 수 없다.
  • 카프카는 클러스터로 동작하여 고가용성 서비스를 제공할 수 있고, 서버를 수평적으로 늘려 안정성 및 성능을 향상 시킬 수 있다.
  • 각 파티션을 복제하여 클러스터에 분산 시킬 수 있다.
  • 각 파티션은 Leader 와 Follower로 구분 존재하며, Leader에서만 읽기(read)와 쓰기(write)가 일어난다.

1.3 Zookeeper란?

분산 애플리케이션을 사용하게 되면, 분산 애플리케이션 관리를 위한 안정적인 코디네이션 애플리케이션이 추가로 필요하게 된다. 안정적인 코디네이션 서비스로 검증된 주키퍼(Zookeeper)를 많이 사용하게 된다.

카프카를 사용하기 위해서는 주키퍼(Zookeeper) 사용이 필수적이다.

1.4 Zookeeper의 특징

주키퍼 는 분산 애플리케이션을 위한 코디네이션 시스템이다. 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공한다.

(zookeeper_3)

서버 여러 대를 앙상블(클러스터)로 구성하고, 분산 애플리케이션들이 각각 클라이언트가 되어 주키퍼 서버들과 커넥션을 맺은 후 상태 정보 등을 주고 받는다. (위의 그림에서 Server는 주키퍼, Client는 카프카가 된다.)

상태 정보들은 주키퍼의 지노드(znode)라고 불리는 곳에 Key-Value 형태로 저장하며, 지노드에 저장된 것을 이용하여 분산 애플리ㅔ이션들은 서로 데이터를 주고받게 된다. (znode를 일반 컴퓨터의 파일이나 폴더 개념으로 생각하면 쉬움)
지노드(znode)는 우리가 알고 있는 일반적인 디렉토리와 비슷한 형태로서 자식노드를 가지고 있는 계층형 구조로 구성되어 있다.

(znode)

각 지노드는 데이터 변경 등에 대한 유효성 검사 등을 위해 버전 번호를 관리(데이터가 변동될 때마다 지노드의 버전 번호가 증가)

주키퍼에 저장되는 데이터는 모두 메모리에 저장되어 처치량이 매우 크고 속도 또한 빠름

주키퍼는 좀 더 신뢰성 있는 서비스를 위해 앙상블(클러스터)이라는 호스트 세트를 구성할 수 있다. 앙상블로 구성되어 있는 주키퍼는 과반수 방식에 따라 살아 있는 노드 수가 과반 수 이상 유지된다면, 지속적인 서비스가 가능하다.

2. Spring Boot 연동 - API 방식

Spring에서 제공하는 Kafka 모듈을 이용해 API 형식으로 메시지를 받고 전송하는 기능을 제공하고 있다. 사용방법은 아래와 같다.

2.1 Spring boot 의존성 설정

build.gradle 에서 Kafka에 대한 의존성을 설정한다. 설정 방법은 다음과 같다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'

    /* kafka */
    implementation 'org.springframework.kafka:spring-kafka'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

2.2 Kafka 연동 설정

Spring boot에서 Kafka와 연동하기 위한 설정을 한다. Spring의 application.yml 파일에서 설정할 수 있다.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • spring.kafka.consumer

    • bootstrap-servers
      • Kafka 클러스터에 대한 초기 연결에 사용할 호스트:포트쌍의 쉼표로 구분된 목록.
      • 글로벌 설정이 있어도, consumer.bootstrap-servers가 존재하면 consuemer 전용으로 오버라이딩.
    • group-id
      • Consumer는 Consumer Group이 존재하기 때문에, 유일하게 식별 가능한 Consumer Group을 작성.
    • auto-offset-reset
      • Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성.
      • Consumer Group의 Consumer는 메시지를 소비할 때 Topic내에 Partition에서 다음에 소비할 offset이 어디인지 공유를 한다. 그런데 오류 등으로 인해 이러한 offset 정보가 없어졌을 때 어떻게 offeset을 reset 할 것 인지를 명시.
        • latest : 가장 최근에 생산된 메시지로 offeset reset
        • earliest : 가장 오래된 메시지로 offeset reset
        • none : offset 정보가 없으면 Exception 발생
      • 직접 Kafka Server에 접근하여 offset을 reset할 수 있지만, Spring에서 제공해주는 방식은 위와 같다.
    • key-deserializer / value-deserializer
    • Kafka에서 데이터를 받아올 때, key / value를 역직렬화 한다.
    • 여기서 key와 value는 뒤에서 살펴볼 KafkaTemplate의 key, value를 의미한다.
    • 메시지가 문자열 데이터이면 StringDeserializer, JSON 데이터를 넘겨줄 것이라면 JsonDeserializer도 가능하다.
  • spring.kafka.producer

    • bootstrap-servers
      • consumer.bootstrap-servers와 동일한 내용이며, producer 전용으로 오버라이딩 하려면 작성해야 한다.
    • key-serializer / value-serializer
      Kafka에 데이터를 보낼 때, key / value를 직렬화 한다.
      consumer에서 살펴본 key-deserializer, value-deserializer와 동일한 내용이다.

2.3 Kafka 사용 예제

KafkaController.java
post 방식으로 message 데이터를 받아서, Producer 서비스로 전달.

import com.victolee.kafkaexam.Service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);

        return "success";
    }
}

KafkaProducer.java
KafkaTemplate에 Topic명과 Message를 전달한다. KafkaTemplate.send() 메서드가 실행되면, Kafka 서버로 메시지가 전송된다.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

KafkaConsumer.java
Kafka로부터 메시지를 받으려면 @KafkaListener 어노테이션을 달아주면 된다.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "exam", groupId = "foo")
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }
}

3. Spring Boot 연동 - log4j 방식

log4j에서는 Kafka와 연동하여 바로 전송할 수 있도록 하는 서비스를 제공한다. 사용 방법은 아래와 같다.

2.1 log4j.xml 설정

# Root logger option
log4j.rootLogger=DEBUG, stdout, kafka
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN

# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=kafkalogger
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka.level=INFO
profile
while(true) { 손가락 관절염++ };

0개의 댓글