마이크로서비스 - 분산 트랜잭션 처리 패턴

Jeongmin Yeo (Ethan)·2021년 4월 12일
10

Microservice

목록 보기
2/3
post-thumbnail

분산 트랜잭션 처리 패턴

마이크로서비스에서 기능을 분리하고 저장소를 격리함에 따라 이전에는 존재하지 않았던 문제가 생긴다.

즉 여러 개의 분산된 서비스에 걸쳐서 비즈니스 처리를 수행하는 경우 비즈니스 정합성 및 데이터 일관성을 어떻게 보장할 것인가에 대한 문제가 생기는데 이를 손쉽게 처리할 수 있는 방법 중 하나는 여러 개의 분산된 서비스를 하나의 일관된 트랜잭션으로 묶는 것이다.

이를 위한 전통적인 방법으로 2단계 커밋 기법이 있다.

  • 분산 데이터베이스 환경에서 원자성(atomicity)를 보장하기 위해 분산 트랜잭션에 포함돼 있는 모든 노드가 commit 되거나 rollback하는 매커니즘이다.
  • 하지만 이는 각 서비스에 lock in이 걸려서 발생하는 성능 문제 탓에 효율적인 방법은 아니다.
  • 마이크로 서비스에서는 장애 전파가 일어나지 않도록 설계를 해야하는데 이를 위해서는 각각이 독립적이어야 한다. 2단계 커밋 기법은 네트워크 장애인 경우에 특정 서비스의 트랜잭션이 처리되지 않을 경우 트랜잭션에 묶인 서비스가 즉시 영향을 받기도 한다.
  • 그리고 NoSQL은 이를 지원하지 않기도 한다. 함께 사용하는 DBMS가 동일 제품군이여야 한다는 조건이 있으므로 특정 벤더 제품을 사용해야만 한다. 이는 마이크로서비스 원칙인 느슨한 결합과는 다르다.

마이크로서비스의 독립적인 분산 트랜잭션 처리를 지원하는 패턴이 바로 사가(Saga) 패턴이다.

사가 패턴은 각 서비스의 로컬 트랜잭션을 순차적으로 처리하는 패턴이다. 사가 패턴은 여러개의 분산된 서비스를 하나의 트랜잭션으로 묶지 않고 각 로컬 트랜잭션과 보상 트랜잭션을 설정해 비즈니스 및 데이터 정합성을 맞춘다.

즉 로컬 트랜잭션은 자신의 데이터를 업데이트 한 다음 사가 내에 다음 로컬 트랜잭션을 업데이트 하는 트리거 메시지를 게시해서 정합성을 맞춘다.

다른 트랜잭션이 실패해서 롤백이 필요한 경우에 보상 트랜잭션을 따라서 앞서 처리한 트랜잭션들을 되돌리게 한다.

즉 일관성 유지가 필요한 트랜잭션들을 하나로 묶어서 처리하는게 아니라 하나하나씩 로컬 트랜잭션으로 처리하고 이벤트를 날리고 다음 로컬 트랜잭션이 처리하고 이런 식으로 진행된다. 실패하면 이때까지 처리했던 트랜잭션들을 모두 되돌리고.

결과적 일관성(eventual consistency)

모든 비즈니스에서 데이터는 일관성이 있어야 한다. 하지만 이전까지는 이 같은 데이터 일관성은 실시간으로 반드시 맞아야 한다는 생각이다.

그렇지만 모든 비즈니스 규칙들이 실시간으로 데이터 일관성이 맞아야 하지는 않다. 예를 들면 쇼핑몰에서 주문을 하고 결제 처리가 완료 되면 결제 내용과 함께 주문 내역이 고객의 이메일로 전송돼야 한다고 생각해보자.

이 경우들을 모두 그 즉시 순차적으로 처리해야 하지는 않다. 주문자가 폭등한 경우를 생각해보면 된다. 이 경우에 주문자가 많아져서 결제에도 그 트래픽이 전파되서 결제 서비스에 장애가 발생할 수 있다. 주문 서비스를 Scale Out 한다고 문제가 해결되지 않는다.

