NestJS기반 백엔드 서버에 카프카를 도입하고자 할 때,
kafkajs + @nestjs/microservices 를 조합해서 손쉽게 카프카를 사용할 수 있었습니다.
외부 채널링을 구현하며, 기존 예약서버의 로직을 재활용하기 위해 internal 통신을 구현해야했습니다.
API 통신으로 구현할 수 있었지만, 신사업에 대한 서버이기에 자원이 한정적이었기에 신규 서버가 동기화 배치(Cron), 예약 API 등 여러 기능을 모놀리식하게 처리해야했습니다.
때문에 단일 서버에서의 자원을 효율적으로 활용하기 위한 Non-Block-동기 로직을 구현했습니다.
Producing을 맡을 클래스는 nestjs의 OnModuleInit 를 구현해야합니다.
export class Producer implements OnModuleInit {
constructor(
@Inject(KAFKA_CLIENT)
private readonly clientKafka: ClientKafka
) {}
async onModuleInit(): Promise<void> {
this.clientKafka.subscribeToResponseOf(
`foo.get`
)
}
}
기존에 foo.get 라는 토픽이 있었을 것입니다.
응답을 구독하려면 foo.get.reply 라는 토픽을 미리 생성해두어야 합니다.
준비는 끝입니다. 이제 메시지를 발행할 때 ClientKafka의 두 가지 메소드(emit, send) 중 send 를 사용하면 됩니다.
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
send는 Observable객체를 return 하기 때문에 lastValueFrom으로 Consumer의 응답결과를 받으면 됩니다.
이 때, 내부적으로는 rpc 통신의 헤더에 응답토픽에 메시지가 추가되기 위한 정보를 싣습니다.
const result = await lastValueFrom(
this.clientKafka
.send<
KafkaReplyDto,
KafkaProduceDto<FooGetDto>
>(`foo.get`, {})
.pipe(timeout(3000))
)
rxjs pipe를 이용해 타임아웃까지 추가해줍니다.
이 단계에서 retry등의 유틸함수등을 활용할 수 있습니다.
끝입니다.
이제 consumer에서 return 한 값이 result에 담기게 됩니다.
consumer example
@MessagePattern(`foo.get`)
async consumer(
@Payload() data: KafkaProduceDto<FooGetDto>,
@Ctx() context: KafkaContext
) {
return 'producer result에 들어갈 값'
}
카프카와의 rpc 통신 헤더에 포함된 정보를 통해 응답토픽에 return한 값이 프로듀싱됩니다.
이 때 헤더에는 응답토픽명과 파티션까지 특정되어있습니다.
즉, producer - consumer(producer) - consumer 일련의 관계가 파티션 단위까지 보장되어 있어야 한다는 것입니다.
이 때문에 한계점이 발생합니다.
REST-API 통신으로 구현했을 때와 구분되는 장점은,
동기적으로 통신해야할 부분을 카프카를 통해 구현하면서
예약시도에 대한 응답이 오기까지 서버가 Block 되지 않아서 효율적으로 다른 작업을 수행하게 되는 장점이 있습니다. (nonblock-sync)
producer -> consumer -> producer 절차를 비동기로 구현하면서
네트워크 이슈가 발생할 것을 고려하여, 각 컴포넌트의 멱등성을 보장해야합니다.
consumer 로직과 응답consumer(producer서버) 의 로직을 멱등적으로 구현하여
네트워크 장애 발생시 재시도등을 통한 가용성이 보장됩니다.
응답패턴은 기본적으로 Producer가 topic.reply 토픽의 구독자(consumer)가 되는 개념입니다.
서버가 부팅할 때 특정 토픽의 구독자로 등록이 됩니다. (consumer-group으로 묶여서)
서버 인스턴스를 늘릴 때, 토픽의 파티션이 부족하다면 (인스턴스 보다 적다면) 구독자로 등록이 되지 못해서 응답패턴이 제대로 동작하지 않습니다.
일반적으로 파티션과 인스턴스(Consumer)가 1:1관계가 됩니다.
ECS 등의 블루그린 배포는 기본적으로 서버(task) 인스턴스가 두 배로 늘어났다가 원래 수로 돌아오는 방식이라는 것을 알 겁니다.
이 때 파티션 개수가 부족하다면 추가로 뜨는 인스턴스가 구독자로 등록 되지 못해서 응답패턴이 제대로 동작하지 못합니다.
blue/green 배포가 아닐지언정 인스턴스의 개수가 조정되며 재구성되는 배포방식일 경우 검토가 필요할 것입니다
응답-구독 패턴으로 서버의 자원을 효율적으로 사용하고 기존 로직을 재활용할 수 있었으나
한계점도 분명히 존재했습니다.
위 설명처럼 서비스간 결합도가 높아 확장(Scale-out)이 어려울 수 있는 단점이 있습니다.
비즈니스에 맞게 적용하면 좋을 것 같습니다