[DDD] 8장. 애그리거트 트랜잭션 관리

매빈·2023년 4월 3일
0

1. 애그리거트의 트랜잭션


8.1 애그리거트와 트랜잭션


➡️ 두 스레드는 개념적으로 동일하지만 물리적으로 서로 다른 애그리거트 객체를 사용함. 이 경우 일관성이 깨지는 문제가 발생할 수 있다.

  • 해결책
    • DBMS가 지원하는 트랜잭션과 함께 애그리거트를 위한 추가적인 트랜잭션 처리 기법이 필요함.
    • 애그리거트에 대해 사용할 수 있는 대표적인 트랜잭션 처리 방식
      • 선점 잠금(Pessimistic Lock)
      • 비선점 잠금(Optimistic Lock)

2. 애그리거트 잠금 기법


8.2 선점 잠금

  • 먼저 애그리거트를 구한 스레드가 애그리거트 사용이 끝날 때까지 다른 스레드가 해당 애그리거트를 수정하지 못하게 막는 방식

  • 선점 잠금의 동작 방식

    - 스레드1이 선점 잠금 방식으로 애그리거트를 구한 뒤 스레드2가 같은 애그리거트를 구함
    - 이때 스레드2는 스레드1이 애그리거트에 대한 잠금을 해제할 때까지 블로킹됨
    - 스레드1이 트랜잭션을 커밋하면 잠금이 해제된다

  • 선점 잠금은 보통 DBMS가 제공하는 행단위 잠금을 사용해서 구현

  • JPA EntityManger의 find() 메서드에 LockModeType.PESSIMISTIC_WRITE를 값으로 전달하면 해당 엔티티와 매핑된 테이블을 이용해서 선점 잠금 방식을 적용할 수 있음

    Order order = entityManager.find(Order.class, orderNo, LockModeType.PERSSIMISTIC_WRITE);
  • JPA 프로바이더와 DBMS에 따라 잠금 모드 구현이 다름

    • 하이버네이트: PESSIMISTIC_WRITE를 잠금 모드로 사용하면 for update 쿼리를 이용해서 선점 잠금을 구현

    • 스프링 데이터 JPA:@Lock 애너테이션을 사용해서 잠금 모드를 지정함

      public interface MemberRepository extends Repository<Member, MemberId> {
      
      @Lock(LockModeType.PESSIMISTIC_WRITE)
      @Query("select m from Member m where m.id = :id")
      Optional<Member> findByIdForUpdate(@Param("id") MemberId memberId);

8.2.1 선점 잠금과 교착 상태

  • 선점 잠금 기능을 사용할 때는 잠금 순서에 따른 교착 상태(deadlock)가 발생하지 않도록 주의해야 함.
  • 교착 상태를 피하기 위해선 잠금을 구할 때 최대 대기 시간을 지정해야 함
  • JPA에서 선점 잠금을 시도할 때 최대 대기 시간을 지정하려면 힌트를 사용
    • 지정한 시간 이내에 잠금을 구하지 못하면 익셉션 발생시키기
    • DBMS에 따라 힌트가 적용되지 않을 수 있으므로 주의
    Map<String, Object> hints = new HashMap<>();
    hints.put("javax.persistence.lock.timeout", 2000);
    Order order = entityManager.find(Order.class, orderNo, LockModeType.PERSSIMISTIC_WRITE);
  • 스프링 데이터 JPA는 @QueryHints 애너테이션을 사용해서 쿼리 힌트를 지정할 수 있음
public interface MemberRepository extends Repository<Member, MemberId> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints({
            @QueryHint(name = "javax.persistence.lock.timeout", value = "3000")
    })
    @Query("select m from Member m where m.id = :id")
    Optional<Member> findByIdForUpdate(@Param("id") MemberId memberId);
  • DBMS에 따라 교착 상태에 빠진 커넥션을 처리하는 방식이 다름 ➡️ DBMS에 대해 JPA가 어떤 식으로 대기 시간을 처리하는지 반드시 확인해야 함
    • 쿼리별로 대기 시간 지정
    • 커넥션 단위로만 대기 시간 지정

8.3 비선점 잠금

  • 선점 잠금으로 모든 트랜잭션 충돌 문제가 해결되는 것은 아님.

  • 한 스레드에서 데이터를 조회한 후 변경하기 전에 다른 스레드에서 데이터를 변경하면 문제가 발생할 수 있음
    ➡️ 이때 비선점 잠금이 필요함

  • 비선점 잠금: 동시에 접근하는 것을 막는 대신, 변경한 데이터를 실제 DBMS에 반영하는 시점에 변경 가능 여부를 확인하는 방식

  • 특징

    • 비선점 잠금에선 버전을 이용해 데이터를 관리함

      • 구현을 위해선 애그리거트에 버전으로 사용할 숫자 타입 프로퍼티를 추가
      • 애그리거트를 수정할 때마다 버전으로 사용할 프로퍼티 값이 1씩 증가
      • 애그리거트와 매핑되는 테이블 버전 값이 현재 애그리거트의 버전과 동일할 때만 데이터를 수정
        UPDATE aggtable SET version = version + 1, colx = ?, coly = ?
        WHERE aggid = ? and version = 현재버전
    • JPA는 버전을 이용한 비선점 잠금 기능을 지원함

      • 버전으로 사용할 필드에 @Version 애너테이션을 붙이고 매핑되는 테이블에 버전을 저장할 칼럼을 추가하면 됨
      • 엔티티가 변경되는 시점에 다음과 같은 비선점 잠금 쿼리가 실행됨
      @Entity
      @Table(name = "purchase_order")
      @Access(AccessType.FIELD)
      public class Order {
          @EmbeddedId
          private OrderNo number;
      
          @Version
          private long version;
      
          ...
          
      UPDATE purchase_order SET ...생략, version = version + 1
      WHERE number = ? and version = 10
    • 응용 서비스는 버전에 대해 알 필요가 없음

      • 리포지터리에서 필요한 애그리거트를 구하고 알맞은 기능만 실행하면 됨
      • 애그리거트 데이터가 변경되면 JPA는 트랜잭션 종료 시점에 비선점 잠금을 위한 쿼리를 실행함
  • 비선점 잠금을 위한 쿼리를 실행할 때 쿼리 실행 결과로 수정된 행의 개수가 0이면 이미 누군가가 앞서 데이터를 수정한 것
    ➡️ 이는 트랜잭션이 충돌한 것이므로 트랜잭션 종료 시점에 익셉션(OptimisticLockingFailureException)이 발생함
    ➡️ 응용 서비스 또는 표현 영역에서 해당 예외를 적절하게 처리

@Service
public class ChangeShippingService {

    @Transactional
    public void changeShipping(ChangeShippingRequest changeReq) {
        Optional<Order> orderOpt = orderRepository.findById(new OrderNo(changeReq.getNumber()));
        Order order = orderOpt.orElseThrow(() -> new NoOrderException());
        order.changeShippingInfo(changeReq.getShippingInfo());
    }
    ...
}
  • 사용자가 전송한 버전과 애그리거트 버전이 동일한 경우에만 애그리거트 수정 기능을 수행하도록 함으로써 트랜잭션 충돌 문제를 해소할 수 있음
    ➡️ 이 경우 뷰단에도 버전 정보가 함께 제공되고, 반대로 응용 서비스에 전달할 요청 데이터는 사용자가 전송한 버전 값을 포함
  • 응용 서비스는 전달받은 버전 값을 이용해서 애그리거트 버전과 일치하는지 확인하고, 일치하는 경우에만 기능을 수행함
    • matchVersion 메서드는 현재 애그리거트의 버전과 인자로 전달받은 버전이 일치하면 true를, 아니라면 false를 리턴하도록 구현
    • VersionConflictException은 이미 누군가가 애그리거트를 수정했다는 것을 의미하고, OptimisticLockingFailureException은 이미 누군가가 거의 동시에 애그리거트를 수정했다는 것을 의미함

8.3.1 강제 버전 증가

  • 애그리거트에 애그리거트 루트 외에 다른 엔티티가 존재하는데 기능 실행 도중 루트가 아닌 다른 엔티티의 값만 변경된다면 JPA는 루트 엔티티의 버전 값을 증가시키지 않음
  • 애그리거트의 구성요소 중 일부만 변경되더라도 루트 애그리거트의 버전값이 증가해야 비선점 잠금이 올바르게 작동함
  • JPA ➡️ 이를 위해 엔티티를 구할 때 강제로 버전 값을 증가시키는 잠금 모드를 지원함
    • LockModeType.OPTIMISTIC_FORCE_INCREMENT를 사용하면 해당 엔티티의 상태가 변경되었는지에 상관없이 트랜잭션 종료 시점에 버전 값 증가 처리
    • 스프링 데이터 JPA를 사용하면 @Lock 애너테이션을 이용해서 지정하면 됨

8.4 오프라인 선점 잠금

  • 오프라인 선점 잠금(Offline Pessimistic Lock): 여러 트랜잭션에 걸쳐 동시 변경을 막음
  • 단일 트랜잭션에서 동시 변경을 막는 선점 잠금 방식과 차이가 있음
  • 첫 번째 트랜잭션을 시작할 때 오프라인 잠금을 선점하고, 마지막 트랜잭션에서 잠금을 해제
  • 오프라인 선점 잠금은 잠금 유효 시간을 가져야 함
    • 잠금을 선점한 사용자가 마지막 트랜잭션을 수행하기 전에 프로그램이 종료되면 다른 사용자가 영원히 잠금을 구할 수 없게 되기 때문

8.4.1 오프라인 선점 잠금을 위한 LockManager 인터페이스와 관련 클래스

  • 잠금 선점 시도, 잠금 확인, 잠금 해제, 잠금 유효시간 연장의 네 가지 기능이 필요함
  • LockManager 인터페이스
public interface LockManager {
	//  type과 id를 파라미터로 갖고, 각각 잠글 대상 타입과 식별자를 값으로 전달
    // 잠금을 식별할 때 사용할 LockId를 return
    LockId tryLock(String type, String id) throws LockException;

    void checkLock(LockId lockId) throws LockException;

    void releaseLock(LockId lockId) throws LockException;

    void extendLockExpiration(LockId lockId, long inc) throws LockException;
}
  • LockId 클래스
public class LockId {
    private String value;

    public LockId(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}
  • 오프라인 선점 잠금이 필요한 코드는 LockManager#tryLock()을 이용해 잠금을 시도
  • 잠금에 성공하면 tryLock()LockId를 리턴
  • LockId는 다음에 잠금을 해제할 때 사용하며, 어딘가에 보관해야 함
// 서비스: 서비스는 잠금 ID를 리턴한다
public DataAndLockId getDataWithLock(Long id) {
    // 1. 오프라인 선점 시도
    LockId lockId = lockManager.tryLock("data", id);
    // 2. 기능 실행
    Data data = someDao.select(id);
    return new DataAndLockId(data, lockId);
}
  • 서비스는 DAO를 통해 조회한 데이터와 LockId를 함께 반환
  • 컨트롤러는 서비스가 리턴한 LockId를 모델로 뷰에 전달
  • 잠금을 선점하는 데 실패하면 LockException이 발생
// 서비스: 잠금을 해제한다.
public void edit(EditRequest editReq, LockId lockId) {
    // 1. 잠금 선점 확인
    lockManager.checkLock(lockId);
    // 2. 기능 실행
    ...
    // 3. 잠금 해제
    lockManager.releaseLock(lockId);
}
  • LockId는 뷰와 컨트롤러를 거쳐 잠금을 해제하는 서비스 코드에 전달됨
  • 잠금을 선점한 이후에 실행하는 기능은 아래를 고려하여 반드시 주어진 LockId를 갖는 잠금이 유효한지 확인해야 함
    • 잠금 유효 시간이 지났으면 이미 다른 사용자가 잠금을 선점함
    • 잠금을 선점하지 않은 사용자가 기능을 실행했다면 기능 실행을 막아야 함

8.4.2 DB를 이용한 LockManager 구현

  • DB를 이용해 LockManager를 구현할 수도 있음

  • 잠금 정보를 저장할 테이블과 인덱스를 생성하는 쿼리

    • type과 id 칼럼을 주요키로 지정해서 동시에 두 사용자가 특정 타입 데이터에 대한 잠금을 구하는 것을 방지
    • 각 잠금마다 새로운 LockId를 사용하므로 lockid 필드를 유니크 인덱스로 설정
    • 잠금 유효 시간을 보관하기 위해 expiration_time 칼럼을 사용
    create table locks (
      `type` varchar(255),
      id varchar(255),
      lockid varchar(255),
      expiration_time datetime,
      primary key (`type`, id)
    ) character set utf8;
    
    create unique index locks_idx ON locks (lockid);
    • Order 타입의 1번 식별자를 갖는 애그리거트에 대한 잠금을 구하고 싶다면 다음의 insert 쿼리를 이용해서 locks 테이블에 데이터를 삽입함
    insert into locks values ('Order', '1', '생성한lockid', '2016-03-28 09:10:00');
  • locks 테이블의 데이터를 담을 LockData 클래스를 다음과 같이 작성

  • isExpend() 메서드는 유효 시간이 지났는지를 판단할 때 사용

    public class LockData {
        private String type;
        private String id;
        private String lockId;
        private long timestamp;
    
        public LockData(String type, String id, String lockId, long timestamp) {
            this.type = type;
            this.id = id;
            this.lockId = lockId;
            this.timestamp = timestamp;
        }
    
        public String getType() {
            return type;
        }
    
        public String getId() {
            return id;
        }
    
        public String getLockId() {
            return lockId;
        }
    
        public long getTimestamp() {
            return timestamp;
        }
    
        public boolean isExpired() {
            return timestamp < System.currentTimeMillis();
        }
    }
    • locks 테이블을 이용해서 LockManager를 구현하기

      @Component
      public class SpringLockManager implements LockManager {
        private int lockTimeout = 5 * 60 * 1000;
        private JdbcTemplate jdbcTemplate;
      
        private RowMapper<LockData> lockDataRowMapper = (rs, rowNum) ->
                new LockData(rs.getString(1), rs.getString(2),
                        rs.getString(3), rs.getTimestamp(4).getTime());
      
        public SpringLockManager(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
      
        /**
         * type과 id에 대한 잠금을 시도한다
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public LockId tryLock(String type, String id) throws LockException {
            checkAlreadyLocked(type, id);
            LockId lockId = new LockId(UUID.randomUUID().toString());
            locking(type, id, lockId);
            return lockId;
        }
      
        /**
         * 잠금이 존재하는지 검사한다
         */
        private void checkAlreadyLocked(String type, String id) {
            List<LockData> locks = jdbcTemplate.query(
                    "select * from locks where type = ? and id = ?",
                    lockDataRowMapper, type, id);
            Optional<LockData> lockData = handleExpiration(locks);
            if (lockData.isPresent()) throw new AlreadyLockedException();
        }
      
        /**
         * 잠금 유효 시간이 지나면 해당 데이터를 삭제하고, 값이 없는 Optional을 리턴한다
         * 유효 시간이 지나지 않았으면 해당 LockData를 가진 Optional을 리턴한다
         */
        private Optional<LockData> handleExpiration(List<LockData> locks) {
            if (locks.isEmpty()) return Optional.empty();
            LockData lockData = locks.get(0);
            if (lockData.isExpired()) {
                jdbcTemplate.update(
                        "delete from locks where type = ? and id = ?",
                        lockData.getType(), lockData.getId());
                return Optional.empty();
            } else {
                return Optional.of(lockData);
            }
        }
      
        /**
         * 잠금을 위해 locks 테이블에 데이터를 삽입한다
         * 데이터 삽입 결과가 없으면 익셉션을 발생시킨다
         * DuplicateKeyException이 발생하면 LockingFailException을 발생시킨다
         */
        private void locking(String type, String id, LockId lockId) {
            try {
                int updatedCount = jdbcTemplate.update(
                        "insert into locks values (?, ?, ?, ?)",
                        type, id, lockId.getValue(), new Timestamp(getExpirationTime()));
                if (updatedCount == 0) throw new LockingFailException();
            } catch (DuplicateKeyException e) {
                throw new LockingFailException(e);
            }
        }
      
        /**
         * 현재 시간 기준으로 lockTimeout 이후 시간을 유효 시간으로 생성한다
         */
        private long getExpirationTime() {
            return System.currentTimeMillis() + lockTimeout;
        }
      
        /**
         * 잠금이 유효한지 검사한다
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void checkLock(LockId lockId) throws LockException {
            Optional<LockData> lockData = getLockData(lockId);
            if (!lockData.isPresent()) throw new NoLockException();
        }
      
        /**
         * lockId에 해당하는 LockData를 구한다
         */
        private Optional<LockData> getLockData(LockId lockId) {
            List<LockData> locks = jdbcTemplate.query(
                    "select * from locks where lockid = ?",
                    lockDataRowMapper, lockId.getValue());
            return handleExpiration(locks);
        }
      
        /**
         * lockId에 해당하는 잠금 유효 시간을 inc 만큼 늘린다
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void extendLockExpiration(LockId lockId, long inc) throws LockException {
            Optional<LockData> lockDataOpt = getLockData(lockId);
            LockData lockData =
                    lockDataOpt.orElseThrow(() -> new NoLockException());
            jdbcTemplate.update(
                    "update locks set expiration_time = ? where type = ? AND id = ?",
                    new Timestamp(lockData.getTimestamp() + inc),
                    lockData.getType(), lockData.getId());
        }
      
        /**
         * lockId에 해당하는 잠금 데이터를 locks 테이블에서 삭제한다
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void releaseLock(LockId lockId) throws LockException {
            jdbcTemplate.update("delete from locks where lockid = ?", lockId.getValue());
        }
      
        public void setLockTimeout(int lockTimeout) {
            this.lockTimeout = lockTimeout;
        }
      }
  • checkAlreadyLocked() 메서드를 이용해서 이미 잠금이 선점됐는지 확인하고, locking() 메서드로 잠금을 선점

0개의 댓글