카프카에서는 AdminClient
를 활용해 토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 확인, ACL 관리 등 다양한 기능을 제공합니다.
이전까지는 토픽이 있는 걸 확인할 수 없어서 producer.send
()메서드 내에서 UNKNOWN_TOPIC_OR_PARTITION
예외가 발생하는 걸 잡아서 처리하거나 하였습니다. AdminClient
는 이런 문제접을 해결합니다.
AdminClinet.createTopics
메서드는 토픽이 생성될 떄 까지 기다리거나, 생성 상태를 확인 후 토픽 상태 설정을 가져올 수 있습니다.
하지만 카프카 컨트롤러부터 브로커로의 메타데이터 전파는 비동기적으로 이루어지기에 AdminClient
가 전달하는 정보는 실시간으로 동기화될 수는 없지만, 언젠가는 동기화 됩니다.
val props = Properties().apply {
put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
}
val admin = AdminClient.create(props)
admin.close(Duration.ofSeconds(30))
close()
를 실행할 때는 아직 진행중인 작업이 있을 수 있다는 것을 유의해야 합니다. close()
는 30초동안 모든 작업을 기다리고, 그 이후의 작업에 대해서는 예외를 발생하게 됩니다.
그렇다면 눈여겨볼만한 설정은 무엇이 있을까요?
카프카는 호스트명을 기준으로 연결을 검증하고 해석하고 생성합니다. 이 과정 중 DNS를 사용하는 경우와 IP를 활용하는 경우 각각의 설정이 필요할 수 있습니다.
all-brokers.hostname.com
을 활용하면 쉽게 연결할 수 있지만, SASL
인증을 활요할 경우 문제가 생길 수 있습니다.
SASL은 Kafka 프로토콜이 데이터 교환 과정에서 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며, 인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술 입니다.
또한 특정 브로커만을 연결할 수 있도록 할 수 있지만, 다음과 같이 브로커 DNS가
broker1.hostname.com
broker2.hostname.com
broker3.hostname.com
일치하지 않을 경우 resolve_cannonical_bootstrap_servers_only
설정을 수행하면 됩니다. 이는 DNS를 펼치게 적용하기에 DNS 별칭에 포함된 모든 브로커 이름을 일일히 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동합니다.
하나의 DNS가 다양한 IP로 연결될 수 있기에 use_all_dns_ips
를 활용하면 안전하게 클라이언트에서 연결할 수 있습니다.
AdminClient
의 응답을 기다리는 최대 값을 지정합니다.
val topics = admin.listTopics()
topics.names.forEach { println(it) }
adminClinet
는 모든 응답을 Future
객체로 감싸어 리턴합니다. 따라서 적절하게 Future
객체를 처리하는 것이 필요합니다.
또한 토픽 리스트 출력 뿐만아니라, 파티션 리스트, 토픽 생성, 삭제 등도 제공합니다.
어플리케이션에서의 토픽 삭제는 돌이킬 수 없습니다. 주의하여 사용하기를 권장합니다.
ConfigResource
객체를 활용하여 설정 관리를 진행할 수 있습니다.
카프카는 컨슈머 그룹마다 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해줍니다. 어플리케이션 내에서 데이터 재처리 기능을 미리 구현해놓았다는 의미입니다.
즉
AdminClinet
를 활용하면 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정할 수 있습니다.
admin.listConsumerGroups().valid()
조회 방법은 책의 소스를 참고하도록 하고 그렇다면 컨슈머 그룹의 무엇을 수정할 수 있을까요?
등의 기능을 제공합니다. 이 중 가장 많이 쓰이는 것은 오프셋 변경 기능이 가장 자주 사용됩니다.
오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법으로 보일 수 있지만, 이는 컨슈머 설정에 의존적입니다. 컨슈머가 시작됐는데 커밋된 오프셋을 못 찾을 경우 컨슈머는 토픽의 맨 앞에서 부터 처리를 시작하게 됩니다. (컨슈머의 리셋)
이러한 오프셋 토픽의 변경은 컨스머 그룹에 변경 여부는 전달되지 않습니다.
컨슈머 그룹은 컨슈머가 새로운 파티션을 할당 받거나 새로 시작할 때만 오프셋을 토픽에 저장된 값을 읽어올 뿐입니다. 컨슈머 그룹이 돌아가고 있는 상태에서 오프셋을 변경하고자 한다면 UnknownMemeberIdException
이 발생하게 됩니다.
토픽에 파티션을 추가할 때 어플리케이션 데이터들이 깨질 수 있습니다. 따라서 토픽 용량 한계를 늘리기 위해 파티션을 늘리는 상황은 드뭅니다.
토픽에서 특정 레코드 삭제하기
리더 선출 규칙 변경
이는 선호 리더 선출 기법과 언클린 리더 선출 기법이 존재합니다.
레플리카를 하나의 브로커에서 다른 브로커로 옮기는 것은 재할당을 하는것과 동일합니다.
아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient
테스트 클래스를 제공합니다. 이 클래스를 활요하여 실제 어드민 작업을 수행할 필요 없이 어플리케이션이 작동하는지 테스트할 수 있습니다.
책에서는 Mock
카프카 객체가 특정 토픽을 가지는지, 토픽이 정상적으로 만들어지는지에 대한 테스트를 진행합니다.