하지만 주문만 미리 받아놓고 (주문 서비스만 미리 Scale Out 해놓는다면) 외부 결제 서비스는 자신이 처리할 수 있는 만큼만 계속해서 처리한다라고 가정을 해봐도 문제되는 점은 없다. 데이터의 일관성이 실시간으로 맞지 않더라도 어느 일정 시점에서는 일관성이 맞을 것이다. 이를 결과적 일관성이라고 한다.

이런 결과적 일관성은 고가용성을 극대화 한다. 실시간성을 강제로 해서 다른 서비스의 가용성을 떨어뜨리지 않는다.

이는 마이크로서비스의 사가 패턴과 이벤트 메시지 기반 비동기 통신을 통해서 만들 수 있다.

Saga Pattern

사가 패턴에는 크게 두 종류가 있다. choreography-based patternorchestration-based pattern 각각의 패턴에 대해 좀 더 자세하게 알아보자.

choreography-based pattern

Example choreography-based pattern

choreography-based pattern 은 Saga에 참여하는 사람들이 이벤트를 교환하고 협력하는 방식이다.

여기서 발생하는 이벤트 들은 각각의 데이터베이스를 업데이트 한다. 처음의 시작은 외부 요청으로부터 시작하고 (예를 들면 HTTP POST) 이후의 스텝은 이벤트에 대한 응답을 기반으로 한다.

각각의 Step들은 다음과 같다.

StepTriggering eventParticipantCommandEvents
1(External Request)Order ServicecreatePendingOrder()OrderCreated
2OrderCreatedCustomer ServicereserveCredit()Credit Reserved, Credit Limit Exceeded
3aCredit ReservedOrder ServiceapproveOrder()
3bCredit Limit ExceededOrder ServicerejectOrder()

Step 2같은 경우는 가능한 이벤트가 두 가지가 있다고 생각하면 된다. 고객의 신용카드가 결제가 된 경우와 한도초과가 난 경우

다이어그램은 다음과 같다.

순서를 살펴보자면

  1. Order Service 가 외부의 POST /orders 요청을 받고 Order 객체를 Pending 상태로 만든 것이다.
  2. 그 후 Order ServiceOrder Created 라는 이벤트를 발생시킨다.
  3. Customer Service 는 이 이벤트를 수신하고 나서 Credit Reserve 를 시도한다.
  4. 그 후 Customer Service 는 이 결과에 대해 이벤트를 낸다.
  5. Order Service 는 이 이벤트를 받고나서 Order 를 approve 할 지 reject 할 지 결정한다.

Implementing a choreography-based saga

여기서는 이제 Order ServiceCustomer Service 의 실제 구현에 대해 살펴보겠다.

The Order Service
@Transactional
public class OrderService{
  
  @Autowired
  private DomainEventPublisher domainEventPublisher;

  @Autowired
  private OrderRepository orderRepository;

  public Order createOrder(OrderDetails orderDetails) {
    ResultWithEvents<Order> orderWithEvents = Order.createOrder(orderDetails);
    Order order = orderWithEvents.result;
    orderRepository.save(order);
    domainEventPublisher.publish(Order.class, order.getId(), orderWithEvents.events);
    return order;
  } 
}

createOrder() 메소드에 의해서 Spring Data JPA를 이용해 Order 객체를 만들고 DomainEventPublisher 에 의해 OrderCreated 이벤트를 날린다.

이런 DomainEventPublisher 에 대한 설정 정보는 @Configuration 클래스에서 따로 빈으로 설정해서 사용했다.

The Order Configuration
@Configuration
@EnableJpaRepositories
@EnableAutoConfiguration
@Import({TramJdbcKafkaConfiguration.class,
        TramEventsPublisherConfiguration.class,
        TramEventSubscriberConfiguration.class})
public class OrderConfiguration {

  @Bean
  public OrderService orderService(DomainEventPublisher domainEventPublisher,   
                                   OrderRepository orderRepository) {
    return new OrderService(domainEventPublisher, orderRepository);
  }
The Order
@Entity
@Table(name="orders")
@Access(AccessType.FIELD)
public class Order {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Enumerated(EnumType.STRING)
  private OrderState state;

  @Embedded
  private OrderDetails orderDetails;

  @Version
  private Long version;

  public Order() {
  }

