Spring Boot FCM 전송 API 처리 속도 향상(Feat. Kafka)

최민길(Gale)·2023년 8월 31일
3

Spring Boot 적용기

목록 보기
44/46

안녕하세요 이번 시간에는 Kafka를 활용하여 FCM 전송 API의 처리 속도를 향상시키는 방법에 대해 알아보겠습니다.

Kafka는 메시지 큐의 일종입니다. 메시지 큐란 컴퓨터 시스템 또는 어플리케이션 간에 비동기적으로 데이터나 작업을 교환하기 위한 통신 패턴입니다. 발신자와 수신자가 독립적으로 작동하며 데이터를 직접 주고받지 않고 중간 매개체를 통해 데이터를 전달할 수 있습니다.

메시지 큐는 크게 프로듀서, 컨슈머, 브로커로 이루어져 있습니다. 프로듀서란 메시지를 생성하고 메시지 큐로 보내는 역할을 담당하며 컨슈머는 메시지 큐에서 메시지를 가져와 처리하는 역할을 담당합니다. 브로커는 메시지 큐 시스템을 관리하고 중재하는 중간 매개체로 프로듀서가 메시지를 브로커로 보내고 컨슈머가 브로커에서 메시지를 가져오게 됩니다. 이 때 Pub/Sub 방식으로 프로듀서가 브로커에 메시지를 전송하면 관련된 메시지를 구독하고 있는 컨슈머가 브로커에서 해당 메시지를 가져오는 방식으로 여러 요청이 동시에 발송될 경우 비동기적으로 빠르게 처리할 수 있습니다.

메시지 큐는 크게 RabbitMQ와 Kafka로 나뉩니다.

RabbitMQ란 AMQP, 즉 메시지 기반 통신을 위한 개방형 표준 프로토콜을 기반으로 브로커 내부의 Exchange로 인해 다양한 타입을 처리 가능하다는 장점이 있습니다. 또한 메시지가 처리된 이후 삭제되며, 수직적 확장이 필요하다는 특징이 있습니다.

Kafka란 대량의 데이터를 수집 및 처리하기 위해 사용되며 브로커들이 하나의 클러스터로 구성되어 파티션을 추가하여 수평적으로 확장할 수 있습니다. 또한 메시지가 처리되더라도 기간 동안 보관되어 데이터 보존이 된다는 특징이 있어 유실된 데이터의 복구가 편하다는 장점이 있으나 Kafka의 컨슈머에 오류가 발생 시 다음 컨슈머가 정상적으로 작동할 때 이전에 오류가 발생했을 때의 데이터 전송이 한번에 이루어지기 때문에 조심해서 사용해야 합니다.

Kafka에서는 Zookeeper를 사용하여 여러 브로커들이 존재하는 분산 환경에서 중앙화된 방식으로 관리합니다. Zookeeper는 각 브로커들의 데이터 변경 사항을 감지하여 이벤트로 제공하며 여러 브로커를 한번에 컨트롤할 수 있도록 합니다.

저는 확장성과 데이터 보존의 특징을 위해 Kafka를 선택했습니다.

그럼 Docker를 이용하여 Kafka를 구축해보겠습니다. 먼저 아래와 같이 docker-compose.yml 파일을 생성합니다. 위에서 살펴본 것처럼 Kafka의 브로커들을 관리하기 위한 Zookeeper가 같이 필요하기 때문에 두 도커 이미지를 생성한 후 연결해줍니다. 이 때 Zookeeper와 Kafka는 2181 포트로 통신하며, 외부에서 Kafka에 접근할 때는 9092 포트를 사용합니다.

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

이후 아래 명령어를 터미널에 입력하여 컨테이너를 생성합니다.

docker-compose -f {위에서 생성한 파일 이름}.yml up

생성한 Kafka 컨테이너에 접속한 후 토픽을 생성합니다. 토픽이란 메시지를 구분하고 분류하는 논리적인 개념으로 프로듀서와 컨슈머는 토픽을 기준으로 메시지 큐에 데이터를 전달합니다. 아래는 토픽 관련 명령어입니다.

토픽 생성 : kafka-topics.sh --create --topic {생성할 토픽 이름} --bootstrap-server localhost:9092

--replication-factor 1 : 토픽 내의 데이터를 몇 개의 복제본으로 유지할 것인지 설정 (이 경우 1개)
--partitions 3 : 토픽을 몇 개의 파티션으로 나눌 것인지 (이 경우 3개)

토픽 리스트 확인: kafka-topics.sh --list --bootstrap-server localhost:9092

토픽 상세 조회: kafka-topics.sh --describe --topic {토픽 이름} --bootstrap-server localhost:9092

토픽 삭제: kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic {토픽 이름}

