아파치 카프카 애플리케이션 프로그래밍 with 자바
라는 책을 공부하고 실습 중이었다. EC2
서버에 카프카 클러스터를 구성하고 로컬에서 카프카 클라이언트를 구성했다.
$ bin/kafka-broker-api-version.sh --bootstrap-server EC2 퍼블릭 IP:9092
로컬에서 위의 명령어를 실행하여 원격으로 카프카의 버전과 broker.id
, rack
정보, 각종 카프카 옵션들을 확인하면 로컬 컴퓨터와 카프카의 연동이 완료된 것을 확인하는 실습이었다. 하지만 나는 명령어를 실행해도 아무것도 출력되지 않았고 계속 해서 응답을 기다리기만 하는 상황이었다. 원인을 파악하기 위해 카프카의 logs
디렉토리에서 여러 로그를 확인해봤는데 controller.log
에서 아래와 같은 경고가 나고 있었다.
[2022-08-11 18:05:23,749] WARN [RequestSendThread controllerId=0] Controller 0's connection to broker EC2 퍼블릭 IP:9092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:289)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:242)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
경고 로그가 찍힌 시각이 내가 위의 명령어로 로컬에서 카프카와 통신하려고 시도했던 시각과 일치했다. 원인 파악하려고 구글링을 했는데 EC2
의 보안 그룹 인바운드 규칙 설정을 내 로컬의 퍼블릭 IP
만 허용하는 것이 아니라 모든 IP
를 허용해야 한다는 글을 보았다.(아래에 링크를 첨부했다.) 또한 책에서는 EC2
보안 그룹 인바운드 규칙 설정 시 모든 IP
를 허용했는데 나 혼자의 판단으로 내 로컬의 퍼블릭 IP
만 허용해도 제대로 동작할 것 같다는 생각에 EC2
보안 그룹 인바운드 규칙 설정에서 내 로컬의 퍼블릭 IP
만 허용한 것이 기억났다.
EC2
보안 그룹 인바운드 규칙 설정을 모든 IP
허용으로 변경했더니 해결됐다. 로컬에서 카프카와 통신이 제대로 되었고 카프카의 버전과 broker.id
, rack
정보, 각종 카프카 옵션들이 로컬 화면에 제대로 출력되었다.
EC2
의 보안 그룹 설정에 내 로컬의 퍼블릭 IP
만 허용하면 위의 로그와 같이 컨트롤러가 브로커에 연결을 실패했다고 경고 로그를 찍는지 원인을 제대로 파악하기엔 카프카에 대한 지식이 너무 없고, 구글링을 해보아도 이유를 찾지 못했다. 추후에 카프카에 대한 지식이 쌓이고 글을 수정하도록 하겠다.
참고
2022-08-12
오늘 경고 로그 찍힌 상황을 재현하려고 EC2
의 보안 그룹 인바운드 규칙 설정을 내 로컬의 퍼블릭 IP
만 허용하게 변경했다. 그리고 로컬에서 카프카와 통신을 시도했는데 통신이 잘 된다. 어제 통신이 안 되고 경고 로그를 찍은 이유를 더 모르게 됐다.. 깨달은게 생기면 글을 수정하겠다.
2022-08-18
드디어 원인을 제대로 알았다. 오늘 카프카 클러스터를 다시 시작하고 나니 또다시 로컬과의 통신이 제대로 되지 않았다. 그래서 EC2
의 인바운드 규칙을 모든 IP
허용으로 하니 제대로 되었다.
[2022-08-18 06:56:55,919] WARN [RequestSendThread controllerId=0] Controller 0's connection to broker EC2 퍼블릭 IP:9092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:289)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:242)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
controller.log
를 확인하니 또다시 위와 같은 오류를 내면서 브로커에 연결을 못하고 있었다. 그래서 카프카 컨트롤러가 무슨 역할을 하는지 먼저 파악해보기로 했다.
또한, 경고 로그를 보니 카프카 컨트롤러가 브로커를 관리할 때 advertised.listeners
에 내가 설정한 EC2
의 퍼블릭 IP
를 사용한다는 것을 알 수 있었다.
listeners
는 클라이언트가 카프카 브로커와 같은 네트워크 안에서 연결할 때 사용하는 것이고 advertised.listeners
카프카 브로커의 네트워크 외부에서 연결할 때 사용하는 것을 알 수 있었습니다.나는 카프카 클러스터를 브로커 한 개와 주키퍼 한 개로 구성했는데 브로커와 주키퍼를 같은 EC2
에 설치했다. 그래서 컨트롤러 역할을 하는 브로커도 하나고 관리 대상의 브로커도 자기 자신인 한개 뿐이다. 카프카 컨트롤러 역할을 하는 브로커가 advertised.listeners
에 설정한 EC2
퍼블릭 IP
로 관리해야 할 브로커로 요청이 갔는데 요청의 src IP
와 dst IP
모두 동일한 EC2
퍼블릭 IP
인 것이다. 위의 그림처럼 요청이 EC2
를 나갔다가 다시 들어오는데 인바운드 규칙에는 내 로컬의 퍼블릭 IP
만 허용했기 때문에 해당 요청이 들어오지 못했다. 그래서 controller.log
에 컨트롤러가 브로커와 연결에 실패했다는 위에 첨부한 경고 로그를 찍은 것이다.
EC2
의 인바운드 규칙에 EC2
의 퍼블릭 IP
를 추가하니까 해결이 됐다. 인바운드 규칙은 요청을 보낸 호스트의 IP
를 검사하는데 요청을 보낸 호스트가 자기 자신이라도 규칙으로 추가해야 하는 것 같다.
혹시 도움 요청 드려도 되나요 ?