로컬에서 카프카와 통신 오류

문법식·2022년 8월 11일
0
post-thumbnail

아파치 카프카 애플리케이션 프로그래밍 with 자바라는 책을 공부하고 실습 중이었다. EC2 서버에 카프카 클러스터를 구성하고 로컬에서 카프카 클라이언트를 구성했다.

$ bin/kafka-broker-api-version.sh --bootstrap-server EC2 퍼블릭 IP:9092

로컬에서 위의 명령어를 실행하여 원격으로 카프카의 버전과 broker.id, rack 정보, 각종 카프카 옵션들을 확인하면 로컬 컴퓨터와 카프카의 연동이 완료된 것을 확인하는 실습이었다. 하지만 나는 명령어를 실행해도 아무것도 출력되지 않았고 계속 해서 응답을 기다리기만 하는 상황이었다. 원인을 파악하기 위해 카프카의 logs 디렉토리에서 여러 로그를 확인해봤는데 controller.log에서 아래와 같은 경고가 나고 있었다.

[2022-08-11 18:05:23,749] WARN [RequestSendThread controllerId=0] Controller 0's connection to broker EC2 퍼블릭 IP:9092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:289)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:242)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

경고 로그가 찍힌 시각이 내가 위의 명령어로 로컬에서 카프카와 통신하려고 시도했던 시각과 일치했다. 원인 파악하려고 구글링을 했는데 EC2의 보안 그룹 인바운드 규칙 설정을 내 로컬의 퍼블릭 IP만 허용하는 것이 아니라 모든 IP를 허용해야 한다는 글을 보았다.(아래에 링크를 첨부했다.) 또한 책에서는 EC2 보안 그룹 인바운드 규칙 설정 시 모든 IP를 허용했는데 나 혼자의 판단으로 내 로컬의 퍼블릭 IP만 허용해도 제대로 동작할 것 같다는 생각에 EC2 보안 그룹 인바운드 규칙 설정에서 내 로컬의 퍼블릭 IP만 허용한 것이 기억났다.

EC2 보안 그룹 인바운드 규칙 설정을 모든 IP 허용으로 변경했더니 해결됐다. 로컬에서 카프카와 통신이 제대로 되었고 카프카의 버전과 broker.id, rack 정보, 각종 카프카 옵션들이 로컬 화면에 제대로 출력되었다.

EC2의 보안 그룹 설정에 내 로컬의 퍼블릭 IP만 허용하면 위의 로그와 같이 컨트롤러가 브로커에 연결을 실패했다고 경고 로그를 찍는지 원인을 제대로 파악하기엔 카프카에 대한 지식이 너무 없고, 구글링을 해보아도 이유를 찾지 못했다. 추후에 카프카에 대한 지식이 쌓이고 글을 수정하도록 하겠다.

참고


2022-08-12
오늘 경고 로그 찍힌 상황을 재현하려고 EC2의 보안 그룹 인바운드 규칙 설정을 내 로컬의 퍼블릭 IP만 허용하게 변경했다. 그리고 로컬에서 카프카와 통신을 시도했는데 통신이 잘 된다. 어제 통신이 안 되고 경고 로그를 찍은 이유를 더 모르게 됐다.. 깨달은게 생기면 글을 수정하겠다.


2022-08-18
드디어 원인을 제대로 알았다. 오늘 카프카 클러스터를 다시 시작하고 나니 또다시 로컬과의 통신이 제대로 되지 않았다. 그래서 EC2의 인바운드 규칙을 모든 IP 허용으로 하니 제대로 되었다.

[2022-08-18 06:56:55,919] WARN [RequestSendThread controllerId=0] Controller 0's connection to broker EC2 퍼블릭 IP:9092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:289)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:242)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

controller.log를 확인하니 또다시 위와 같은 오류를 내면서 브로커에 연결을 못하고 있었다. 그래서 카프카 컨트롤러가 무슨 역할을 하는지 먼저 파악해보기로 했다.

  • https://devidea.tistory.com/71
    위의 블로그 글을 참고해보니 카프카 컨트롤러는 카프카 클러스터에서 하나의 브로커가 컨트롤러 역할을 하고 브로커의 상태 체크, 새 리더 선출 등의 역할을 한다는 것을 알 수 있었다.

또한, 경고 로그를 보니 카프카 컨트롤러가 브로커를 관리할 때 advertised.listeners에 내가 설정한 EC2의 퍼블릭 IP를 사용한다는 것을 알 수 있었다.

  • https://www.confluent.io/ko-kr/blog/kafka-listeners-explained/
    위 블로그 글을 참고해보니 listeners는 클라이언트가 카프카 브로커와 같은 네트워크 안에서 연결할 때 사용하는 것이고 advertised.listeners 카프카 브로커의 네트워크 외부에서 연결할 때 사용하는 것을 알 수 있었습니다.

나는 카프카 클러스터를 브로커 한 개와 주키퍼 한 개로 구성했는데 브로커와 주키퍼를 같은 EC2에 설치했다. 그래서 컨트롤러 역할을 하는 브로커도 하나고 관리 대상의 브로커도 자기 자신인 한개 뿐이다. 카프카 컨트롤러 역할을 하는 브로커가 advertised.listeners에 설정한 EC2 퍼블릭 IP로 관리해야 할 브로커로 요청이 갔는데 요청의 src IPdst IP 모두 동일한 EC2 퍼블릭 IP인 것이다. 위의 그림처럼 요청이 EC2를 나갔다가 다시 들어오는데 인바운드 규칙에는 내 로컬의 퍼블릭 IP만 허용했기 때문에 해당 요청이 들어오지 못했다. 그래서 controller.log에 컨트롤러가 브로커와 연결에 실패했다는 위에 첨부한 경고 로그를 찍은 것이다.

EC2의 인바운드 규칙에 EC2의 퍼블릭 IP를 추가하니까 해결이 됐다. 인바운드 규칙은 요청을 보낸 호스트의 IP를 검사하는데 요청을 보낸 호스트가 자기 자신이라도 규칙으로 추가해야 하는 것 같다.

profile
백엔드

2개의 댓글

comment-user-thumbnail
2022년 9월 12일

혹시 도움 요청 드려도 되나요 ?

1개의 답글