  public Order(OrderDetails orderDetails) {
    this.orderDetails = orderDetails;
    this.state = OrderState.PENDING;
  }

  public static ResultWithEvents<Order> createOrder(OrderDetails orderDetails) {
    Order order = new Order(orderDetails);
    OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(orderDetails);
    return new ResultWithEvents<>(order, singletonList(orderCreatedEvent));
  }

  public Long getId() {
    return id;
  }

  public void noteCreditReserved() {
    this.state = OrderState.APPROVED;
  }

  public void noteCreditReservationFailed() {
    this.state = OrderState.REJECTED;
  }

  public OrderState getState() {
    return state;
  }

  public OrderDetails getOrderDetails() {
    return orderDetails;
  }

  public void cancel() {
    switch (state) {
      case PENDING:
        throw new PendingOrderCantBeCancelledException();
      case APPROVED:
        this.state = OrderState.CANCELLED;
        return;
      default:
        throw new UnsupportedOperationException("Can't cancel in this state: " + state);
    }
  }
}
The Customer Service

Customer ServiceOrder Service 에서 발생하는 이벤트를 subscribe 하고 있다. 보면 Order CreatedOrder Cancelled 이벤트를 처리할 핸들러가 있다.

public class OrderEventConsumer {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Autowired
  private CustomerRepository customerRepository;

  @Autowired
  private DomainEventPublisher domainEventPublisher;

  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
            .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.orders.domain.Order")
            .onEvent(OrderCreatedEvent.class, this::handleOrderCreatedEventHandler)
            .onEvent(OrderCancelledEvent.class, this::handleOrderCancelledEvent)
            .build();
  }

  public void handleOrderCreatedEventHandler(
             DomainEventEnvelope<OrderCreatedEvent> domainEventEnvelope) {

    Long orderId = Long.parseLong(domainEventEnvelope.getAggregateId());

    OrderCreatedEvent orderCreatedEvent = domainEventEnvelope.getEvent();

    Long customerId = orderCreatedEvent.getOrderDetails().getCustomerId();

    Optional<Customer> possibleCustomer = customerRepository.findById(customerId);

    if (!possibleCustomer.isPresent()) {
      logger.info("Non-existent customer: {}", customerId);
      domainEventPublisher.publish(Customer.class,
              customerId,
              Collections.singletonList(new CustomerValidationFailedEvent(orderId)));
      return;
    }

    Customer customer = possibleCustomer.get();


    try {
      customer.reserveCredit(orderId, orderCreatedEvent.getOrderDetails().getOrderTotal());

      CustomerCreditReservedEvent customerCreditReservedEvent =
              new CustomerCreditReservedEvent(orderId);

      domainEventPublisher.publish(Customer.class,
              customer.getId(),
              Collections.singletonList(customerCreditReservedEvent));

    } catch (CustomerCreditLimitExceededException e) {

      CustomerCreditReservationFailedEvent customerCreditReservationFailedEvent =
              new CustomerCreditReservationFailedEvent(orderId);

      domainEventPublisher.publish(Customer.class,
              customer.getId(),
              Collections.singletonList(customerCreditReservationFailedEvent));
    }
  }

이 이벤트 핸들러가 하는 일을 보면 다음과 같다.

  1. Customer 를 데이터베이스에서 가지고 온다.
  2. Customer 가 존재하지 않는다면 CustomerValidationFailedEvent 를 발생시킨다.
  3. 만약 credit이 reserve 된다면 CustomerCreditReservedEvent 를 발생시킨다.
  4. 그렇지 않고 Credit이 exceed 된다면 CustomerCreditReservationFailedEvent 를 발생시킨다.

Example orchestration-based pattern

orchestration-based pattern 에서는 Saga에 participant들에게 뭘 해야 하는지 알려주는 orchestrator 가 있다.

Saga orchestrator 는 참가자들과 비동기 통신을 통해서 상호작용을 한다. 각각의 스탭에서 Saga orchestratorparticipant 에게 어떤 행동을 해야 하는지 알려준다. 그 후 participant 는 적절한 행동 후에 orchestrator 에게 응답을 보낸다. orchestrator 는 이 메시지를 읽고 다음 participant 에게 보낼 메시지를 만든다.

다이어그램은 다음과 같다.

순서를 살펴보자면

