그전과 동일하게 아래 영상을 참고 하였다.
https://www.youtube.com/watch?v=geMtm17ofPY
해당 영상은 Java 단위에서 수행 하였고 , 기본 적인 개념 설명 후 nest 에서 사용하는 법을 작성
프로듀서의 기본 흐름
- Sender 와 전반 Serializer & Partitioner 은 별도 Thread 로 동작
=> Batch 찼는지 여부와 상관없이 Sender 는 Broker 에 전송
- batch.size : Batch 크기 ( 다 차면 바로 전송 )
- linger.ms : 전송 대기 시간 ( 기본값 0 )
- 대기 시간 X 일 시 , 배치 덜 차도 바로 전송
- 대기 시간 O 일 시 , 시간 만큼 배치에 추가하여 한번에 전송 - 헌번에 처리 가능
=> 기본적 send 는 전송 결과 확인 X ( ack : 0
ack
ack : 0
ack : 1
- Partition 리더 에 저장되면 응답 받음
- 리더가 장애 발생시 Message 유실 가능성 존재
ack : all ( -1 )
- 모든 리프리카에 저장되면 응답 받음 ( min.insync.replicas 따라 설정 가능 )
min.insync.replicas ( Broker Option )
- ack 옵션이 all 일 떄 , 저장에 성공했다고 응답 해야하는 동기화 된 리플리카 최소 개수
Case1. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 2
=> 리더에 저장 후 , 팔로워 중 한개 저장시 성공 응답 ( 리더도 min 에 포함 )
Case2. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 1
=> 리더에 저장 되면 성공 응답 ( ack = 1 이라 차이 X - Message 유실 가능! )
Case3. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 3
- 리더 1개 + 팔로워 2개 저장시 성공 응답
- 팔로워 중 한개 응답시 리플리카 부족 인한 저장 실패! ( 매우 치명적 ) - 리더도 리플리카에 포함
에러 유형
전송 과정 단위 실패
- 전송 타임 아웃
- 리더 다운 의한 새 리더 선출 진행 상태
- 브로커 설정 메시지 크기 한도 초과
- ...
전송 전 실패
- 직렬화 실패 , 프로듀서 자체 요청 크기 제한 초과
- Producer Buffer 가 찬 후 기다린 시간이 최대 대기 시간 초과
- ...
실패 대응
1. 재시도
- 재시도 가능 에러 는 재시도 처리 ( 브로커 응답 타임 아웃 , 일시적 리더 없음 등 )
재시도 위치 단
- 프로듀서는 자체적 브로커 전송 과정 중 에러 발생 시 재시도 가능 에러 대해 재전송 시도
- send 메서드에서 익셉션 발생시 익셉션 타입 따라 재호출
- 무한 재시도는 매우 지양 해야함! ( 서버 부하 )
2. 기록
- 추후 처리 위한 기록
- 별도 파일 , DB 등 이용해 실패 Message 기록
- 추후 수동 or 자동 통해 보정 작업 진행
기록 위치
- send Method 에 익셉션 발생시
- send Method 에 전달 콜백에서 익셉션 받는 경우
- send Method 가 리턴한 Future 의 get() Method 에서 익셉션 발생시
메시지 중복 전송 가능성
- Broker 응답이 늦게 와서 재시도 할 경우 중복 발송 가능
=> enable.idempotence 속성 참고
재시도와 순서
- max.in.flight.requests.per.connection
- Blocking 없이 한 Connection 에서 전송할 수 있는 최대 전송중 인 요청 개수
- 이 값이 1보다 크면 재시도 시점 따라 메시지 순서 바뀔수 있음. ( 여러개를 보내므로 )
- 전송 순서 중요시 1로 지정
in Nest
- nest 에서 제공하는 microservices 를 사용하려고 해도 , kafkajs 는 무조건 install 해야한다.
npm install kafkajs
npm install @nestjs/microservices
Producer
Producer Config
export interface ProducerConfig {
createPartitioner?: ICustomPartitioner;
retry?: RetryOptions;
metadataMaxAge?: number;
allowAutoTopicCreation?: boolean;
idempotent?: boolean;
transactionalId?: string;
transactionTimeout?: number;
maxInFlightRequests?: number;
}
- createPartitioner : Producer 가 Message 를 특정 Partition 라우팅 할 때 사용하는 파티셔너 지정
- metadataMaxAge : Cluster 의 Metadata 정보 얼마나 오래 Cache 할지 지정
( 캐시된 메타데이터 사용해 파티션 리더 정보 더 빠르게 검색 가능 )
- allowAutoTopicCreation : Kafka Topic 이 자동 생성 되게 할 지 여부
( true 시 , topic 존재 하지 않을 시 자동 생성)
- idempotent : 메시지 전송 멱등성 활성화 ( 메시지 중복 전송 방지 ) - enable.idempotence 와 동일
- transactionalId : 트랜잭션 아이디 설정 , 트랜잭션 사용 시 해당 아이디 지정해 트랜잭션 관리
- transactionTimeout : 트랜잭션의 Timeout 시간 설정 - 지정 시간 내 완료되야함
- maxInFlightRequest : 동시 처리 최대 요청 전송 수 지정 - 동시 처리 요청 수 제한
- retry : 메시지 전송 시 재시도 동작 구성 하는데 사용
RetryOptions
export interface RetryOptions {
maxRetryTime?: number;
initialRetryTime?: number;
factor?: number;
multiplier?: number;
retries?: number;
restartOnFailure?: (e: Error) => Promise<boolean>;
}
- maxRetryTime : 재시도 허용 되는 최대 시간 - 지정 시간 이내 재시도 완료 못할 시 중단
- initialRetryTime : 첫 재시도 시작되는 대기 시간 - 지정 시간 지난 후 재시도 시작
- factor : 각 재시도 간 대시 시간이 이전 재시도 대기 시간 대해 곱해지는 계수 - 2.0 이 시 , 두 배씩 증가
- multiplier : 대기 시간 계산에 사용 되는 기본 시간 단위 - 1000 이면 , 밀리초(ms)단위 대기 시간 계산
- retries : 최대 재시도 횟수 - 초과 시 실패 처리
- restartOnFailure : Error 처리하고 , 재시도 여부 결정 위한 Custom 정의 함수
Usage
- 여기서 주의해야할 점은 kafkajs 를 사용하는 것 과 @nestjs/microservices를 사용할 때 차이점이 있다는 것이다.
@nestjs/microservices
@Module({
imports: [
ClientsModule.register([
{
name: 'PRODUCER',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'RooTripClient',
brokers: ['localhost:9094'],
},
producer: {retry:{retries:3,initialRetryTime:1000,multiplier:1.5}},
},
},
]),
],
- ClientsModule 을 통해서 생성가능
- name을 지정하여 , 여러가지 options 으로 생성하여 주입 가능
( 사담으로 , useFactory 를 이용하여 , client 가 아닌 , producer 단위 추출할 방법을 찾았으나 ,
microservices 의 client 는 client.producer.send 가 아닌 , client.send 처럼 client 가 전부 다 가지고 있으므로 불가능 )
send vs emit
- client 는 Kafka 에 Message 를 Produce 하기 위한 Method 로 send method 와 emit method 를 가진다.
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
- send 와 emit 은 이렇게 모든게 동일한데 , 동작 방법이 다르다.
- 아래 예제는 다른 서버에서 , 이메일을 보내기 위한 kafka topic 을 발생시키는 예제이다.
send - message pattern
- send 는 producer 가 send 를 하기 전 , onMoudleInit Function 을 통해서 ,
producer 가 subscribeResponseof 를 통해서 , reply topic 을 구독해야 한다.
( Kafka microservice message pattern 이 두개 topic 활성 - request , reply channel )
@Injectable()
export class SendVertificationEmailEventListener {
constructor(@Inject('PRODUCER') private readonly producer: ClientKafka) {
}
onModuleInit(){
this.producer.subscribeToResponseOf('send.vertification.email');
}
@OnEvent(SendVertificationEmailDomainEvent.name)
handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
const result$ = this.producer.send(SEND_VERTIFICATION_EMAIL, {email:event.email,url:event.redirectUrl});
result$.subscribe({
next:(data)=>{
return;
},
error:(error)=>{
this.producer.emit(SEND_VERTIFICATION_EMAIL,{email:event.email,url:event.redirectUrl});
return;
}
})
}
- result 뒤에 $ 를 붙인 것은 observable 변수이므로 $ 를 붙여 구분했다.
- 여기서 주의해야할 점은 무조건 subscribe 를 해야 한다는 것이다.
=> 사실 해당 부분에 대해서는 issue 가 있다.
https://github.com/nestjs/nest/issues/2718
위처럼 send 는 의도적으로 cold observeable 을 반환한다.
이 점은 nestjs 공식 문서 - microservices/overview 에서 Sending messages 부분에 보면 있어서 모를수 있으므로 주의해야 한다. ( 하단 공식문서 부분 참조 )
따라서 , send 를 사용할때는 subscribe 를 해야만 한다.
- 성공한 경우인 next 와 에러가 발생한 경우인 error로 추가 로직을 처리할 수 있다.
( next 에 있는 data 를 출력해보면 , null 이 나온다. )
- performance.now 를 통해 측정
- 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 5.33 ms 가 나왔다.
emit - event pattern
- emit 은 단순히 , 바로 선언해서 사용할 수 있다.
@Injectable()
export class SendVertificationEmailEventListener {
constructor(@Inject('PRODUCER') private readonly producer: ClientKafka) {
}
@OnEvent(SendVertificationEmailDomainEvent.name)
handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
const result$ = this.producer.emit(SEND_VERTIFICATION_EMAIL, {email:event.email,url:event.redirectUrl});
result$.subscribe({
next:(data)=>{
return;
},
error:(error)=>{
this.producer.emit(SEND_VERTIFICATION_EMAIL,{email:event.email,url:event.redirectUrl});
return;
}
})
}
- result 는 hot observable 을 반환한다.
- data 를 출력하면 , 아래와 같이 나온다.
[
{
topicName: 'send.vertification.email',
partition: 0,
errorCode: 0,
baseOffset: '290',
logAppendTime: '-1',
logStartOffset: '0'
}
]
- 다른 부분은 위와 동일하다
- 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 2.7742 ms 가 나왔다.
Conclusion
- emit 이 사용하는 법도 간단하고 , 속도도 매우 빠르다.
- 왜 send 를 사용해야하는지에 대해서는 아직 잘 모르겠다.
By GPT
NestJS의 @nestjs/microservices
모듈을 사용할 때, send
와 emit
메서드는 서로 다른 메시지 패턴을 지원합니다. 각각의 메서드에는 다음과 같은 주요 특징이 있습니다:
-
send
메서드:
- Request-Response 메시지 패턴을 지원합니다.
- 메시지를 보내고 그에 대한 응답을 기다릴 수 있습니다.
send
를 사용하면 특정 주제(topic)로 메시지를 보내고 해당 주제에 대한 응답을 기다릴 수 있습니다.
- 즉,
send
를 사용하면 메시지를 보내고 해당 메시지에 대한 응답을 동기적으로 처리할 수 있습니다.
-
emit
메서드:
- Publish-Subscribe 메시지 패턴을 지원합니다.
- 메시지를 보내고 해당 메시지를 구독하는 모든 리스너에게 브로드캐스팅됩니다.
emit
를 사용하면 메시지를 보내고 해당 메시지를 처리하는 리스너에 대한 응답을 기다리지 않습니다.
- 비동기적인 이벤트 기반 통신에 적합합니다.
따라서 사용 사례에 따라 send
또는 emit
를 선택합니다. 예를 들어, Request-Response 패턴이 필요한 경우 send
를 사용하여 메시지를 보내고 응답을 기다릴 수 있습니다. 반면에, 이벤트를 브로드캐스팅하고 비동기적으로 처리해야 하는 경우 emit
를 사용할 수 있습니다.
요약하면, send
와 emit
는 서로 다른 메시지 패턴을 지원하며, 사용 사례와 요구 사항에 따라 선택하여 사용해야 합니다.
고찰점
- 현재 코드는 , ack 및 retry 에 대한 부분이 없다.
- 임의로 에러를 발생시켜서 에러 처리를 어떻게 하는지 찾아보려고 했으나 , 임의 에러 발생을 못해서 더 찾아봐야 할거 같다.
- docs 에도 자세히 안 나와있어서 차후 디스코드 및 커뮤니티에 물어볼 예정
kafkajs
- nestjs 에서 공식 제공하는게 아니다 보니 , Module 로 제공되는게 아닌 내가 직접 만들어서 주입했다.
@Module({
imports: [],
controllers: [],
providers: [
{
provide:"PRODUCER",
useFactory:()=>{
const producer = new Kafka({
clientId: 'RooTripClient',
brokers:['localhost:9094'],
}).producer({retry:{retries:3,initialRetryTime:1000,multiplier:1.5}});
producer.connect();
return producer;
}
}
],
exports: ["PRODUCER"],
})
@Injectable()
export class SendVertificationEmailEventListener {
constructor(@Inject('PRODUCER') private readonly producer: Producer) {
}
@OnEvent(SendVertificationEmailDomainEvent.name)
handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
const result = this.producer.send({
messages:[{value:JSON.stringify({email:event.email,url:event.redirectUrl})}]
,topic:"send.vertification.email",
acks:1})
result.then((data)=>{
return;
})
result.catch((error)=>{
return;
})
})
- send 에는 ProducerRecord 가 들어간다.
export interface ProducerRecord {
topic: string
messages: Message[]
acks?: number
timeout?: number
compression?: CompressionTypes
}
- Message 에는 Buffer | String 만 들어가므로 , JSON 을 넣을 시 , stringfy 를 통해 변환하여야 한다.
- result 는 위와 다르게 , Promise 를 반환한다.
- 따라서 , then 과 catch 문을 통해 처리가 가능하다.
- data 에는 아래와 같이 값이 나온다. ( 위 emit 함수와 동일 )
[
{
topicName: 'send.vertification.email',
partition: 0,
errorCode: 0,
baseOffset: '321',
logAppendTime: '-1',
logStartOffset: '0'
}
]
![[Pasted image 20231003122646.png]]
- performance.now 를 통해 측정
- 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 3.37 ms 가 나왔다.
고찰점
- 위 emit 과 비교했을때 속도가 비슷하다.
- 위와 다르게, promise 를 반환함에 따라 , 단일 처리를 할 것으로 추정
- 다양한 옵션 제공 ( compression , ack 등등 )
Total Conclusion
- nestjs 에서 제공해주는게 , Module 단위 및 microservices 에 맞춰 더 잘 구현 되어 있으나 , 기능이 많이 부실한거 같다.
- 정확히는 모르겠으나 , 처음 컨슈머가 group 에 밸런싱 되는 시간도 , kafkajs 가 매우 빠름.
- kafkajs 가 처음 Client 를 생성한 후 , producer / consumer 등 역활에 맞춰 나누는 것을 보아 더 깔끔함.
- 다음 consumer 편에서 설명하겠지만 , @nestjs/microservices 에서 제공하는 Ctx 를 통해 현재 context 를 받아서 getConsumer 를 하면 kafkajs 의 consumer 가 나온다.
=> nestjs 에서 kafkajs 를 wrapping 하여 제공해주나 , kafkajs 를 쓰는게 더 용이해 보임
( 다만 , promise 와 rxjs 의 차이점이나 , 의존성 주입 할 때 싱글톤이나 부차적 문제가 발생하는지는 더 알아볼 예정 )
참고
https://github.com/nestjs/nest/issues/2718
https://so-kyte.tistory.com/199
https://docs.nestjs.com/microservices/basics#sending-messages
https://docs.nestjs.com/microservices/kafka
https://kafka.js.org/docs/introduction
Writed By Obisidan