토픽을 생성한 후 Spring Boot에서 Kafka와 통신할 준비를 해야합니다. 먼저 gradle에 아래의 의존성을 추가합니다.

implementation 'org.springframework.kafka:spring-kafka'

해당 의존성을 추가하게 되면 application.properties 내에 아래 내용들을 추가해줍니다.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id={group id 이름}

Kafka는 Redis와 마찬가지로 서비스 외부에서 데이터를 주고받기 때문에 데이터의 직렬화 및 역직렬화 작업이 필요합니다. 만약 이 작업을 String, String으로 진행할 경우 Spring Boot 자체적으로 StringSerializer와 StringDeserializer를 할당해줘 자동으로 직렬화 및 역직렬화가 수행되지만 다른 타입을 넣을 경우 별도의 KafkaConfig를 생성하여 빈을 주입해주어야 합니다.

아래는 KafkaConfig 클래스입니다. 직렬화 및 역직렬화의 보안 이슈 및 성능 향상을 위해 프로토콜 버퍼를 사용하여 <String,byte[]> 타입으로 Kafka에 접속하려고 합니다. 따라서 키는 String, 벨류는 byte로 직렬화 및 역직렬화를 수행하도록 ProducerFactory와 ConsumerFactory에 해당 값을 추가합니다.

여기서 주의할 점은 ProducerFactory를 참조하는 KafkaTemplate와 consumerFactory를 참조하는 ConcurrentKafkaListenerContainerFactory를 같이 빈을 주입해주어야 합니다. KafkaTemplate는 실질적으로 Kafka와 연결하여 로직을 처리하는 역할을 담당하며 @KafkaListner 어노테이션을 사용하여 컨슈머 메서드에 ConcurrentKafkaListenerContainerFactory를 매핑할 수 있습니다.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroup;

    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

기존 FCM 발송 클래스는 아래와 같습니다. title은 푸시 알림의 제목, body는 푸시 알림의 내용입니다. typeNum은 푸시 알림이 어떤 목적인지를 나타내며 dataID는 해당 목적에 따른 아이디값을 반환합니다. FcmGroup은 각각 IOS 토큰과 AOS 토큰의 리스트로 이루어져 있습니다.

import lombok.*;

@Getter
@ToString
public final class FcmParams {
    private String title;
    private String body;
    private int typeNum;
    private long dataID;
    FcmGroup fcmGroup;

    public FcmParams(){}

    @Builder
    public FcmParams(
            String title,
            String body,
            int typeNum,
            long dataID,
            FcmGroup fcmGroup
    ){
        this.title = title;
        this.body = body;
        this.typeNum = typeNum;
        this.dataID = dataID;
        this.fcmGroup = fcmGroup;
    }

}

이를 프로토콜 버퍼로 구현해보겠습니다. 리스트의 경우 repeated를 이용하여 구현할 수 있습니다. 여기서 주의할 점은 Spring Boot 내의 프로토콜 버퍼 의존성 버전이 3.22.0 이전일 경우 emptyList() is not public in LazyStringArrayList 에러가 발생한다는 점입니다. 3.22.0 이전 버전에서 LazyStringArrayList 클래스가 protected로 되어 있어 외부에서 참조할 수 없어 리스트를 정의했을 때 다음과 같은 오류가 발생합니다.

syntax = "proto3";
option java_package = "{상위 패키지 이름}";
option java_outer_classname = "KafkaFcmProto";

message KafkaFcm {
  string title = 1;
  string body = 2;
  int32 typeNum = 3;
  int64 dataID = 4;
  repeated string aosFcmList = 5;
  repeated string iosFcmList = 6;
}

그럼 컨슈머를 구현해보겠습니다. 위에서 살펴본 것처럼 @KafkaListener 어노테이션 내에 어떤 토픽을 사용할 것인지를 입력하면 ConcurrentKafkaListenerContainerFactory 클래스와 매핑되어 컨슈머가 설정됩니다. Kafka에 저장된 바이트 배열을 역직렬화하여 FcmParams 객체로 변환하고 알림을 발송합니다.

    @KafkaListener(topics = "{토픽 이름}")
    public void fcmKafkaConsumer(byte[] byteCode){
        try {
            KafkaFcmProto.KafkaFcm kafkaFcm = KafkaFcmProto.KafkaFcm.parseFrom(byteCode);
            FcmGroup group = FcmGroup.builder()
                    .aosFcmList(kafkaFcm.getAosFcmListList())
                    .iosFcmList(kafkaFcm.getIosFcmListList())
                    .build();

            FcmParams params = FcmParams.builder()
                    .title(kafkaFcm.getTitle())
                    .body(kafkaFcm.getBody())
                    .typeNum(kafkaFcm.getTypeNum())
                    .dataID(kafkaFcm.getDataID())
                    .fcmGroup(group)
                    .build();

            sendFcmSingleUser(params).get();
        }
        catch (ExecutionException | InterruptedException | InvalidProtocolBufferException e){
            throw new WrongAccessException(WrongAccessException.of.KAFKA_CONNECTION_EXCEPTION);
        }
    }