  1. Order ServicePOST /orders 요청을 Order Saga Orchestrator 를 만들어서 처리한다.
  2. Saga orchestratorOrderPending 상태로 만든다.
  3. 그 후 Saga orchestratorReserve Credit 명령을 Customer Service 에게 보낸디.
  4. Customer Service 는 명령대로 행동한다.
  5. 그 후 Customer Service는 결과에 따른 응답 메시지를 보낸다.
  6. Saga orchestrator는 이 결과를 보고 Order 를 approve 할 지 reject 할 지 결정한다.

Implementing an orchestration-based saga

Design Overview

The Order Service

OrderSerivce 의 핵심 요소들을 살펴보자면 다음과 같다.

  • OrderController - request 요청을 처리하는 컨트롤러
  • OrderService - Saga Orchestrator 를 만들고 처리하는 역할
  • SagaInstanceFactory - Saga Orchestrator 를 인스턴스화 해주는 팩토리
  • CreateOrderSaga - 싱글톤 빈으로 Saga Orchestrator 의 플로우를 정의해주는 역할
  • CreateOrderSagaData - CreateOrderSaga 의 영구적인 데이터를 가지고 있는 역할
The OrderController class
@RestController
public class OrderController {

  private OrderService orderService;
  private OrderRepository orderRepository;

  @Autowired
  public OrderController(OrderService orderService, OrderRepository orderRepository) {
    this.orderService = orderService;
    this.orderRepository = orderRepository;
  }

  @RequestMapping(value = "/orders", method = RequestMethod.POST)
  public CreateOrderResponse createOrder(@RequestBody CreateOrderRequest createOrderRequest) {
    Order order = orderService.createOrder(new OrderDetails(createOrderRequest.getCustomerId(), createOrderRequest.getOrderTotal()));
    return new CreateOrderResponse(order.getId());
  }

  @RequestMapping(value="/orders/{orderId}", method= RequestMethod.GET)
  public ResponseEntity<GetOrderResponse> getOrder(@PathVariable Long orderId) {
    return orderRepository
            .findById(orderId)
            .map(o -> new ResponseEntity<>(new GetOrderResponse(o.getId(), o.getState(), o.getRejectionReason()), HttpStatus.OK))
            .orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  }

  @RequestMapping(value="/orders/customer/{customerId}", method= RequestMethod.GET)
  public ResponseEntity<List<GetOrderResponse>> getOrdersByCustomerId(@PathVariable Long customerId) {
    return new ResponseEntity<List<GetOrderResponse>>(orderRepository
            .findAllByOrderDetailsCustomerId(customerId)
            .stream()
            .map(o -> new GetOrderResponse(o.getId(), o.getState(), o.getRejectionReason()))
            .collect(Collectors.toList()), HttpStatus.OK);
  }
}

처음 HTTP POST /orders 요청을 받아서 처리해주는 컨트롤러 역할을 해준다.

The OrderService class
public class OrderService {

@Autowired
private SagaInstanceFactory sagaInstanceFactory;

@Autowired
private CreateOrderSaga createOrderSaga;

@Transactional
public Order createOrder(OrderDetails orderDetails) {
  CreateOrderSagaData data = new CreateOrderSagaData(orderDetails);
  sagaInstanceFactory.create(createOrderSaga, data);
  return orderRepository.findOne(data.getOrderId());
}

createOrder() 메소드는 createOrderSagaDataorderDetails 를 통해 초기화 하고 SagaInstanceFactory 를 통해 인스턴스화 를 한다.

The CreateOrderSaga class
public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

  private OrderRepository orderRepository;

  public CreateOrderSaga(OrderRepository orderRepository) {
    this.orderRepository = orderRepository;
  }

  private SagaDefinition<CreateOrderSagaData> sagaDefinition =
          step()
            .invokeLocal(this::create)
            .withCompensation(this::reject)
          .step()
            .invokeParticipant(this::reserveCredit)
            .onReply(CustomerNotFound.class, this::handleCustomerNotFound)
            .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)
          .step()
            .invokeLocal(this::approve)
          .build();

  private void handleCustomerNotFound(CreateOrderSagaData data, CustomerNotFound reply) {
    data.setRejectionReason(RejectionReason.UNKNOWN_CUSTOMER);
  }

