// build.gradle
implementation 'org.springframework.kafka:spring-kafka'
// KafkaCunsumerConfig.java
// Kafka consumer 설정을 위한 Configuration 클래스
@Configuration
@EnableKafka // Kafka를 사용하기 위해 Spring Kafka를 활성화함
public class KafkaConsumerConfig {
// Kafka consumer를 생성하기 위한 ConsumerFactory를 생성하는 Bean 메서드
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// Kafka consumer 설정을 위한 Properties 객체 생성
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka broker의 주소와 포트 설정
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "127.0.0.1:9092"); // consumer group ID 설정
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key deserializer 설정
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value deserializer 설정
// Properties 객체를 사용하여 ConsumerFactory 생성
return new DefaultKafkaConsumerFactory<>(properties);
}
// Kafka listener container를 생성하기 위한 Bean 메서드
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory 생성
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
// ConsumerFactory를 사용하여 Kafka listener container를 생성함
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
// KafkaConsumer.java
@Service
@Slf4j
public class KafkaConsumer {
private CatalogRepository repository;
// 생성자를 통해 CatalogRepository bean을 주입받음
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
// KafkaListener annotation을 통해 example-catalog-topic의 메시지를 수신함
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->", kafkaMessage);
// JSON 데이터를 Map으로 파싱함
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// productId를 기반으로 CatalogEntity를 찾음
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
// CatalogEntity가 존재하면 해당 제품의 재고를 업데이트함
if(entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
repository.save(entity);
}
}
}
// build.gradle
implementation 'org.springframework.kafka:spring-kafka'
// KafkaProducerConfig.java
// Kafka producer 설정을 위한 Configuration 클래스
@Configuration
@EnableKafka // Kafka를 사용하기 위해 Spring Kafka를 활성화함
public class KafkaProducerConfig {
// Kafka producer를 생성하기 위한 ProducerFactory를 생성하는 Bean 메서드
@Bean
public ProducerFactory<String, String> producerFactory() {
// Kafka producer 설정을 위한 Properties 객체 생성
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka broker의 주소와 포트 설정
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // key serializer 설정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value serializer 설정
// Properties 객체를 사용하여 ProducerFactory 생성
return new DefaultKafkaProducerFactory<>(properties);
}
// KafkaTemplate을 생성하기 위한 Bean 메서드
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
// ProducerFactory를 사용하여 KafkaTemplate 생성
return new KafkaTemplate<>(producerFactory());
}
}
// KafkaProducer.java
// Kafka 메시지를 생성하고 보내기 위한 Kafka Producer 서비스 클래스
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
// KafkaTemplate 객체를 생성자를 통해 주입 받음
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 주어진 주제(topic)로 OrderDto 객체를 Kafka 메시지로 전송하고, 전송한 OrderDto 객체를 반환하는 메서드
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInstring = "";
try {
jsonInstring = mapper.writeValueAsString(orderDto); // OrderDto 객체를 JSON 문자열로 변환
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInstring); // KafkaTemplate을 사용하여 Kafka 메시지를 보냄
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto; // 전송한 OrderDto 객체를 반환
}
}
// OrderController.java
@RestController
@RequestMapping("/order-service")
public class OrderController {
private Environment env;
private OrderService orderService;
private KafkaProducer kafkaProducer;
public OrderController(Environment env,
OrderService orderService,
KafkaProducer kafkaProducer) {
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
}
...
// http://127.0.0.1:0/order-service/{user_id}/orders/
// 사용자 ID와 주문 정보(orderDetails)를 받아 새 주문을 생성하는 REST API 엔드포인트
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* JPA */
// RequestOrder 객체를 OrderDto 객체로 변환하여 userId를 추가
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
// OrderDto 객체를 사용하여 주문 생성
OrderDto createdOrder = orderService.createOrder(orderDto);
// 생성된 OrderDto 객체를 ResponseOrder 객체로 변환
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* send this order to the kafka */
// KafkaProducer 객체를 사용하여 주문 정보를 Kafka 메시지로 전송
kafkaProducer.send("example-catalog-topic", orderDto);
// 생성된 ResponseOrder 객체를 HttpStatus.CREATED 상태코드와 함께 반환
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
...
}
ZookeeperServer, KafkaServer, EurekaServer, ConfigService, API Gateway 실행