당연하게도 의존성과 컨테이너를 띄우기 위한 docker compose 파일을 먼저 추가한다.
dependencies {
// ...
//kafka
implementation 'org.springframework.kafka:spring-kafka'
}
services:
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
- service-network
Servicea 메서드에서 "booking-cancel-topic"
이라는 토픽의 메시지를 Producer에게 넘겨준다.
@Transactional
public void cancelBooking(Long getUserId, String role, Long bookingId) {
// TODO : 이거 좀 애매함 확인 필요 -> 결제 취소
Booking booking = bookingRepository.findById(bookingId)
.orElseThrow(() -> new GlobalException(ErrorCase.BOOKING_NOT_FOUND));
if (!role.equals("ROLE_MANAGER")) {
if (!getUserId.equals(booking.getUserId())) {
throw new GlobalException(ErrorCase.NOT_AUTHORIZED);
}
}
GetScheduleDetailRes scheduleRes = concertClient.getScheduleDetail(booking.getScheduleId());
// 공연이 끝났는지 체크 필요
if (!possibleCancel(scheduleRes.concertDate(), scheduleRes.concertTime())) {
throw new GlobalException(ErrorCase.CANNOT_CANCEL_BOOKING);
}
// paymentService.cancelPayment(getUserId, role, booking.getPayment().getId());
// 취소 요청 됨(중간 상태) 상태로(사용자가 본인이 요청 중이라는 것을 알 수 있도록) 변경
booking.requestCancel();
// Kafka에 예매 취소 메시지 전송
String message = "Booking canceled for ID: " + bookingId;
log.info("1. 예매 취소를 위한 Kafka 메시지 전송 준비 중입니다. Booking ID: {}", bookingId);
kafkaProducer.sendMessage("booking-cancel-topic", message);
log.info("4. 예매 취소를 위한 Kafka 메시지가 전송되었습니다. Booking ID: {}", bookingId);
}
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
log.info("2. Kafka로 메시지를 전송 중입니다. Topic: {}, Message: {}", topic, message);
kafkaTemplate.send(topic, message);
log.info("3. Kafka로 메시지가 성공적으로 전송되었습니다. Topic: {}, Message: {}", topic, message);
}
}
@KafkaListener(topics = "booking-cancel-topic", groupId = "booking-group")
어노테이션이 사용되어 해당 토픽으로 전송된 메시지를 비동기적으로 소비한다.
@Service
@Slf4j
public class BookingCancelConsumer {
private final BookingRepository bookingRepository;
private final PaymentService paymentService;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public BookingCancelConsumer(BookingRepository bookingRepository, PaymentService paymentService) {
this.bookingRepository = bookingRepository;
this.paymentService = paymentService;
}
@KafkaListener(topics = "booking-cancel-topic", groupId = "booking-group")
public void consume(String message) {
log.info("5. Kafka에서 메시지를 수신했습니다.: {}", message);
// 10초 후에 상태 변경 작업 수행
log.info("6. 수신된 메시지를 기반으로 예약 취소 작업을 10초 후에 작동합니다. message: {}", message);
scheduler.schedule(() -> processBookingCancel(message), 10 , TimeUnit.SECONDS);
}
private void processBookingCancel(String message) {
// 메시지에서 bookingId 추출
Long bookingId = extractBookingIdFromMessage(message);
// Booking 객체를 찾아 상태 변경
log.info("7. 예약 취소 작업을 처리 중입니다. Booking ID: {}", bookingId);
Booking booking = bookingRepository.findById(bookingId)
.orElseThrow(() -> new GlobalException(ErrorCase.BOOKING_NOT_FOUND));
paymentService.cancelPaymentTest(booking.getPayment().getId()); // 예매, 결제 상태 변경
log.info("8. 예매 상태가 CANCELED로 변경되었습니다. Booking ID: {}", bookingId);
}
private Long extractBookingIdFromMessage(String message) {
// 메시지에서 bookingId를 추출하는 로직 구현
// 예시: "Booking canceled for ID: 1" -> 1을 추출
return Long.parseLong(message.split(": ")[1]);
}
}
실제 요청을 날려보게되면
(DB 상에서 PENDING
-> CANCEL_REQUESTED
상태로 바로 변경되고)
메시지를 전송하고
consumer에서 수신하여
예약된 10초 뒤에 작업을 수행한다.
(이 때 예매 상태는 CANCEL_REQUEST
-> CANCELED
로 바뀌게 된다.)
로그가 의도한대로 찍히는 것을 확인할 수 있다.