  private void handleCustomerCreditLimitExceeded(CreateOrderSagaData data, CustomerCreditLimitExceeded reply) {
    data.setRejectionReason(RejectionReason.INSUFFICIENT_CREDIT);
  }


  @Override
  public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
    return this.sagaDefinition;
  }

  private void create(CreateOrderSagaData data) {
    Order order = Order.createOrder(data.getOrderDetails());
    orderRepository.save(order);
    data.setOrderId(order.getId());
  }

  private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
    long orderId = data.getOrderId();
    Long customerId = data.getOrderDetails().getCustomerId();
    Money orderTotal = data.getOrderDetails().getOrderTotal();
    return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
            .to("customerService")
            .build();
  }

  private void approve(CreateOrderSagaData data) {
    orderRepository.findById(data.getOrderId()).get().approve();
  }

  public void reject(CreateOrderSagaData data) {
    orderRepository.findById(data.getOrderId()).get().reject(data.getRejectionReason());
  }

CreateOrderSaga 클래스 안에 SagaDefinition 메소드를 보면 Saga orchestrator 의 매 스텝마다 흐름이 적혀있다.

각각의 step을 보면 다음 스텝이 있기도 하고 보상 트랜잭션도 있다. 위에서 부터 아래로 스텝이 실행된다고 보면 된다.

트랜잭션이 실패할 경우 동작해야 하는 보상 트랜잭션은 밑에서부터 위로 실행된다고 알면 된다.

The CreateOrderSagaData class
public class CreateOrderSagaData  {

  private OrderDetails orderDetails;
  private Long orderId;
  private RejectionReason rejectionReason;

  public CreateOrderSagaData(OrderDetails orderDetails) {
    this.orderDetails = orderDetails;
  }

  public CreateOrderSagaData() {
  }

  public Long getOrderId() {
    return orderId;
  }

  public OrderDetails getOrderDetails() {
    return orderDetails;
  }

  public void setOrderId(Long orderId) {
    this.orderId = orderId;
  }

  public void setRejectionReason(RejectionReason rejectionReason) {
    this.rejectionReason = rejectionReason;
  }

  public RejectionReason getRejectionReason() {
    return rejectionReason;
  }
}

OrderServiceCreateOrderSagaData 를 매 요청마다 만드는데 이 인스턴스는 Saga Orchestrator 의 인스턴스라고 생각하면 된다. 여기에 있는 데이터인 OrderIdrejectionReason 은 Saga가 실행되면서 업데이트 된다.

위의 CreateOrderSaga 를 보면 첫 번째 스텝과 세 번째 스텝은 CreateOrderSaga 의 메소드 create 메소드와 reject 메소드 그리고 approve 메소드에서 업데이트 된다.

CreateOrderSaga 의 두 번째 스텝에서 호출되는 메소드를 보면 reserveCredit 메소드인데 이는 CustomerService 에서 실행될 메소드다.

The CustomerService Code
public class CustomerCommandHandler {

  private CustomerService customerService;

  public CustomerCommandHandler(CustomerService customerService) {
    this.customerService = customerService;
  }

  public CommandHandlers commandHandlerDefinitions() {
    return SagaCommandHandlersBuilder
            .fromChannel("customerService")
            .onMessage(ReserveCreditCommand.class, this::reserveCredit)
            .build();
  }

  public Message reserveCredit(CommandMessage<ReserveCreditCommand> cm) {
    ReserveCreditCommand cmd = cm.getCommand();
    try {
      customerService.reserveCredit(cmd.getCustomerId(), cmd.getOrderId(), cmd.getOrderTotal());
      return withSuccess(new CustomerCreditReserved());
    } catch (CustomerNotFoundException e) {
      return withFailure(new CustomerNotFound());
    } catch (CustomerCreditLimitExceededException e) {
      return withFailure(new CustomerCreditLimitExceeded());
    }
  }

CustomerServiceSaga Orchestatrion 에 참여하고 있는 participantReserve Credit 명령에 따라 행동을 하는 서비스다.

profile
좋은 습관을 가지고 싶은 평범한 개발자입니다.

0개의 댓글