데이터 동기화를 위한 KAFKA 활용(2)

InSeok·2023년 3월 23일
0

이번에 적용해볼 것들

  • Order Service에 요청된 주문의 수량 정보를 Catalog Service에 반영
  • Order Service에서 Kafka Topic으로 메세지 전송 -> Producer
  • Catalog Service에서 Kafka Topic에 전송된 메세지 취득 -> Consumer

kafka 의존성 추가

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Catalog Service

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

// Consumer 빈 설정 및 등록@Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
// kafka 서버의 host, port, 컨슈머는 데이터를 받아오기 때문에 역직렬화 설정
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

// 카프카에 들어온 정보를 감지하기 위해 리스너 빈 등록@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

catalog service 수정

@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class KafkaConsumer {

    private final CatalogRepository catalogRepository;
    private final String kafkaTopic = "example-catalog-topic";

// Listen할 토픽 설정@KafkaListener(topics = kafkaTopic)
    public void updateQuantity(String kafkaMessage) {
        log.info("kafka Message = " + kafkaMessage);

// 역직렬화
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            map = objectMapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch(JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
// 상품이 존재할 경우 상품의 수량 수정if (entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("quantity"));
        }
    }
}

Order Service

@Configuration
@EnableKafka
public class KafkaProducerConfig {

// kafka로 메세지를 보내야하기 때문에 직렬화 해주어야함@Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

// 데이터 전달 인스턴스@Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Order Controller 수정

@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                 @RequestBody RequestOrder request) throws JsonProcessingException {

    ModelMapper modelMapper = new ModelMapper();
    modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

    OrderDto orderDto = modelMapper.map(request, OrderDto.class);
    orderDto.setUserId(userId);
    OrderDto createdOrder = orderService.createOrder(orderDto);

// kafkatopic에 주문 데이터 전달
    kafkaProducer.send(kafkaTopic, orderDto);

    ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
    return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(createdOrder, ResponseOrder.class));
}

Order Service에서 주문해보기

먼저 상품의 재고 확인

# [GET] /catalog-service/catalogs
{
     {
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 99,
        "createdAt": "2021-05-24T16:19:54.62"
    },...
}

주문

# [POST] /order-service/{userId}/orders
Request
{
    "productId" : "CATALOG-001",
    "quantity" : 95,
    "unitPrice" : 2000
}
Response
{
    "productId": "CATALOG-001",
    "quantity": 95,
    "unitPrice": 2000,
    "totalPrice": 190000,
    "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
}

다시 상품의 재고 확인

# [GET] /catalog-service/catalogs
{
    {
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"
    },...
}

그리고 내 정보에서 주문 확인

{
    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
        {
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
        }
    ]
}

Order Service Instance가 복수일 경우 동기화 문제를 해결해보기

만약 OrderService 인스턴스를 하나 더 띄우게 되면 프로젝트에는 각각의 인스턴스에 H2 내장 DB가 붙는다. 그리고 라운드 로빈 방식으로 각 서비스가 호출되기 때문에 주문을 여러번 하고 내 주문 조회를 하게 되면 조회를 할 때 마다 결과가 다르게 나온다.

이 문제를 해결하기 위해 Order Service에 요청된 주문 정보를 DB가 아니라 Kafka의 Topic으로 전송하고, Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB(Maria DB)에 저장해보자

Table 생성

CREATE TABLE orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    quantity int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);

Order Service의 datasource 설정 변경

spring:
  datasource:
    url: jdbc:mariadb://localhost:포트/스키마
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: 비밀번호입력

Kafka Sink Connect 추가

# connect의 기본 포트는 8083
# 커넥트 추가, POST로 전송
{
    "name":"my-order-sink-connect",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",  # DB를 자동으로 만들기 설정, 토픽과 같은 이름의 테이블 생성
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"    # 정보를 받을 토픽
    }
}

Order Service Controller 수정

@PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder request) throws JsonProcessingException {

        ModelMapper modelMapper = new ModelMapper();
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = modelMapper.map(request, OrderDto.class);
        orderDto.setUserId(userId);

        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(request.getUnitPrice() * request.getQuantity());

// catalog-service와 데이터 동기화
        kafkaProducer.send(catalogTopic, orderDto);
// order-service의 데이터 동기화(단일 DB에 저장)
        orderProducer.send(orderTopic, orderDto);

        ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
        return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(responseOrder, ResponseOrder.class));
    }

