Kafka로 비동기 처리하기

자라나는 ㅇㅅㅇ개발자·2024년 10월 21일
0

TIL

목록 보기
181/183

비동기 처리의 필요성

  • 비동기 처리는 특정 작업이 완료될 때까지 기다리지 않고 다른 작업을 동시에 수행할 수 있도록 하는 방식으로, 시스템의 응답성과 처리 속도를 크게 개선할 수 있다.
  • 예를들어 예매 취소 API에서 데이터베이스에 공연장을 생성하는 작업이 발생하는 동안 다른 추가적인 작업(예: 알림 발송, 로그 저장 등)을 동시에 처리할 수 있다.
    (동기적으로 처리한다면 API의 응답 시간이 길어지거나 서버의 부하가 높아질 수 있다.)
  • 주로 반환되는 데이터의 중요성이 떨어지거나(요청의 결과 성공 유무 상태를 모르는 상태로 응답을 보내주므로) 요청자가 빠른 응답을 원하는 경우에 적용이 유의미하다.

그래서 왜 Kafka를 사용하는가?

  • Apache Kafka는 분산 스트리밍 플랫폼으로, 데이터를 발행(Publish)하고 구독(Subscribe)하는 메시지 기반 시스템을 통해 대규모의 데이터를 실시간으로 처리할 수 있다.
  • Producer가 메시지를 Kafka로 보내고, Consumer가 이 메시지를 처리하는 방식으로 동작하므로 데이터를 즉시 처리하지 않고 필요할 때 Consumer가 데이터를 가져가서 처리하는 비동기 처리가 가능하다.

구현해보기

당연하게도 의존성과 컨테이너를 띄우기 위한 docker compose 파일을 먼저 추가한다.

dependencies {

    // ...
    
    //kafka
    implementation 'org.springframework.kafka:spring-kafka'
    
}
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - service-network

Servicea 메서드에서 "booking-cancel-topic"이라는 토픽의 메시지를 Producer에게 넘겨준다.

    @Transactional
    public void cancelBooking(Long getUserId, String role, Long bookingId) {

        // TODO : 이거 좀 애매함 확인 필요 -> 결제 취소
        Booking booking = bookingRepository.findById(bookingId)
            .orElseThrow(() -> new GlobalException(ErrorCase.BOOKING_NOT_FOUND));

        if (!role.equals("ROLE_MANAGER")) {
            if (!getUserId.equals(booking.getUserId())) {
                throw new GlobalException(ErrorCase.NOT_AUTHORIZED);
            }
        }
        GetScheduleDetailRes scheduleRes = concertClient.getScheduleDetail(booking.getScheduleId());

        // 공연이 끝났는지 체크 필요
        if (!possibleCancel(scheduleRes.concertDate(), scheduleRes.concertTime())) {
            throw new GlobalException(ErrorCase.CANNOT_CANCEL_BOOKING);
        }

//        paymentService.cancelPayment(getUserId, role, booking.getPayment().getId());

        // 취소 요청 됨(중간 상태) 상태로(사용자가 본인이 요청 중이라는 것을 알 수 있도록) 변경
        booking.requestCancel();

        // Kafka에 예매 취소 메시지 전송
        String message = "Booking canceled for ID: " + bookingId;
        log.info("1. 예매 취소를 위한 Kafka 메시지 전송 준비 중입니다. Booking ID: {}", bookingId);

        kafkaProducer.sendMessage("booking-cancel-topic", message);

        log.info("4. 예매 취소를 위한 Kafka 메시지가 전송되었습니다. Booking ID: {}", bookingId);
    }
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        log.info("2. Kafka로 메시지를 전송 중입니다. Topic: {}, Message: {}", topic, message);
        kafkaTemplate.send(topic, message);
        log.info("3. Kafka로 메시지가 성공적으로 전송되었습니다. Topic: {}, Message: {}", topic, message);

    }

}

@KafkaListener(topics = "booking-cancel-topic", groupId = "booking-group") 어노테이션이 사용되어 해당 토픽으로 전송된 메시지를 비동기적으로 소비한다.

@Service
@Slf4j
public class BookingCancelConsumer {

    private final BookingRepository bookingRepository;
    private final PaymentService paymentService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public BookingCancelConsumer(BookingRepository bookingRepository, PaymentService paymentService) {
        this.bookingRepository = bookingRepository;
        this.paymentService = paymentService;
    }

    @KafkaListener(topics = "booking-cancel-topic", groupId = "booking-group")
    public void consume(String message) {
        log.info("5. Kafka에서 메시지를 수신했습니다.: {}", message);

        // 10초 후에 상태 변경 작업 수행
        log.info("6. 수신된 메시지를 기반으로 예약 취소 작업을 10초 후에 작동합니다. message: {}", message);
        scheduler.schedule(() -> processBookingCancel(message), 10   , TimeUnit.SECONDS);
    }

    private void processBookingCancel(String message) {
        // 메시지에서 bookingId 추출
        Long bookingId = extractBookingIdFromMessage(message);

        // Booking 객체를 찾아 상태 변경
        log.info("7. 예약 취소 작업을 처리 중입니다. Booking ID: {}", bookingId);
        Booking booking = bookingRepository.findById(bookingId)
            .orElseThrow(() -> new GlobalException(ErrorCase.BOOKING_NOT_FOUND));

        paymentService.cancelPaymentTest(booking.getPayment().getId()); // 예매, 결제 상태 변경

        log.info("8. 예매 상태가 CANCELED로 변경되었습니다. Booking ID: {}", bookingId);
    }

    private Long extractBookingIdFromMessage(String message) {
        // 메시지에서 bookingId를 추출하는 로직 구현
        // 예시: "Booking canceled for ID: 1" -> 1을 추출
        return Long.parseLong(message.split(": ")[1]);
    }

}

실제 요청을 날려보게되면
(DB 상에서 PENDING -> CANCEL_REQUESTED 상태로 바로 변경되고)


메시지를 전송하고

consumer에서 수신하여

예약된 10초 뒤에 작업을 수행한다.

(이 때 예매 상태는 CANCEL_REQUEST -> CANCELED로 바뀌게 된다.)

로그가 의도한대로 찍히는 것을 확인할 수 있다.

0개의 댓글

관련 채용 정보