[Spring Cloud] DB 단일화

jsieon97·2023년 3월 20일
0

DB 단일화

OrderService가 여러개 실행되었을때 각각의 H2 DB에 따로 데이터가 저장되는 일이 발생할 수 있다. 이때 하나의 DB로 단일화 시킴으로써 해결할 수 있다.

JPA 데이터 베이스 교체

OrderService

  • Dependencies 추가
	implementation 'org.mariadb.jdbc:mariadb-java-client:2.7.8'
	implementation 'mysql:mysql-connector-java:8.0.29'
  • application.yml 수정
spring:
  application:
    name: order-service
  h2:
    console:
      enabled: true
      settings:
        web-allow-others: true
      path: /h2-console
  jpa:
    hibernate:
      ddl-auto: update
  datasource:
    driver-class-name: org.mariadb.jdbc.Driver
    url: jdbc:mysql://localhost/mydb
    username: [userName]
    password: [passWord]

테스트

MariaDB에 저장된 것을 확인할 수 있음

OrderService에 Kafka Topic 활용

  • Controller 수정
@RestController
@Slf4j
@RequestMapping("/order-service")
public class OrderController {
    private Environment env;
    private OrderService orderService;
    private KafkaProducer kafkaProducer;
    private OrderProducer orderProducer;

    public OrderController(Environment env,
                           OrderService orderService,
                           KafkaProducer kafkaProducer,
                           OrderProducer orderProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
        this.orderProducer = orderProducer;
    }

    ...

    // http://127.0.0.1:0/order-service/{user_id}/orders/
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);
        /* JPA */
//        OrderDto createdOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* Kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDetails.getUnitPrice() * orderDetails.getQty());

        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);
        orderProducer.send("orders", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

    ...
    
}
  • Producer에서 발생하기 위한 메시지 등록
// Field.java

@Data
@AllArgsConstructor
public class Field {
    private String type;
    private boolean optional;
    private String field;
}
// Schema.java

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}
// Payload.java

@Data
@Builder
public class Payload {
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}
// KafkaOrderDto.java

@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}
  • OrderProducer 생성
@Service // 스프링에서 빈으로 등록될 클래스임을 선언
@Slf4j // lombok 어노테이션으로, Logger 객체를 자동 생성해준다.
public class OrderProducer {
    private KafkaTemplate<String, String> kafkaTemplate; // 카프카 프로듀서 객체

    // 카프카 메시지의 스키마를 정의하기 위한 필드 리스트
    List<Field> fields = Arrays.asList(
            new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price")
    );
    
    // 스키마 객체 생성
    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    // KafkaTemplate 객체를 주입받는 생성자
    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // 카프카 토픽에 메시지를 보내는 메소드
    public OrderDto send(String topic, OrderDto orderDto) {
        // Payload 객체 생성
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        // 카프카 메시지로 전송될 KafkaOrderDto 객체 생성
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        // Jackson ObjectMapper 객체 생성
        ObjectMapper mapper = new ObjectMapper();
        String jsonInstring = "";
        try {
            // KafkaOrderDto 객체를 JSON 문자열로 변환
            jsonInstring = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        // 카프카 템플릿을 사용하여 메시지를 보내고, 로그 출력
        kafkaTemplate.send(topic, jsonInstring);
        log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);

        // OrderDto 객체 반환
        return orderDto;
    }
}
  • Kafka Sink Connector 추가
    POST http://127.0.0.1:8083/connectors
// JSON 형태

{
        "name" : "my-order-sink-connect",
        "config" : {    
            "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url":"jdbc:mysql://localhost:3306/mydb",
            "connection.user":"DBusername",
            "connection.password":"DBpassword",
            "auto.create":"true",
            "auto.evolve":"true",
            "delete.enabled":"false",
            "tasks.max":"1",
            "topics":"orders"
    }
}

이제 OrderService를 여러개 띄우고 실행해도 하나의 DB에 저장된다.

profile
개발자로써 성장하는 방법

0개의 댓글