알림 발송 로직은 아래와 같습니다. FcmParams 내에 있는 IOS, AOS 알림 발송할 토큰들을 Fcm 발송 API로 발송합니다. 이 때 주의할 점은 mapper.writeValueAsString(body), Consts.UTF_8) 설정하여 한글이 깨지지 않도록 해주어야 한다는 점입니다. 또한 IOS 발송과 AOS 발송을 최대한 동시에 진행할 수 있도록 알림 발송 시 @Async로 비동기로 처리하고 전체 알림을 발송하는 메서드는 Future<Void> 타입을 리턴하여 모든 알림이 발송할 때까지 대기합니다.

    private Future<Void> sendFcmSingleUser(FcmParams params) {
        String fcmType = getFcmType(params.getTypeNum());
        List<String> aosFcmList = params.getFcmGroup().getAosFcmList();
        List<String> iosFcmList = params.getFcmGroup().getIosFcmList();

        if(!aosFcmList.isEmpty()){
            sendUrl(getFcmIosBody(
                    iosFcmList,
                    params.getTitle(),
                    params.getBody(),
                    fcmType,
                    params.getDataID()
            ));
        }

        if(!iosFcmList.isEmpty()){
            sendUrl(getFcmAosBody(
                    aosFcmList,
                    params.getTitle(),
                    params.getBody(),
                    fcmType,
                    params.getDataID()
            ));
        }

        return CompletableFuture.completedFuture(null);
    }

    @Async
    private void sendUrl(Object body){
        ObjectMapper mapper = new ObjectMapper();
        try (CloseableHttpClient client = HttpClientBuilder.create().build()) {
            HttpPost postRequest = new HttpPost(fcmServerDomain);
            postRequest.setHeader("Authorization", authorization);
            postRequest.setHeader("Content-Type", contentType);
            postRequest.setEntity(new StringEntity(mapper.writeValueAsString(body), Consts.UTF_8));
            client.execute(postRequest);
        } catch (Exception e) {
            throw new WrongAccessException(WrongAccessException.of.HTTP_CONNECTION_EXCEPTION);
        }
    }

그럼 이번에는 프로듀서를 구현해보겠습니다. 프로듀서는 따로 어노테이션 등으로 구현하지 않고 KafkaTemplate 클래스의 send 메서드를 통해 구현합니다. 이 때 주의할 점은 프로토콜 버퍼에 리스트 값을 매핑할 때 .set으로 하지 않고 .addAll로 매핑합니다.

        KafkaFcmProto.KafkaFcm params = KafkaFcmProto.KafkaFcm.newBuilder()
                .setTitle(title)
                .setBody(body)
                .setTypeNum(1)
                .setDataID(diaryID)
                .addAllAosFcmList(fcmGroup.getAosFcmList())
                .addAllIosFcmList(fcmGroup.getIosFcmList())
                .build();
        kafkaTemplate.send("{토픽 이름}", params.toByteArray());

그럼 실제로 얼마나 성능이 향상되었는지 확인해보겠습니다. 아래는 로직 내에서 알림 전송 메서드를 호출하여 직접 FCM 메시지를 전송한 결과입니다. 알림 전송 메서드는 @Async 처리가 되어 있습니다. 보시는 것처럼 4344ms가 걸렸습니다. 이 정도면 상당히 느린 API라고 볼 수 있습니다.

아래는 알림 전송에 필요한 데이터를 Kafka로 전달한 후 컨슈머에서 데이터를 받아 알림을 발송하는 방식으로 수정한 결과입니다. 무려 1000ms로 API 성능을 4배 가량 향상시켰습니다. 향상 원인은 API 로직과 알림 발송은 동시에 이루어질 필요가 없기 때문에 Kafka를 이용하여 알림 발송을 독립적으로 진행하여 API 핵심 로직이 수행된 이후 바로 리스판스를 리턴하기 때문에 빠르게 결과를 얻을 수 있으며, 이 때 별도로 알림이 발송되기 때문에 사용성을 크게 향상시킬 수 있습니다.

참조 자료
https://oingdaddy.tistory.com/308
https://9hyuk9.tistory.com/92

profile
저는 상황에 맞는 최적의 솔루션을 깊고 정확한 개념의 이해를 통한 다양한 방식으로 해결해오면서 지난 3년 동안 신규 서비스를 20만 회원 서비스로 성장시킨 Software Developer 최민길입니다.

0개의 댓글