kafka.kafka

유현민·2024년 9월 23일
0

kafka

목록 보기
13/13

Kafka.scala 파일은 Kafka 서버를 시작하는 메인 진입점을 정의합니다. 이 파일은 kafka 패키지에 속하며, 운영에 필요한 여러 유틸리티 클래스와 라이브러리를 가져옵니다.

Kafka 객체는 로깅 기능을 제공하는 Logging 트레이트를 확장합니다. getPropsFromArgs 메서드는 명령줄 인수를 파싱하여 서버 속성을 로드하는 역할을 합니다. 이 메서드는 joptsimple 라이브러리의 OptionParser를 사용하여 --override라는 선택적 인수를 정의하며, 이는 server.properties 파일에 지정된 속성을 덮어쓸 수 있습니다. 인수가 제공되지 않으면 사용법 정보를 출력하고 종료합니다.

val optionParser = new OptionParser(false)
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
  .withRequiredArg()
  .ofType(classOf[String])

main 메서드는 애플리케이션의 진입점입니다. 먼저 getPropsFromArgs를 호출하여 서버 속성을 로드합니다. 그런 다음, 이 속성을 사용하여 KafkaServerStartable 인스턴스를 생성합니다. 이 메서드는 또한 Windows 또는 IBM JDK 환경을 제외하고 종료 시그널을 로깅하기 위한 시그널 핸들러를 등록하려고 시도합니다.

val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

JVM이 종료될 때 Kafka 서버가 정상적으로 종료되도록 보장하기 위해 종료 훅이 추가됩니다. 이는 런타임의 종료 훅에 새로운 스레드를 추가하여, kafkaServerStartable 인스턴스의 shutdown 메서드를 호출합니다.

Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
  override def run(): Unit = kafkaServerStartable.shutdown()
})

마지막으로, startup 메서드를 호출하여 Kafka 서버를 시작하고, awaitShutdown을 호출하여 서버가 종료될 때까지 메인 스레드를 블록합니다. 이 과정에서 치명적인 예외가 발생하면 로그에 기록되고 애플리케이션은 비정상 종료합니다.

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
profile
smilegate

0개의 댓글