Kafka.scala
파일은 Kafka 서버를 시작하는 메인 진입점을 정의합니다. 이 파일은 kafka
패키지에 속하며, 운영에 필요한 여러 유틸리티 클래스와 라이브러리를 가져옵니다.
Kafka
객체는 로깅 기능을 제공하는 Logging
트레이트를 확장합니다. getPropsFromArgs
메서드는 명령줄 인수를 파싱하여 서버 속성을 로드하는 역할을 합니다. 이 메서드는 joptsimple
라이브러리의 OptionParser
를 사용하여 --override
라는 선택적 인수를 정의하며, 이는 server.properties
파일에 지정된 속성을 덮어쓸 수 있습니다. 인수가 제공되지 않으면 사용법 정보를 출력하고 종료합니다.
val optionParser = new OptionParser(false)
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])
main
메서드는 애플리케이션의 진입점입니다. 먼저 getPropsFromArgs
를 호출하여 서버 속성을 로드합니다. 그런 다음, 이 속성을 사용하여 KafkaServerStartable
인스턴스를 생성합니다. 이 메서드는 또한 Windows 또는 IBM JDK 환경을 제외하고 종료 시그널을 로깅하기 위한 시그널 핸들러를 등록하려고 시도합니다.
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
JVM이 종료될 때 Kafka 서버가 정상적으로 종료되도록 보장하기 위해 종료 훅이 추가됩니다. 이는 런타임의 종료 훅에 새로운 스레드를 추가하여, kafkaServerStartable
인스턴스의 shutdown
메서드를 호출합니다.
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
마지막으로, startup
메서드를 호출하여 Kafka 서버를 시작하고, awaitShutdown
을 호출하여 서버가 종료될 때까지 메인 스레드를 블록합니다. 이 과정에서 치명적인 예외가 발생하면 로그에 기록되고 애플리케이션은 비정상 종료합니다.
kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()