[카프카 핵심 가이드] Chapter 05. 프로그램 내에서 코드로 카프카 관리하기

Falco·2023년 11월 27일
0
post-thumbnail

프로그램 내에서 코드로 카프카 관리리하기

카프카에서는 AdminClient를 활용해 토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 확인, ACL 관리 등 다양한 기능을 제공합니다.

이전까지는 토픽이 있는 걸 확인할 수 없어서 producer.send()메서드 내에서 UNKNOWN_TOPIC_OR_PARTITION예외가 발생하는 걸 잡아서 처리하거나 하였습니다. AdminClient는 이런 문제접을 해결합니다.

5.1 AdminClient 개요

AdminClinet.createTopics메서드는 토픽이 생성될 떄 까지 기다리거나, 생성 상태를 확인 후 토픽 상태 설정을 가져올 수 있습니다.

하지만 카프카 컨트롤러부터 브로커로의 메타데이터 전파는 비동기적으로 이루어지기에 AdminClient가 전달하는 정보는 실시간으로 동기화될 수는 없지만, 언젠가는 동기화 됩니다.

5.2 Admin Client 사용법: 생성, 설정, 닫기

val props = Properties().apply {
	put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
}
val admin = AdminClient.create(props)
admin.close(Duration.ofSeconds(30))

close()를 실행할 때는 아직 진행중인 작업이 있을 수 있다는 것을 유의해야 합니다. close()는 30초동안 모든 작업을 기다리고, 그 이후의 작업에 대해서는 예외를 발생하게 됩니다.

그렇다면 눈여겨볼만한 설정은 무엇이 있을까요?

client.dns.lookup

카프카는 호스트명을 기준으로 연결을 검증하고 해석하고 생성합니다. 이 과정 중 DNS를 사용하는 경우와 IP를 활용하는 경우 각각의 설정이 필요할 수 있습니다.

  1. DNS를 활용하는 경우

all-brokers.hostname.com을 활용하면 쉽게 연결할 수 있지만, SASL인증을 활요할 경우 문제가 생길 수 있습니다.

SASL은 Kafka 프로토콜이 데이터 교환 과정에서 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며, 인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술 입니다.

또한 특정 브로커만을 연결할 수 있도록 할 수 있지만, 다음과 같이 브로커 DNS가

broker1.hostname.com
broker2.hostname.com
broker3.hostname.com

일치하지 않을 경우 resolve_cannonical_bootstrap_servers_only설정을 수행하면 됩니다. 이는 DNS를 펼치게 적용하기에 DNS 별칭에 포함된 모든 브로커 이름을 일일히 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동합니다.

  1. 다수의 IP주소로 연결되는 DNS 이름을 사용하는 경우

하나의 DNS가 다양한 IP로 연결될 수 있기에 use_all_dns_ips를 활용하면 안전하게 클라이언트에서 연결할 수 있습니다.

request.timeout.ms

AdminClient의 응답을 기다리는 최대 값을 지정합니다.

5.3 토픽 관리 기능

val topics = admin.listTopics()
topics.names.forEach { println(it) }

adminClinet는 모든 응답을 Future객체로 감싸어 리턴합니다. 따라서 적절하게 Future객체를 처리하는 것이 필요합니다.

또한 토픽 리스트 출력 뿐만아니라, 파티션 리스트, 토픽 생성, 삭제 등도 제공합니다.

어플리케이션에서의 토픽 삭제는 돌이킬 수 없습니다. 주의하여 사용하기를 권장합니다.

5.4 설정 관리

ConfigResource객체를 활용하여 설정 관리를 진행할 수 있습니다.

5.5 컨슈머 그룹 관리

카프카는 컨슈머 그룹마다 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해줍니다. 어플리케이션 내에서 데이터 재처리 기능을 미리 구현해놓았다는 의미입니다.

AdminClinet를 활용하면 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정할 수 있습니다.

admin.listConsumerGroups().valid()

조회 방법은 책의 소스를 참고하도록 하고 그렇다면 컨슈머 그룹의 무엇을 수정할 수 있을까요?

  • 그룹 삭제
  • 멤버 제외
  • 커밋된 오프셋 삭제 혹은 변경

등의 기능을 제공합니다. 이 중 가장 많이 쓰이는 것은 오프셋 변경 기능이 가장 자주 사용됩니다.

오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법으로 보일 수 있지만, 이는 컨슈머 설정에 의존적입니다. 컨슈머가 시작됐는데 커밋된 오프셋을 못 찾을 경우 컨슈머는 토픽의 맨 앞에서 부터 처리를 시작하게 됩니다. (컨슈머의 리셋)

이러한 오프셋 토픽의 변경은 컨스머 그룹에 변경 여부는 전달되지 않습니다.

컨슈머 그룹은 컨슈머가 새로운 파티션을 할당 받거나 새로 시작할 때만 오프셋을 토픽에 저장된 값을 읽어올 뿐입니다. 컨슈머 그룹이 돌아가고 있는 상태에서 오프셋을 변경하고자 한다면 UnknownMemeberIdException이 발생하게 됩니다.

5.7 고급 어드민 작업

  1. 토픽에 파티션 추가하기

토픽에 파티션을 추가할 때 어플리케이션 데이터들이 깨질 수 있습니다. 따라서 토픽 용량 한계를 늘리기 위해 파티션을 늘리는 상황은 드뭅니다.

  1. 토픽에서 특정 레코드 삭제하기

  2. 리더 선출 규칙 변경

이는 선호 리더 선출 기법과 언클린 리더 선출 기법이 존재합니다.

  1. 레플리카 재할당

레플리카를 하나의 브로커에서 다른 브로커로 옮기는 것은 재할당을 하는것과 동일합니다.

5.8 카프카 테스트하기

아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient테스트 클래스를 제공합니다. 이 클래스를 활요하여 실제 어드민 작업을 수행할 필요 없이 어플리케이션이 작동하는지 테스트할 수 있습니다.

책에서는 Mock 카프카 객체가 특정 토픽을 가지는지, 토픽이 정상적으로 만들어지는지에 대한 테스트를 진행합니다.

참고 자료

https://devfunny.tistory.com/768

https://velog.io/@limsubin/Kafka-SASLPALIN-%EC%9D%B8%EC%A6%9D-%EC%84%A4%EC%A0%95%EC%9D%84-%ED%95%B4%EB%B3%B4%EC%9E%90

profile
강단있는 개발자가 되기위하여

0개의 댓글