실무에서 동시성 이슈를 MongoDB의 Document를 활용한 분산락을 사용해서 제어하는 개발건을 맡아 진행했다. 본디 Redis를 쓰는게 정배이긴 하지만, 상황이 여의치 않아서 MongoDB로 진행하였다.
그런데 내가 개발했던 건에서 잘못된try-catch-finally
사용으로 중복제어 에러를 반환하면서 바로 Lock을 해제해버리는 바람에 동시성 이슈가 제대로 해소되지 않는 문제가 발생했다.
소스코드는 아래 링크에 커밋했으니, 참고해주세요!
https://github.com/goathoon/playground/commit/55e6d682899ac5bb712633228df7a60620b51440
먼저, 왜 Redis를 안쓰냐? -> 회사 내부 이슈.. 저도 쓰고 싶습니다..
MongoDB로 분산락을 구현하는 것은 생각보다 고려해야할 점이 많습니다.
저희는 샤딩이 없는 replica 모드로 MongoDB를 운영하고 있는점에 참고해주세요.
같은 계정으로 다른 디바이스에서 동시에 같은 데이터에 변경을 하는 바람에 데이터 정합성이 깨지는 상황이 발생했었다.
그래서 이를 해결하기 위해 MongoDB에 Lock 상태를 관리하는 컬렉션을 만들고, 같은 계정에서 데이터를 중복으로 제어할 수 없게 로직을 작성했었다.
회사 코드를 가져올 수 없으니.. 예시 코드는 다음과 같다.
여기서 문제가 되는 코드가 무엇인지 살펴보자.
public String upload(String id) throws InterruptedException {
try {
lockService.tryLock(id);
// 비즈니스 로직 수행
delayUtil.randomDelay();
ProtectedResource resource = resourceRepository.findById(id)
.orElse(new ProtectedResource(id, 0));
resourceRepository.save(new ProtectedResource(id, resource.getCount() + 1));
} catch (DuplicateKeyException e) {
return "Fail (Lock)";
} finally {
lockService.unLock(id);
}
return "Success";
}
무엇이 문제였을까?
바로 finally
로직이다.
중복 제어 상태임을 나타내는 에러인 DuplicateKeyException
의 상황에서 finally로 락을 삭제하는 것은 불필요한 로직이다.
이는 먼저 들어온 트랜잭션1
의 Lock을 다른 트랜잭션2
에서 해제해버리는 상황이기 때문이다.
이 때문에 트랜잭션1
은 이미 없어진 Lock을 삭제하는 애꿎은 상황이 발생했다.
Lock 해제의 주체는 Lock을 획득한 요청이어야만 한다.
이러한 사실을 인지하지 못했고, 서비스가 운영되었었다.
물론... 실제 사용자의 입장에서는 대다수가 중복제어가 잘 되었겠지만.. QA에서 또다른 문제가 발생되었다.
그림으로 정리를 해봤다.
트랜잭션1
에서 걸었던 Lock을 트랜잭션2
에서 해제를 해버리는 바람에, 트랜잭션3
에서 트랜잭션1
의 비즈니스 로직이 진행중임에도 중복으로 제어가 가능한 상황이 되어버렸다.
해당 문제를 캐치하기 까지 오랜시간이 걸렸다.
기존의 중복제어 문제를 해결하는 로직의 테스트코드를 작성했었지만, 이러한 부분까지 고려하지 못했던 부분이 아쉬웠다.
(물론 알았다면.. 그 부분까지 고려한 테스트코드를 작성해보지 않았을까?)
그래서, 이번 블로그의 주제는
중복제어 상황의 요청에서 타 트랜잭션에서 걸어놓은 Lock 해제로 인해 생긴 문제를 테스트 코드로 잡아내고 이를 해결하기 위한 방법을 고안해내는 것이다.
@Service
@RequiredArgsConstructor
public class LockService {
private final LockRepository lockRepository;
public void tryLock(String id) throws DuplicateKeyException {
lockRepository.insert(LockDocument.of(id));
}
public void unLock(String id) {
lockRepository.deleteById(id);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class UserService {
private final LockService lockService;
private final ProtectedResourceRepository resourceRepository;
private final DelayUtil delayUtil;
public String upload(String id) throws InterruptedException {
Boolean isSuccess = true;
try{
log.info("upload 수행");
lockService.tryLock(id);
// 비즈니스 로직 수행
delayUtil.randomDelay();
ProtectedResource resource = resourceRepository.findById(id)
.orElse(new ProtectedResource(id, 0));
resourceRepository.save(new ProtectedResource(id, resource.getCount() + 1));
} catch (DuplicateKeyException e) {
log.info("DUP ERROR in Thread = {}", Thread.currentThread().getName());
isSuccess = false;
return "Fail (Lock)";
} finally {
if(isSuccess){
lockService.unLock(id);
log.info("UNLOCK in Thread = {}", Thread.currentThread().getName());
}
}
return "Success";
}
public String delete(String id) throws InterruptedException {
Boolean isSuccess = true;
try{
log.info("delete 수행");
lockService.tryLock(id);
// 비즈니스 로직 수행
delayUtil.randomDelay();
resourceRepository.deleteById(id);
} catch (DuplicateKeyException e) {
log.info("DUP ERROR in Thread = {}", Thread.currentThread().getName());
isSuccess = false;
return "Fail (Lock)";
} finally {
if(isSuccess){
lockService.unLock(id);
log.info("UNLOCK in Thread = {}", Thread.currentThread().getName());
}
}
return "Success";
}
}
DelayUtil을 활용해서 random 시간 동안 (50~200ms) I/O와 같은 DB로직이 수행된다고 가정했다.
테스트코드를 작성하기 위한 전제 조건은 두가지이다.
비즈니스로직에 0.5초가 걸린다고 가정하자.
그리고 0.02초 간격으로 세개의 요청이 들어오는데, 삭제/삭제/업로드 요청이 인입된다.
그러면 최초 삭제시 Lock을 획득하고,
두번째 삭제시 중복제어 상황이므로 Lock이 해제되고,
세번째 업로드 요청에서 최초 삭제 로직이 진행중임에도, 두번째 요청에서 첫번째 요청에서 걸었던 Lock이 해제되어 업로드 요청이 수행될 수 있다.
@Test
@DisplayName("0.02초 간격으로 삭제/삭제/업로드 요청시, 첫번째 요청의 비즈니스 로직만 수행된다")
void 순서대로_삭제_삭제_업로드_요청시_최초_요청만_수행된다() throws InterruptedException {
int concurrency = 3;
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
List<String> results = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < concurrency; i++) {
final int index = i;
executor.submit(() -> {
try {
String result = null;
if (index == concurrency - 1) {
result = userService.upload("shared-id");
} else {
result = userService.delete("shared-id");
}
results.add(result);
} catch (DuplicateKeyException e) {
results.add("Duplication Exception");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(20);
}
// 스레드풀 전체 종료까지 main스레드 대기
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
long successCount = results.stream().filter("Success"::equals).count();
Optional<ProtectedResource> resource = resourceRepository.findById("shared-id");
assertAll(
() -> assertThat(successCount)
.as("하나의 스레드에서만 비즈니스 로직이 성공해야 한다")
.isEqualTo(1),
() -> assertThat(resource)
.as("shared-id 리소스는 존재하지 않아야 한다")
.isNotPresent()
);
}
만약 제대로 중복제어가 되었을 것이라고 가정한다면, 세번째 인입된 업로드 요청은 시행되지 않았을 것이고, 결국 하나의 스레드에서만 비즈니스 로직이 성공 했어야 할 것이다.
두번째로 인입되었던 요청 (pool-3-thread-2)에서 Lock이 해제되어, 세번째로 인입되었던 upload요청에서 중복제어 상황임에도 성공적으로 수행되었음을 알 수 있다.
따라서, 첫번째 요청이 아니라, 첫번째와 세번째의 요청이 성공했음을 알 수 있고, 세번째 요청인 업로드도 성공적으로 진행되면서 저장되지 말아야할 Document가 저장됨을 알 수 있다.
비즈니스로직에 0.5초가 걸린다고 가정하고,
이번에는 세개의 요청이 0.02초 간격이 아니라, 정말 '동시' 상황을 가정해보자.
동시 상황을 가정하기 위해 Executor로 여러 스레드를 실행하기 전에, 메인 스레드 용 CountDownLatch로 스레드의 실행을 중지시키고, 모든 스레드를 submit시킨 후, 메인스레드에서 CouuntDownLatch를 0으로 만들어, 동시에 실행하게 했다.
@Test
@DisplayName("동시에 삭제/삭제/업로드 요청시, 최초 인입 요청의 비즈니스 로직만 수행된다")
void 동시에_삭제_삭제_업로드_요청시_최초_요청만_수행된다() throws InterruptedException {
int concurrency = 10;
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
CountDownLatch allThreadsLatch = new CountDownLatch(1);
CountDownLatch eachThreadLatch = new CountDownLatch(concurrency);
List<String> results = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < concurrency; i++) {
final int index = i;
executor.submit(() -> {
try {
allThreadsLatch.await();
String result = null;
if (index == concurrency - 1) {
result = userService.upload("shared-id");
} else {
result = userService.delete("shared-id");
}
results.add(result);
} catch (DuplicateKeyException e) {
results.add("Duplication Exception");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
eachThreadLatch.countDown();
}
});
}
allThreadsLatch.countDown();
eachThreadLatch.await();
// 스레드풀 전체 종료까지 main스레드 대기
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
long successCount = results.stream().filter("Success"::equals).count();
assertThat(successCount)
.as("하나의 스레드에서만 비즈니스 로직이 성공해야 한다")
.isEqualTo(1);
}
10개밖에 안되서 그런가.. 중복제어가 잘 이뤄졌다.
1000개정도로 늘려보자.
원하는 상황이 등장하기 시작했고 테스트는 실패한다.
그렇다면 이제 테스트를 성공시키기 위해 기존 로직을 수정하자.
중복제어 상황에서 Lock을 해제하지 못하게 해야한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class UserService {
private final LockService lockService;
private final ProtectedResourceRepository resourceRepository;
private final DelayUtil delayUtil;
public String upload(String id) throws InterruptedException {
Boolean isSuccess = true;
try{
log.info("upload 수행");
lockService.tryLock(id);
// 비즈니스 로직 수행
delayUtil.randomDelay();
ProtectedResource resource = resourceRepository.findById(id)
.orElse(new ProtectedResource(id, 0));
resourceRepository.save(new ProtectedResource(id, resource.getCount() + 1));
} catch (DuplicateKeyException e) {
log.info("DUP ERROR in Thread = {}", Thread.currentThread().getName());
isSuccess = false;
return "Fail (Lock)";
} finally {
if(isSuccess){
lockService.unLock(id);
log.info("UNLOCK in Thread = {}", Thread.currentThread().getName());
}
}
return "Success";
}
public String delete(String id) throws InterruptedException {
Boolean isSuccess = true;
try{
log.info("delete 수행");
lockService.tryLock(id);
// 비즈니스 로직 수행
delayUtil.randomDelay();
resourceRepository.deleteById(id);
} catch (DuplicateKeyException e) {
log.info("DUP ERROR in Thread = {}", Thread.currentThread().getName());
isSuccess = false;
return "Fail (Lock)";
} finally {
if(isSuccess){
lockService.unLock(id);
log.info("UNLOCK in Thread = {}", Thread.currentThread().getName());
}
}
return "Success";
}
}
성공 Flag를 통해서 성공일경우에만 Lock을 해제하자.
완벽히 성공함을 볼 수 있었다!