실무에서 redis pub/sub이나 mosquitto와 같은 메시지 브로커를 사용했었는데,
메시지 브로커 같은 경우에는 kafka와 rabbitMQ 같은 것들도 있지만, 사이드 프로젝트에서 go를 사용하고 있기 때문에 nats에 대해서 찾아보게 되었다.
추가적으로 메시지 pub/sub 경우 메시지 유실 혹은 백업에 대한 문제도 확인해볼 필요가 있어 jetstream이라는 것을 알게되었다.
제대로 알기 위해서는 공식문서를 더 찾아보거나 실제 프로젝트에서 어떻게 사용하고 있는지 확인해봐야 할 것 같다.
NATS는 경량, 고성능의 오픈 소스 메시징 시스템으로, 분산 시스템에서 마이크로서비스 간 통신을 원활하게 할 수 있도록 설계된다. NATS는 기본적으로 Pub/Sub (발행/구독) 및 Request/Reply (요청/응답) 패턴을 지원하며, 수평 확장성이 뛰어나 클라우드 네이티브 애플리케이션에서 많이 사용된다.
docker run -d --name nats-server -p 4222:4222 nats:latest
Pub/Sub 패턴
• Publisher는 특정 주제(Subject)로 메시지를 발행하고, Subscriber는 해당 주제를 구독한다.
• 예를 들어, notifications라는 주제로 메시지를 발행하면, 이 주제를 구독하는 모든 Subscriber에게 메시지가 전달된다.
Request/Reply 패턴
• Request/Reply는 Publisher와 Subscriber 간의 양방향 통신이 필요한 경우 사용된다.
• 예를 들어, 클라이언트가 서버에 요청을 보내고 응답을 기다릴 때 유용하다.
JetStream
• JetStream은 NATS의 영속적 메시지 저장 기능으로, 메시지의 영구 저장과 스트리밍 기능을 제공한다.
• 데이터 보관이 필요한 서비스에서 JetStream을 통해 데이터를 영구 저장하고 복구할 수 있다.
예시 pub topic
- `user.create`: 새 사용자 등록
- `user.update.profile`: 프로필 정보 업데이트
- `user.update.settings`: 사용자 설정 변경
- `user.delete`: 사용자 계정 삭제
- `user.password.reset`: 비밀번호 초기화 요청
* >
와 같은 와일드 카드들도 존재한다.*
의 경우 한 계층만을 대체한다.>
의 경우 해당 계층 이후를 포괄한다.sub
1. user.* 로 토픽을 sub
받을 수 있는 토픽
user.create
user.delete
--
2. user.> 로 토픽을 sub
받을 수 있는 토픽
user.create
user.update.profile
user.update.settings
user.delete
user.passsword.reset
user.으로 시작하는 모든 토픽들을 받을 수 있다.
--
3. user.*.profile 토픽을 sub
받을 수 있는 토픽
user.update.profile만 받을 수 있다.
nats는 기본적으로 메시지 유실 가능성이 있다.
네트워크 문제나 구독을 하고 있는 구독자가 연결이 끊기고 재연결 후에는 메시지를 재전송 할 수 있는 방법이 없다.
nats Jetstream을 사용하면, 메시지를 디스크에 영구 저장이 가능하여 재수신이 가능하다.
docker compose YML 파일
version: "3.8"
services:
nats:
image: nats
ports:
- "4222:4222"
- "8222:8222"
- "6222:6222"
command: >
--cluster_name NATS
--jetstream
--cluster nats://0.0.0.0:6222
--http_port 8222
--server_name nats-0
--routes nats://nats-1:6222,nats://nats-2:6222
networks:
nats:
nats-1:
image: nats
command: >
--cluster_name NATS
--jetstream
--cluster nats://0.0.0.0:6222
--http_port 8222
--server_name nats-1
--routes nats://nats:6222,nats://nats-2:6222
depends_on:
- nats
networks:
nats:
nats-2:
image: nats
command: >
--cluster_name NATS
--jetstream
--cluster nats://0.0.0.0:6222
--http_port 8222
--server_name nats-2
--routes nats://nats:6222,nats://nats-1:6222
depends_on:
- nats
networks:
nats:
networks:
nats:
name: nats
docker 컨테이너 nats box 실행
docker run --network nats --rm -it natsio/nats-box
두개의 터미널에서 nats box에 접속하여 pub/sub을 테스트
해당 test1.a
를 pub/sub했더니 메시지가 날라옴을 확인
test1.> 로 토픽을 sub 할 때
>
으로 해당 계층 하위 계층까지 sub이 가능하다는 것을 확인nats stream add mystream --subjects "test.subject" --storage file
으로 해당 test.subject에 대한 스트림 생성 및 파일 시스템에 저장
Replication : 스트림의 복제본 수를 설정 (기본값은 1)
Retention Policy:
• Limits: 제한에 따라 보관
• Interest: 구독자가 있을 때만 보관
• Work Queue: 메시지가 소비되면 삭제 작업 큐처럼 동작
Discard Policy:
• Old: 제한을 초과하면 가장 오래된 메시지부터 삭제
• New: 제한을 초과하면 새 메시지를 삭제
Stream Messages Limit: 스트림의 총 메시지 개수 제한 -1은 제한이 없음
Per Subject Messages Limit: 특정 주제의 메시지 개수 제한 -1은 제한이 없음
Total Stream Size: 스트림의 전체 크기 제한(바이트 단위) -1은 제한이 없음
Message TTL (Time-to-Live): 메시지가 스트림에 유지되는 시간 5m은 5분 동안 메시지를 유지
Max Message Size: 메시지 크기 제한 -1은 제한이 없음
Duplicate Tracking Time Window: 중복 메시지 추적을 위한 시간 창 기본값은 2m0s
Allow message Roll-ups: 동일한 주제에서 특정 메시지가 이전의 동일 주제 메시지를 대체할 수 있게 설정 (yes면 설정)
Allow message deletion: 스트림에서 개별 메시지를 삭제할 수 있는지 설정
Allow purging subjects or the entire stream: 특정 주제나 스트림 전체를 정리(Purge)할 수 있는지 설정
해당 스트림에는 여러 토픽을 추가할 수 있고, 수정도 가능하며, 와일드카드도 활용가능하다.
해당 스트림을 생성하고 나서 ls
명령어로 확인해보면 스트림에 대한 내용을 볼 수 있다.
해당 토픽에 메시지를 pub 하고 확인하면
메시지가 늘어나 있음을 확인할 수 있다.
nats --server nats://nats:4222 stream info mystream 로도 정보 확인이 가능하다.
nats --server nats://nats:4222 consumer add mystream my_consumer --ack explicit
pull 모드는 소비자가 요청할때 메시지 수신(기본)
push 메시지가 실시간으로 특정 주제로 푸시
그 외 설정은 아직 잘 모르겠다. 문서를 보거나 찾아보니 gpt는 아래와 같이 응답함
1. Delivery target (empty for Pull Consumers):
• 메시지를 실시간으로 전달할 대상 주제(subject)를 지정
• 빈칸으로 두면 Pull 모드 소비자가 되어, 소비자가 요청할 때마다 메시지를 가져옴
2. Start policy:
• 스트림에서 소비자가 어떤 시점부터 메시지를 수신할지를 정의
• all: 스트림의 모든 메시지를 처음부터 수신.
• new: 소비자가 생성된 이후의 새로운 메시지만 수신.
• last: 가장 최근의 메시지부터 수신.
• subject: 특정 주제에 대한 메시지만 수신.
• 1h, msg sequence: 특정 시간 또는 메시지 시퀀스 번호로부터 시작.
3. Replay policy:
• 스트림에 저장된 메시지를 소비자에게 어떻게 재생할지 정의
• instant: 가능한 한 빨리 모든 메시지를 즉시 재생
• original: 메시지가 처음 발행된 타이밍 간격을 유지하며 재생
4. Filter Stream by subjects:
• 특정 주제의 메시지로만 필터링 함 빈칸으로 두면 스트림의 모든 주제를 수신
5. Maximum Allowed Deliveries:
• 각 메시지가 전달될 최대 횟수를 지정
• 기본값 -1은 제한 없이 재전송을 허용 설정한 숫자만큼 전달 시도가 실패할 경우, 메시지는 사라지거나 죽은 편지 상자(dead letter queue)에 보낼 수 있다.
6. Maximum Acknowledgments Pending:
• 소비자가 확인을 보내지 않은 상태로 유지할 수 있는 최대 메시지 수를 설정
• 기본값 0은 제한이 없음을 의미 이 값을 설정하면 소비자가 너무 많은 미확인 메시지를 수신하지 않도록 제한할 수 있다.
7. Deliver headers only without bodies:
• Yes로 설정하면 메시지의 헤더만 전달되고, 메시지 본문(body)은 제외
• No로 설정하면 전체 메시지(헤더와 본문)를 전달
8. Add a Retry Backoff Policy:
• Yes로 설정하면, 실패한 메시지에 대해 재시도 백오프(backoff) 정책을 추가할 수 있다.
• 백오프 정책을 설정하면, 메시지 전송이 실패했을 때 지연 후 다시 시도하는 정책을 정의할 수 있다.
nats 공식문서 : https://docs.nats.io/