# confluent
Spring Boot로 Kafka Cloud에 연결하여 Json 데이터 주고 받기 (Callback 함수 및 Shutdown Hook)
서론 지난 글에서 Kafka cloud에 연결하여 String으로 메시지를 주고 받았었다. 이번에는 Json으로 데이터를 주고 받는 코드를 구현할 것이다. 만약 Kakfa Cloud에 연결하기 위해 Configuration 설정하는 법이 알고 싶다면 지난 글을 참고하시길 바란다. https://velog.io/@millwheel/Spring-Boot로-Kafka-cloudConfluent-Conduktor-등에-연결하여-데이터-주고-받기-Configuration-방법-2가지 또한 추가적으로 producer에서 메시지를 보낸 후 받은 callback을 확인해 볼 것이다. Spring에서 Kafka를 사용하는 예문을 검색해보면 대부분의 자료에서 callback을 Listenablefuture에 담는 것을 볼 수 있는데, 해당 클래스는 Spring Boot 3부터는 deprecated된 기능이라 Spring Boot 3부터는 지원이 되지 않는다. Spring Boot 3에서는
Spring Boot로 Kafka cloud(Confluent, Conduktor 등)에 연결하여 데이터 주고 받기 (Configuration 방법 2가지)
서론 spring Boot를 사용하여 kafka에 연결하여 데이터를 주고 받으려고 Reference 자료들을 검색해보는데, 대부분의 자료들이 localhost:9092에 접속하여 로컬 컴퓨터에 있는 Kafka를 사용하는 예시를 보여주고 있었다. 현재 수많은 Kafka Cloud들이 존재하고 편리한 UI를 제공하여 데이터를 확인하기 좋게 편의성을 제공하고 있는데, Kafka Cloud에 연결하는 Spring 예문을 찾아보기 어려운 점은 상당히 아쉽게 느껴졌다. localhost에 접속할 때와는 Configuration 설정에서 조금 다른 부분이 있기 때문이다. 실제로 kafka를 직접 로컬 서버에 띄워서 사용하기보다는 cloud를 통해 데이터를 관리하는 경우가 많으므로, cloud에 연결하여 데이터를 주고받는 Spring 예문이 필요하다고 생각되어 해당 글을 작성해본다. 본 글을 참고하면 Configuration이 잘못되어 낭비되는 시간을 최대한 줄일 수 있을 것이다. (
[Apache Kafka] Kafka REST Proxy란?
공식 문서 REST Proxy 란? Confluent에서 제공하는 Apache Kafka 클러스터를 위한 RESTful 인터페이스입니다. 네이티브 Kafka 프로토콜이나 Kafka Connect, Kafka Client를 사용하지 않고도 REST API 를 통해 카프카 클러스터에 메세지를 전달하고, 클러스터의 상태를 모니터링 및 관리할 수 있습니다. 특징 MetaData 브로커, 토픽, 파티션, Config 등 카프카 클러스터에 대한 대부분의 메타 데이터를 GET 요청 방식으로 조회할 수 있습니다. Producers API가 특정 토픽이나 파티션으로 들어오는 Produce Requests를 받아, 작은 프로듀서 풀을 통해 라우팅 시킵니다. Producer 인스턴스는 공유되기 때문에 요청별로 Produce

[Kafka] Debezium Postgresql Source Connector 를 활용한 CDC.
TL;DR kafka를 활용해 Debezium Postgresql Source Connector를 사용한 CDC 환경을 만들어 보려고 했는데 처음 하다보니 debezium docs나 kafka docs들이 돌이켜보니 잘돼있었지만, 시간이 꽤나 걸렸다. 이를 기록해서 나와 같은 분들이 있다면 빠르게 초기 세팅을 할 수 있으면 좋겠다. CDC ( Change Data Capture ) CDC는 변경된 데이터를 캡쳐한다는 의미로 우리가 특정 Source Database 에서 Insert, Delete, Update 가 일어난 데이터들을 저장하여 다른 Target System으로 전달하는 것을 말한다. 기존에는 Batch 작업을 통해서 일정 주기마다 변경사항들을 전달하는 방식들이 있었는데 이를 realtime에 가깝게 할 수 있도록 하는 대체 방안으로 현재 많이 사용되고 있다. 이러한 CDC를 직접 application으로 개발하여서 할 수도 있겠지만, 이미 Provider들
[Confluent] Transformation 을 활용한 message key 변경.
TL;DR kafka는 message 의 key 값이 정해져 있지 않다면 round-robin 방식으로 각 파티션에 데이터들이 분배되게 된다. 이렇게 되면 데이터의 순서 보장이 되지 않을 수 있는데 이런 경우 message의 key를 지정해준다면, 해당 key 값에 해당하는 partition으로 분배되게 된다. kafka connect 의 개념중 하나인 transformation을 사용해서 전달되는 record들의 key 값을 설정해줄 수 있는다. 이 밖에도 message의 schema를 제거하거나 predefined돼 있는 field를 제거하는 등의 작업을 통해서 message를 좀 더 간단하게 만들어 줄 수 있다. 뿐만 아니라 필요하다면 field를 추가해서 사용도 가능하다. Transformation 각각의 제공자들 별로 다양한 transforms 기능들이 있다. 각 제공자들은 본인들의 connect에서 사용할 수 있는 transforms에 대해서 documentat
Confluent Certified Developer for Apache Kafka(CCDAK) 후기
CCDAK 합격 후기 시험비용: 150달러 문항수 : 60문제, 1시간 30분 all 영어 (감독관 대화 및 시험) 준비기간 : 4개월 떨어지면 2주간 재시험 불가 본인영어실력 : 초등생 수준(요즘 초등생보다 못할수도) 내용: 카프카와 confleunt 관련의 전반적인 내용 도움받은자료 : 유데미(ccdak , ccoak 시험자료) confluent에서 제공하는 간단한 문제 인프런 카프카 강의(all) 책 카프카 핵심가이드ver1 *실전 카프카 개발부터 운영까지 (데이터플랫폼의 중추 아파치 카프카의 내부동작과 개발,운영,보안의 모든것) ------ 제일도움많이됨 카프카 스트림즈와 KSQLDB confluent docs kafka docs 추천 방법 영어가 된다면 docs 또는 confluent에서 제공하는 pdf 추천 udemy 의 ccdak exam (manuel꺼) 영어가 된다
Confluent Certified Administrator for Apache Kafka(CCAAK) 후기
1. 시험정보 앞전의 CCDAK와 동일하나 문항수만 다르다. (총 40문항) > 2. 후기 CCDAK와 약간의 범위 차이만 있는듯하다. 개인적으론 CCDAK가 더 어려웠다.(CCAAK가 더어렵다는 분도 있다고 들었다) 끝
kafka connect sink config
kafka confluent의 sink connector config 설정 출처: Kafka Connect Sink Configuration Properties for Confluent Platform JDBC Sink Connector Configuration Properties name 전역으로 사용될 유니크한 sink connector의 이름 type: non-empty String co

Kubernetes Kafka 세팅 및 Confluent Kafka 사용해보기
나는 GCP에서 GKE를 구성하여 GKE 클러스터에 카프카를 세팅해볼 것이다. (여기서 쓰이는 yaml파일들은 특정 클라우드 환경에 종속되지 않기 때문에 편한 환경에서 진행해보면 된다.) GKE 구성 현재 진행하려 하는 것은 굳이 자동확장까지 할 필요는 없으므로 Standard를 선택한다. 여기서 이름, 리전, 그리고 노드에 관한 설정을 해주고 싶으면

confluent prometheus 연동
https://www.confluent.io/ko-kr/blog/bring-your-own-monitoring-with-confluent-cloud/ 이 링크에 시킨대로 한다. 나는 사용하는게 stable/prometheus 인데 이 경우 extraScrapeConfigs 에 추가해야 prometheus.yml가 변경된다고 하는데.. https://awskrug.github.io/eks-workshop/monitoring/deploy-prometheus/ 여기 있는 prometheus-values.yaml 를 사용할 경우 이상하게 반영이 안 된다. 근데 extraScrapeConfigs 바로 윗 줄에 가 prometheus.yml 내용물이다보니 그 바로밑에 를 추가해주면 된다. scrape_interval 이 1분이다보니 다른 것들처럼 15초마다 업데이트 되지 않는데, 이게 컨플루언트 설정이라 어떻게 할 수가 없다. 수동 쿼리는 1시간에 80번 치면 429 띄우며 막는댄
confluent cloud metrics api 에서 겪은 오류
https://docs.confluent.io/cloud/current/monitoring/metrics-api.html#query-for-average-consumer-lag-over-the-last-hour-grouped-by-topic-and-consumer-group 를 따라하는데 데이터가 [] 으로만 나오는 것이다! 저기 적혀있는대로 복붙을 했는데 말이다. 문의를 해 보았더니 답변은 아래와 같다. 즉 granularity 가 intervals 보다 작아야 한다는 것이다.

Apache Kafka 개요
Kafka란 > 보다 큰 규모의 데이터를 모으고, 처리하고, 저장하고 받아서 전달하는 이벤트 스트리밍 플랫폼. 데이터를 받아서 전달하는 데이터 버스 역할을 한다. 즉, 대량의 데이터를 분산 관리해주는 미들웨어이다. 이벤트 스트리밍 : ??? 데이터 스트리밍 : ??? 미들웨어 : 운영 체제에서 제공하지 않는 일반적인 서비스와 기능을 애플리케이션에 제공하는 소프트웨어. 이벤트 : 소프트웨어 또는 응용 프로그램에서 식별하거나 기록 하는 모든 유형의 작업, 사건 또는 변경 사항. 핵심 개념 > 높은 처리량, 빠른 응답 속도, 안정성 카프카를 사용하는 대표적인 이유는 위의 세 가지를 보장해주기 때문이다. 이를 보장해주는 구체적인 카프카의 핵심 개념은 아래와 같다. 분산 시스템 : 네트워크 상에서 연결된 컴퓨터들의 그룹. 높은 성능을 낼 수 있으며 장애 대응과 시스템 확장에 용이함. 페이지 캐시 : OS의 페이지 캐시를 활용하여 처리량을 높인다
Confluent CLI 사용해보기 (feat. 어떨 때 사용하면 좋을까)
Confluent CLI 를 사용해보고 어떤 목적으로 사용하는 것이 좋을지 판단하기 위해 테스트 해보았다 (사전 설정) Confluent Kafka Rest * confluent cloud에서 rest endpoint를 제공해주는데 confluent cli와 함께 직접적으로 이 endpoint를 사용하는 방법은 찾지 못했다. 그래서 로컬에서 rest 서버를 띄워서 우회하여 사용하는 방법을 사용하였다. 로컬에서 confluent cloud로 confluent rest kafka를 사용하기 위해서는 Connecting REST Proxy to Confluent Cloud 의 내용대로 REST Proxy 을 실행시켜 두어야 한다. confluent platform 다운로드 및 압축 풀기 confluent platform을 다운

Kafka를 얕고 넓게
Kafka에 대한 전반적인 내용 내가 현재까지 알고 있는것들에 대한 (22.01.10 기준) 생각해 보니 카프카에 대한 지식 보단 카프카를 어떻게 사용하는지에 대한 내용을 기술 할 예정이다. (카프카 고유에 대한 기술적 지식은 아직 더 공부가 필요한듯..) Kafka Cluster Kafka broker가 1개 이상 있는 broker의 묶음을 kafka cluster 라고 하는데 해당 클러스터에는 zookeeper가 필수적으로 존재해야한다. 정확히 Zookeeper가 어떤 원리로 동작하는진 잘 모르나 zookeeper는 브로커들의 관리와 메타데이터들의 관리를 해준다고 한다. Kafka Broker Kafka broker는 흔히 카프카라고도 불린다. 카프카 브로커가 실질적으로 produer로 부터 message를 받고 consumer가 가져 갈 수 있는 환경을 만들기 때문이다. 하나의 브로커는 복제를 할 수 있다. 이때 leader, sl