재고시스템으로 알아보는 동시성 이슈 해결방법을 듣고 정리한 내용입니다.
@Test
public void 동시에_50명() throws InterruptedException {
// 처음 Quantity를 100으로 등록함.
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
int threadCount = 100;
// 비동기로 실행
ExecutorService executorService = Executors.newFixedThreadPool(32);
// 요청이 끝날 때까지 기다리는 용도
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.decrease(1L, 1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
// threadCount가 100이므로 100번 decrease하면 수량은 0이 되어야한다.
assertEquals(0, stock.getQuantity());
}
decrease
메서드는 단순히 아래처럼 구현되어있다.
public void decrease(Long quantity) {
if (this.quantity - quantity < 0) {
throw new RuntimeException("foo");
}
this.quantity -= quantity;
}
결과는? 실패한다.
왜냐하면 Race Condition이 일어났기 때문이다.
첫번째 스레드가 데이터를 가져가서 갱신하고, 그 다음 두번째 스레드가 데이터를 가져가서 갱신하는 것이 아닌,
첫번째 스레드가 데이터를 가져가서 갱신하는 동안 두번째 스레드가 공유 데이터(quantity
)에 접근해 갱신하기 때문에, 문제가 발생하는 것이다.
예를 들어, quatity가 예제 코드처럼 100이었다면 Thread#1이 가져가서 99로 바꾸는 와중 (아직까지 quantity는 100인 상태) Thread#2도 그에 접근해 decrease
를 수행한다면 순차적으로 진행되었을 땐 98이 되어야할 값이 둘 다 100 -> 99로 바꾸는 것을 수행하기 때문에 문제가 발생하는 것!
@Transactional(propagation = Propagation.REQUIRES_NEW)
public synchronized void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
가장 먼저 decrease
메서드에 synchronized
키워드를 붙여보자. synchronized
는 하나의 스레드만 해당 메서드에 접근할 수 있도록 해 준다. 다만 문제는 해결되지 않는다.
Why? 스프링에서는 @Transactional
을 사용하게 되면 우리가 만든 클래스를 래핑해서 실행하게 된다. (Proxy
로 감싸서 AOP가 동작할 수 있도록 한다.) 즉, 해당 메서드의 시작과 끝에 TX가 실행되고 종료될 수 있도록 한다.
실제 데이터베이스에 업데이트하기 전에 proxy에서 decrease
에 접근할 수 있기 때문에 (갱신되기 이전 값에 접근 가능하여) 문제가 발생하는 것.
만약 @Transactional
애노테이션을 제거하고 테스트케이스를 실행한다면 정상적으로 동작할 것이다.
그럼 Synchronized
를 사용했을 때 문제는 없을까?
하나의 프로세스 안에서만 보장되는 키워드이기 때문에 서버가 한대일 땐 괜찮지만, 서버가 여러 대일 경우에는 데이터에 대한 접근을 여러 대에서 할 수 있게 된다.
Lock
을 걸어서 정합성을 맞추는 방법. exclusive lock을 걸게되며 다른 트랜잭션에서는 lock이 해제되기전에 데이터를 가져갈 수 없다. 단, 데드락이 발생할 수 있다.@Lock(value = LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id=:id")
Stock findByIdWithPessimisticLock(Long id);
@Lock(value = LockModeType.OPTIMISTIC)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithOptimisticLock(Long id);`
Stock 엔티티에는 version을 명시해주어야한다.
@Version
private Long version;
별도의 락을 잡지않아 Pessimistic Lock
보다 성능이 좋지만, 업데이트에 실패한 경우에 대한 처리를 개발자가 직접 해줘야함. 충돌이 빈번히 일어나지 않는 경우에 적합하다.
public interface LockRepository extends JpaRepository<Stock, Long> {
@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
void getLock(String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(String key);
}
락을 사용하는 경우 실무에서는 데이터 소스를 분리하는 것이 낫다. 커넥션 풀이 부족해질 수 있기 때문.
위의 LockRepository
를 decrease
의 호출 전과 후에 getLock
, releaseLock
을 해줌으로써 사용할 수 있다.
public Boolean lock(Long key) {
return redisTemplate
.opsForValue()
.setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
}
public Boolean unlock(Long key) {
return redisTemplate.delete(generateKey(key));
}
private String generateKey(Long key) {
return key.toString();
}
구현이 간단하지만 spin-lock 방식이므로 레디스에 부하를 줄 수 있어서, sleep
을 활용해 락 획득 - 재시도 간에 텀을 두어야한다.
재시도가 필요하지 않은 lock은 lettuce를 활용하는 것이 나을 수 있다.
public void decrease(Long key, Long quantity) {
RLock lock = redissonClient.getLock(key.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("lock 획득 실패");
return;
}
stockService.decrease(key, quantity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
Redisson을 활용하려면 별도의 라이브러리를 활용해줘야한다.
재시도가 필요한 경우에는 redisson을 활용하는 것이 나을 수 있다.