Redis 를 활용한 Concurrency 문제 해결기

박우영·2024년 2월 4일
0

트러블 슈팅

목록 보기
19/19

동시성 문제를 해결하기 위한 여러 방법중 Redis 를 활용한 DistributedLock 으로 해결 한 이유와 방법을 소개합니다.

Issue #1

기대 결과 : 채팅 종료 후 마지막 답변한 내용이 채팅 목록에 표시되어야 함 

실제 결과 :  (채팅 상세 화면에서 종료 후) 채팅 목록에서 마지막 메시지가 나오지 않고, 
다른 메시지가 보일때도 있음 (정상적인 채팅 종료 시, 채팅 중간에 얘기한 답변들이 나왔음

예상 flow

문제 식별

2가지 케이스가 있었습니다.
1. 트랜잭션이 모두 완료되지 않았는데 채팅방 목록을 조회
2. SessionClose 후 채팅방 조회

1번일 경우엔 최신화가 되지않았고, 2번의 경우엔 모든 트랜잭션이 마친 후에 조회를 하는데 최신화가 되지않았습니다.

Solution #1

Transaction Operation 은 변경하지 않고 Schedule 의 순서를 변경에 대한 필요성을 느꼈습니다.

Lock

Isolation Level 이 Mysql 의 경우 default 가 repeatable read 로 되어있습니다.
따라서 현재 발생하는 Non-Repeatable-Read 의 문제는 해결 될것이라 생각합니다.

하지만 이렇게 할 경우 채팅을 할때마다 RDBMS 의 IO가 발생하게 됩니다.

Caching Strategies

RDBMS 에 command 를 하는 것보다 퍼포먼스 향상을 위해, 최신의 채팅내역을 Scale-out 되더라도 사용할 수 있는 Redis 를 사용하기로 했습니다.

캐시 전략은 read/write 에 따라 여러가지 방법이 존재하고 어떻게 적용해야할지가 주요 관건 이었는데요.

우리는 IO 를 최소화 하고, 현재의 데이터가 캐싱되기를 원하기 때문에

  • write back 패턴
  • Look Aside 패턴

이두가지를 사용하기로 결정하였습니다.

Issue #2

사용자와 WebSocket Connection 이 맺어지면 Redis 에 있는 Agora Channel 정보를 받아 Add 후 set 을 해줬는데요 다음과 같은 문제가 발생 하였습니다.

기대 결과 : 1:1 채팅

실제 결과 : n:m 채팅

Stress Test 중 식별된 Redis의 동시성 문제입니다.
Redis 가 싱글스레드로 동작하기때문에 발생하지 않을거라 생각한 문제였습니다.

flow

Solution #2

get 이 이뤄지고 set이 실행되기전 다른 Transaction 에서 get 할경우 발생한 문제입니다.

각 상황에 맞게 leaseTime 과 waitTime 을 설정해줍니다.

Redis Lock Develop

Redis Lock 을 사용하는곳이 점점 늘어납니다. 채팅 기록, 채팅방, 시그널링 채널 등등...
동일한 코드의 재사용이 늘어나고 있는데 AOP 로 구현하는 Solution 을 접했고 우리 프로젝트에 적용하면 좋을 것 같다는 생각이 들었습니다.

Annotation

어노테이션을 활용하여 가독성과 사용 편의성을 향상 시킵니다.

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class DistributedLock(
    val key: String,
    val timeUnit: TimeUnit = TimeUnit.SECONDS,
    val leaseTime: Long = 3L,
    val waitTime: Long = 5L,
)

Transaction

Spring 의 Requires New 를 사용하여 별도의 트랜잭션으로 격리 시켜줍니다.

@Component
class AopForTransaction {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    fun proceed(joinPoint: ProceedingJoinPoint): Any? {
        return joinPoint.proceed()
    }
}

AOP

먼저 Redis Lock 을 획득 후 method 종료 후 lock 을 해제합니다.

   ...
   @Around("@annotation(com.example.core.common.annotation.DistributedLock)")
    fun lock(joinPoint: ProceedingJoinPoint): Any? {
        val signature = joinPoint.signature as MethodSignature
        val method = signature.method
        val annotation = method.getAnnotation(DistributedLock::class.java)

        val key: String = REDISSON_LOCK_PREFIX + annotation.key

        val lock = redissonClient.getLock(key)

        try {
            val tryLock = lock.tryLock(annotation.waitTime, annotation.leaseTime, annotation.timeUnit)
            if (!tryLock) {
                return false
            }
            return aopForTransaction.proceed(joinPoint)
        } catch (e: InterruptedException) {
            throw ServiceException()
        } finally {
            try {
                lock.unlock()
            } catch (_: IllegalMonitorStateException) {
                // 이미 lease time으로 인해 lock 해제
            }
        }
    }

어떻게 동작할까??

Lock 획득
Creitical Section 코드인데 결국 여러개의 쓰레드 에서 Lock 을 획득해야합니다.
동기화 하는 방법은 여러가지 가 있겠지만 저는 Redisson 과 Lettuce 에서 동기화 하는 방법을 찾아봤습니다.

  1. Lettuce 는 Spin Lock 을 활용해 동기화를 시켜줍니다.
    Redis 공식문서
    를 참조하면 setnx 를 활용하여 spin lock 으로 lock 을 획득합니다.

  2. Redisson 은 redis pub/sub 을 활용해 lock 을 획득합니다.

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + time + "ms. " +
                            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

tryLock 의 구현체를 확인해보겠습니다.
pub/sub 으로 구현되어있다고 하는것처럼 첫 시도후 lock 을 얻지못하면 threadid 로 subscribe 를 합니다.

acquireLock 은 lua script 로 작성되어있으며

if (redis.call('exists', KEYS[1]) == 0) then 
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

// LOCK KEY가 존재하는지 확인한다(없으면 0, 있으면 1)
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

// 해시맵 기반으로 LOCK KEY와 쓰레드 아이디로 존재하면 0이고, 존재하지 않으면 저장하고 1을 리턴한다
// LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다
// LOCK KEY에 유효시간을 설정한다
// null 값을 리턴한다

return redis.call('pttl', KEYS[1]);

// 위의 조건들이 모두 false 이면 현재 존재하는 LOCK KEY의 TTL 시간을 리턴한다

subscribe

    public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture<>();

        semaphore.acquire().thenAccept(c -> {
            if (newPromise.isDone()) {
                semaphore.release();
                return;
            }

            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            E value = createEntry(newPromise);
            value.acquire();

            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            RedisPubSubListener<Object> listener = createListener(channelName, value);
            CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
            newPromise.whenComplete((r, e) -> {
                if (e != null) {
                    s.completeExceptionally(e);
                }
            });
            s.whenComplete((r, e) -> {
                if (e != null) {
                    entries.remove(entryName);
                    value.getPromise().completeExceptionally(e);
                    return;
                }
                value.getPromise().complete(value);
            });

        });

        return newPromise;
    }

semapore 를 활용하여 여러 쓰레드에서 요청이 왔을때 subscribe 와 unsubscribe 를 하기위해 사용됩니다.

Transaction

사용할때는 Requires_New Transaction 을 사용하였기 때문에 해당 어노테이션을 사용한 function 의 runtime exception 을 swallowed 한다면 의도대로 동작되지 않을 수 있다는 점 입니다.

이점을 유의하여 사용한다면 매우 간단하게 분산락을 사용할 수 있습니다. :)

회고

write back 패턴과 Look Aside 의 패턴의 문제점중 하나인 Redis 가 다운 되었을때 해결해야만 과제가 있었고, Docker 로 pull 받아 사용하던 redis 를 AWS ElastiCache 로 전환한 이유 입니다.

아직 cache store 가 다운 됐을때 대응과 정합성 처리가 미흡한 부분이 있습니다.
이러한 부분을 어느정도 해결하고자 AWS ElastiCache 를 사용하였고 고가용성을 추구할 수 있도록 종속되지 않은 서비스를 제공하고자 합니다.

Reference


innodb lock
ElastiCache vs Memory DB
컬리 기술블로그

0개의 댓글