Redis List

dong5854·2022년 4월 21일
0

Redis Messaging

목록 보기
2/3
import threading
import redis
import time

conn = redis.Redis(host='localhost', port=6379, db=0)


def run_list_queue():

    # sender
    threading.Thread(target=sender, args=(10,)).start()

    # receiver
    while True:
        print("waiting message...")
        res = conn.blpop('test_list', timeout=0)
        if res is not None:
            print(res)
        time.sleep(0.5)


def sender(n: int):
    time.sleep(1)
    for num in range(n):
        time.sleep(1)
        conn.rpush('test_list', f'message #{num}')


if __name__ == '__main__':
    run_list_queue()

파이썬에서 Redis List를 사용해 메세징 큐를 구현해 보았다. publish하는 시점에 subscriber가 없으면 데이터가 유실되는 Pub/Sub과는 다르게 메세지를 넣어 둔 후 시간이 지나도 메세지를 가져오는것이 가능하다.

위의 코드를 실행할 때 인상깊었던 부분은 blpop에서 꺼내올 데이터가 없다면 대기를 한다는 점이다.

아래의 실행결과를 보면 가져올 데티어가 없으면 while문이 계속해서 반복되는것이 아니라 blpop을 하는 지점에서 멈춰있는 것을 확인할 수 있다.

waiting message...
(b'test_list', b'message #0')
waiting message...
(b'test_list', b'message #1')
waiting message...
(b'test_list', b'message #2')
waiting message...
(b'test_list', b'message #3')
waiting message...
(b'test_list', b'message #4')
waiting message...
(b'test_list', b'message #5')
waiting message...
(b'test_list', b'message #6')
waiting message...
(b'test_list', b'message #7')
waiting message...
(b'test_list', b'message #8')
waiting message...
(b'test_list', b'message #9')
waiting message...

위의 예시에서는 rpush를 사용했지만 rpushx를 사용하면 키가 존재할 때만 리스트에 데이터를 추가할 수 있다. 트위터는 해당 기능을 사용해 트위터를 자주 이용하던 유저의 타임라인에만 새로운 데이터를 캐싱하고 자주 사용하지 않는 유저에 대해서는 데이터를 캐싱하는 비효율적인 작업을 방지하는 캐싱전략에도 사용된다고 한다.

또한 위의 예시에서 사용한 blpoplpop과는 다르게 blocking 기능이 있기 때문에 제어권을 자신이 가지고 있어 Event Queue로써 기능할 수 있게 해준다.

profile
https://github.com/dong5854?tab=repositories

0개의 댓글