[Nest.js] Kafka 사용하기

Woong·2023년 1월 20일
0

Nestjs

목록 보기
24/28

설치

  • kafka 를 사용하기 위해 아래 2개 설치
yarn add @nestjs/microservices kafkajs 

Module 설정

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import {ClientsModule, Transport} from '@nestjs/microservices';

// Kafka client registration, built once and handed to the module's imports.
// The client is injectable under the token 'my_client_name'.
const kafkaClients = ClientsModule.register([
  {
    name: 'my_client_name',
    transport: Transport.KAFKA,
    options: {
      client: { clientId: 'my_client_id', brokers: ['localhost:9092'] },
      consumer: { groupId: 'my_consumger_string_id' },
    },
  },
]);

/**
 * Root module: wires the Kafka client registration together with the
 * application's controller and service.
 */
@Module({
  imports: [kafkaClients],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
  • hybrid 애플리케이션으로 구현할 경우
    • microservice 여럿을 시작하거나, 웹과 같이 사용하기
    • connectMicroservice(..) 로 각 microservice 를 등록하고, startAllMicroservices() 로 등록된 microservice 를 모두 시작한다.
import { ConfigService } from '@nestjs/config'
import { NestFactory } from '@nestjs/core'
import { Transport } from '@nestjs/microservices'
import { AppModule } from '~/src/app.module'

/**
 * Boots a hybrid application: the HTTP app created from AppModule plus a
 * Kafka microservice attached to the same instance.
 *
 * Reads NODE_SERVER_PORT and KAFKA_BROKERS from ConfigService.
 *
 * @throws Error when KAFKA_BROKERS is unset or contains no broker entries.
 */
async function bootstrap() {
  const app = await NestFactory.create(AppModule)

  const configService = app.get(ConfigService)
  const port = configService.get('NODE_SERVER_PORT')

  // KAFKA_BROKERS is treated as a comma-separated list (the producer module in
  // this article splits the same key). Wrapping the raw string in an array
  // would turn "host1:9092,host2:9092" into a single, invalid broker address.
  const brokers = String(configService.get('KAFKA_BROKERS') ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0)
  if (brokers.length === 0) {
    // Fail early with a readable message instead of handing [undefined] to the Kafka client.
    throw new Error('KAFKA_BROKERS must list at least one broker (comma-separated host:port)')
  }

  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers,
      },
    },
  })

  // Start the registered microservice(s) before opening the HTTP listener.
  await app.startAllMicroservices()
  await app.listen(port)
}
bootstrap()
  • 단일 Microservice 애플리케이션으로 구현할 시
/**
 * Starts the application as a single Kafka microservice built from AppModule.
 * listen() is called without a port; the only transport configured here is Kafka.
 */
async function bootstrap() {
  const kafkaOptions: MicroserviceOptions = {
    transport: Transport.KAFKA,
    options: {
      client: { brokers: ['localhost:9092'] },
    },
  }

  const microservice = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    kafkaOptions,
  )
  await microservice.listen()
}
bootstrap()

Producer

  • Module 에서 Client 설정
    • ConfigService 는 dotenv 로 .env 파일의 값을 읽어 오므로, Kafka 관련 설정 값은 .env 에 지정한다.
import { Module } from '@nestjs/common'
import { MongooseModule } from '@nestjs/mongoose'
import { CacheModule } from '@nestjs/cache-manager'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { ClientsModule, Transport } from '@nestjs/microservices'

/**
 * Splits a comma-separated broker list into trimmed, non-empty entries.
 *
 * @param raw value read from configuration (expected: "host1:9092,host2:9092")
 * @returns the individual broker addresses
 * @throws Error when the value is unset or yields no entries
 */
function parseBrokerList(raw: unknown): string[] {
  const brokers = String(raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0)
  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one broker (comma-separated host:port)')
  }
  return brokers
}

/**
 * Converts an optional numeric setting to a number.
 *
 * @param key configuration key, used only in the error message
 * @param raw value read from configuration; .env values arrive as strings
 * @returns the numeric value, or undefined when the setting is absent so the
 *          Kafka client falls back to its own default
 * @throws Error when the value is present but not a finite number
 */
