이 장에서는 카프카를 실습해보면서 카프카의 동작을 이해해보았다. 각 단계별로 실습을 정리했다.
AWS
의 EC2
1대로 카프카 클러스터를 구성했다. EC2
인스턴스를 생성한다.
인스턴스 생성을 완료하고 나면 인스턴스에 아래의 명령으로 자바를설치한다.
$ sudo yum install -y java-1.8.0-openjdk-devel.x86_64
https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
위의 링크로 카프카 바이너리 패키지를 다운로드하고 압축을 푼다.
카프카 브로커를 실행하기 위해서는 힙 메모리 설정이 필요하다. 내가 실습용으로 생성한 인스턴스(t2.micro
)는 1G
메모리를 가지고 있다. 그러나 카프카 패키지의 힙 메모리는 카프카 브로커 1G
, 주키퍼는 512MB
로 카프카 브로커와 주키퍼를 기본 설정으로 실행하면 1.5G
메모리가 필요하기 때문에 Cannot allocate memory
에러가 발생한다.
export
명령어를 사용해 힙 메모리 사이즈를 아래와 같이 미리 환경변수로 지정해서 실행하면 오류가 해결된다.
$ export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
위의 경우 터미널 세션이 종료되고 나면 다시 초기화되어 재사용이 불가능하다. 그래서 위의 명령을 ~/.bashrc
에 추가하면 된다.
config/server.properties
로 카프카 브로커가 클러스터에 운영에 필요한 옵션들을 지정할 수 있다. 지금은 advertised.listener
만 설정한다. advertised.listener
는 카프카 클라이언트 또는 커맨드 라인 툴을 브로커와 연결할 때 사용한다. 현재 EC2
의 퍼블릭 IP
와 카프카 기본포트인 9092를 PLAINTEXT://
와 함께 붙여넣고 advertised.listener
의 주석을 해제하면 된다.
주키퍼는 카프카의 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있어 카프카를실행하는 데에 필요한 필수 애플리케이션이다. 아래의 명령으로 주키퍼를 실행하면 된다.
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
아래의 명령으로 주키퍼가 실행됐는지 확인하면 된다.
$ jps -m
아래의 명령으로 카프카를 실행하면 된다.
$ bin/kafka-server-start.sh -daemon config/server.properties
카프카 브로커의 정상 동작 여부는 로그를 확인하면 된다. 카프카 클라이언트를 개발할 때뿐만 아니라 카프카 클러스터를 운영할 때 이슈가 발생할 경우 모두 카프카 브로커에 로그가 남는다. 그래서 문제가 생겼을 때 카프카 브로커가 실행되고 있는 로그를 확인하면 더 빠르게 문제를 해결할 수 있다.
로컬에도 카프카를 설치한다. 그리고 아래의 명령으로 카프카와 통신이 되는지 확인하면 된다.
$ bin/kafbroker-api-versions.sh --bootstrap-server EC2 퍼블릭 IP:9092
정상 작동하면 아래와 같은 응답을 받는다.
EC2 퍼블릭 IP:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11],
ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9],
LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 2 [usable: 2],
UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 7 [usable: 7],
FindCoordinator(10): 0 to 3 [usable: 3],
JoinGroup(11): 0 to 7 [usable: 7],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 4 [usable: 4],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 3 [usable: 3],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 5 [usable: 5],
DeleteTopics(20): 0 to 4 [usable: 4],
DeleteRecords(21): 0 to 1 [usable: 1],
InitProducerId(22): 0 to 3 [usable: 3],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1],
AddOffsetsToTxn(25): 0 to 1 [usable: 1],
EndTxn(26): 0 to 1 [usable: 1],
WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 2 [usable: 2],
AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 1 [usable: 1],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 2 [usable: 2],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0]
)
카프카 클러스터를 구성 완료했다. 그 다음엔 토픽을 생성하고 메세지를 보내보고 컨슈머 그룹을 생성해보는 실습을 진행했다. 실전 아파치 카프카 책을 실습할 때 해본 것이지만 다시 복습한다는 생각으로 해보면서 카프카의 동작에 대해서 좀 더 이해할 수 있었다.