Redis Stream

dong5854·2022년 4월 21일
0

Redis Messaging

목록 보기
3/3

Redis Stream은 Redis 5.0에 새롭게 추가된 데이터 타입으로 로그데이터 에 적합한 중간에 데이터를 수정할 수 없는 append-only 데이터 스트럭처이다. Redis Stream의 특별한 점은 카프카와 같이 Consumer Groups라는 컨셉을 통해 특정 컨슈머가 블록킹 방식으로 producer에서 데이터를 추가하는 것을 대기하는 기능을 추가한다.

import threading
import redis
import time

conn = redis.Redis(host='localhost', port=6379, db=0)


def run_list_queue():
    # sender
    threading.Thread(target=sender, args=(10,)).start()
    try:
    	# 컨슈머 그룹 "test_1" 생성
        conn.xgroup_create("test_stream", groupname="test_1", id="$", mkstream=True)
    except Exception as e:
        print(e)
    try:
    	#컨슈머 그룹 "test_2" 생성 
        conn.xgroup_create("test_stream", groupname="test_2", id=0, mkstream=True)
    except Exception as e:
        print(e)
    # receiver
    while True:
        print("waiting message...")
        res1 = conn.xreadgroup("test_1", streams={"test_stream": ">"}, consumername="dong", count=1)
        res2 = conn.xreadgroup("test_2", streams={"test_stream": 0}, consumername="chae", count=1)
        if res1 is not None:
            print("res1", end="")
            print(res1)
        if res2 is not None:
            print("res2", end="")
            print(res2)
        time.sleep(0.5)


def sender(n: int):
    time.sleep(1)
    for num in range(n):
        time.sleep(1)
        conn.xadd("test_stream", {'num': f"{num}"}, '*')


if __name__ == '__main__':
    run_list_queue()

XADD에서 *는 데이터가 저장되는 시간에 기반하여 id값을 자동생성 할 것을 나타낸다. 이렇게 자동생성하는 id말고 사용자가 직접 id값을 지정하는 것도 가능하다고 한다. 기본적으로 시간을 기반하여 id값을 생성하는 이유는 이런 방식으로 id를 생성하면XRANGE를 통해 범위에 대한 쿼리를 날릴 때 시간에 기반하여 데이터를 쉽게 받아올 수 있기 때문이다.

XGROUP_CREATE에서 사용되는 $는 특별한 id값으로 현재 stream의 가장 큰 값의 id를 나타낸다. id 인자값은 해당 값보다 큰 stream안의 값들을 읽을 것을 나타내는 것이기 때문에 0 인 경우에는 모든 stream 데이터를 읽게 되고 $의 경우에는 해당 그룹이 생성된 이후의 값들만 읽게 된다.

또한 XREADGROUP>다른 컨슈머에게 전달 된 적 없는 데이터를 뜻한다. >는 다른 컨슈머에게 전달 된 적 없는 가장 최신의 데이터를 받기 때문에 컨슈머 그룹의 last id를 업데이트한다.

Redis Stream은 데이터구조는 단순하지만 강력한 기능들을 많이 제공하는 많큼 옵션들이 많기 때문에 사용 전에 반드시 공식 도큐먼트를 한번 읽어보는것이 좋을 것 같다.

레디스 공식 도큐먼트
https://redis.io/docs/manual/data-types/streams/

profile
https://github.com/dong5854?tab=repositories

0개의 댓글