각각의 DTO들과 OrderProducer 설정

// Kafka에 넣을 DTO, 직렬화 해주기 잊지말긔public class KafkaOrderDto implements Serializable {

    private Schema schema;
    private Payload payload;
}

...

// 스키마 정보 클래스public class Schema {

    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

...

// Schema 내부의 필드 정의 클래스public class Field {

    private String type;
    private boolean optional;
    private String field;
}

...

// 실제 데이터public class Payload {

    private String order_id;
    private String user_id;
    private String product_id;
    private int quantity;
    private int unit_price;
    private int total_price;
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

// 카프카에 데이터를 보낼 템플릿private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
// dto를 json으로 변환
        String jsonInString = objectMapper.writeValueAsString(orderDto);
        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order micro service : " + orderDto);

        return orderDto;
    }
}
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class OrderProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
// 필드, 스키마는 이미 DB 테이블이 존재하기 때문에 그에 맞게 바로 생성private final List<Field> fields = Arrays.asList(
            Field.builder().type("string").optional(true).field("order_id").build(),
            Field.builder().type("string").optional(true).field("user_id").build(),
            Field.builder().type("string").optional(true).field("product_id").build(),
            Field.builder().type("int32").optional(true).field("quantity").build(),
            Field.builder().type("int32").optional(true).field("unit_price").build(),
            Field.builder().type("int32").optional(true).field("total_price").build());
    private final Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders").build();

    public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .quantity(orderDto.getQuantity())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()
                .schema(schema)
                .payload(payload)
                .build();

        ObjectMapper objectMapper = new ObjectMapper();
// json으로 변환
        String jsonInString = objectMapper.writeValueAsString(kafkaOrderDto);
        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from the Order micro-service : " + kafkaOrderDto);

        return orderDto;
    }
}

테스트

현재 내 주문 정보와 상품 재고는 다음과 같이 나온다.

# 주문 정보
{
    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
        {
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
        }
    ]
}

# 상품 재고
[
    {
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"
    },
    {
        "productId": "CATALOG-002",
        "productName": "Tokyo",
        "unitPrice": 1000,
        "stock": 108,
        "createdAt": "2021-05-24T16:19:54.625"
    },
    {
        "productId": "CATALOG-003",
        "productName": "Stockholm",
        "unitPrice": 2000,
        "stock": 105,
        "createdAt": "2021-05-24T16:19:54.625"
    }
]

이제 주문을 두번 더 해보자.

{
    "productId" : "CATALOG-002",
    "quantity" : 18,
    "unitPrice" : 1000
}
{
    "productId" : "CATALOG-003",
    "quantity" : 15,
    "unitPrice" : 2000
}

그리고 내 주문 조회 및 상품 재고를 조회해보면?

# 주문 조회
{
    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
        {
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
        },
        {
            "productId": "CATALOG-002",
            "quantity": 18,
            "unitPrice": 1000,
            "totalPrice": 18000,
            "createdAt": "2021-05-24T17:30:50",
            "orderId": "c4b5a3f1-6914-4e67-bf12-0b7461da87c8"
        },
        {
            "productId": "CATALOG-003",
            "quantity": 15,
            "unitPrice": 2000,
            "totalPrice": 30000,
            "createdAt": "2021-05-24T17:31:46",
            "orderId": "9ba95fa6-a61b-43fe-a092-5a5834882b53"
        }
    ]
}

# 재고
[
    {
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"
    },
    {
        "productId": "CATALOG-002",
        "productName": "Tokyo",
        "unitPrice": 1000,
        "stock": 90,
        "createdAt": "2021-05-24T16:19:54.625"
    },
    {
        "productId": "CATALOG-003",
        "productName": "Stockholm",
        "unitPrice": 2000,
        "stock": 90,
        "createdAt": "2021-05-24T16:19:54.625"
    }
]

잘 반영된 것을 확인할 수 있다. Maria DB에서도 확인해보자! HeidiSQL로 접속해서 확인해보았다.

테이블명도 잘 있다!

본 내용은 이도원님의 Spring Cloud로 개발하는 마이크로서비스 애플리케이션을 수강하고 정리한 내용입니다.

profile
백엔드 개발자

0개의 댓글