function parseOptionalNumber(key: string, raw: unknown): number | undefined {
  if (raw === undefined || raw === null || raw === '') {
    return undefined
  }
  const value = Number(raw)
  if (!Number.isFinite(value)) {
    throw new Error(`${key} must be a number, got "${String(raw)}"`)
  }
  return value
}

/**
 * Registers the Kafka client 'my_kafka_client' from ConfigService values and
 * provides MyProducerService.
 */
@Module({
  imports: [
    ClientsModule.registerAsync({
      isGlobal: false,
      clients: [{
        // Make ConfigService resolvable here even if ConfigModule is not registered globally.
        imports: [ConfigModule],
        inject: [ConfigService],
        name: 'my_kafka_client', // token used when injecting the client
        useFactory: async (configService: ConfigService) => (
          {
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: configService.get('KAFKA_CLIENT_ID'),
                brokers: parseBrokerList(configService.get('KAFKA_BROKERS')),
              },
              // consumer settings are unnecessary when the client is used only as a producer
              consumer: {
                groupId: configService.get('KAFKA_CONSUMER_GROUP_ID'),
                // timeouts are numeric options; convert the .env strings explicitly
                sessionTimeout: parseOptionalNumber(
                  'KAFKA_SESSION_TIMEOUT',
                  configService.get('KAFKA_SESSION_TIMEOUT'),
                ),
                rebalanceTimeout: parseOptionalNumber(
                  'KAFKA_REBALANCE_TIMEOUT',
                  configService.get('KAFKA_REBALANCE_TIMEOUT'),
                ),
              },
            },
          })
      }]
    }),
  ],
  controllers: [],
  providers: [MyProducerService],
  exports: []
})
export class MyKafkaModule { }
  • service 구현

/**
 * Publishes messages to Kafka through the client registered under the
 * injection token 'my_kafka_client'.
 */
@Injectable()
export class MyProducerService {
  constructor(
    @Inject('my_kafka_client') private readonly kafkaClient: ClientKafka
  ){ }

  /**
   * Emits `message` as an event on the 'my_topic_name' topic.
   *
   * emit() returns an Observable, so a publish failure is delivered through
   * that Observable rather than thrown synchronously. A surrounding try/catch
   * alone therefore never sees it, which is why an error handler is subscribed.
   * Failures are logged instead of being silently dropped.
   *
   * @param message payload to publish
   */
  sendMessage(message: unknown): void {
    const reportFailure = (err: unknown): void => {
      console.error("Failed to emit message to 'my_topic_name'", err)
    }

    try {
      this.kafkaClient.emit('my_topic_name', message).subscribe({ error: reportFailure })
    } catch (err: unknown) {
      // Defensive: covers an unexpected synchronous throw while dispatching.
      reportFailure(err)
    }
  }
}

Consumer

import { Controller } from '@nestjs/common'
import { Ctx, KafkaContext, MessagePattern, Payload } from '@nestjs/microservices'

/**
 * Consumer for the 'my_topic_name' topic: logs the incoming message together
 * with its Kafka context and returns the raw message value.
 */
@Controller()
export class MyConsumerController {
  @MessagePattern('my_topic_name')
  readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    // The message is reachable both through the @Payload() argument and
    // through context.getMessage(); both are logged here.
    const { value: rawValue } = context.getMessage()

    console.log(rawValue)
    console.log(message)

    // Context information other than the message itself
    console.log(context.getTopic())
    console.log(context.getArgs())
    console.log(context.getPartition())

    return rawValue
  }
}
  • Module 에서 import 하여 사용
    • CLI nest g controller <name> 으로 생성시 자동으로 import
// Module that registers MyConsumerController so its handler is picked up.
// NOTE(review): the `[...]` entries look like placeholders for the module's
// real imports/providers and are not valid as written; fill them in before use.
@Module({
  imports: [...],
  providers: [...],
  controllers: [MyConsumerController]
})
export class MyTestModule { }

reference

0개의 댓글