Kafka 시작하기

📝 1yangsh·2021년 4월 1일
0

Kafka

아파치 소프트웨어 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트

  • 실시간 데이터 피드를 관리하기 위한 통일된, 높은 처리량, 낮은 지연시간을 지닌 플랫폼을 제공하는 것이 목적

  • Kafka는 발행-구독(publish-subscribe) 모델을 기반으로 동작하며 크게 producer, consumer, broker로 구성

kafka 구성요소

kafka 시작하기

windows는 bin\windows~
macOS, linux, unix 계열은 bin\~
Kafka 설치 링크

  • zookeeper 서버 설치

    • bin\windows\zookeeper-server-start.bat config\zookeeper.properties
  • Kafka 실행

    • bin\windows\kafka-server-start.bat config\server.properties
  • topic 생성

    • bin\windows\kafka-topics.bat --create --topic <topic명> --bootstrap-server localhost:9092
  • topic 리스트 확인

    • bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
    • bin\windows\kafka-topics.bat --describe --topic <topic명> --bootstrap-server localhost:9092
  • consumer 실행

    • bin\windows\kafka-console-consumer.bat --topic <topic명> --from-beginning --bootstrap-server localhost:9092
    • bin\windows\kafka-console-producer.bat --topic <topic명> --bootstrap-server localhost:9092
  • kafka 브로커에서 topic 삭제

    • bin\windows\kafka-topics.bat --delete --topic <topic명> --bootstrap-server localhost:9092
  • zookeeper에서 topic 삭제

    • bin\windows\kafka-topics.bat --delete --topic <topic명> --zookeeper localhost:2181
  • python 모듈 설치

    • pip install kafka-python
  • python을 이용하여 data queuing

    • kafka_consumer.py
    from kafka import KafkaConsumer
    from json import loads
    import time
    
    consumer = KafkaConsumer('quickstart-events',
                             bootstrap_servers=['127.0.0.1:9092'],
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             group_id='my-group',
                             # value_deserializer=lambda x: loads(x.decode('utf-8')),
                             consumer_timeout_ms=1000)
    
    start = time.time()
    
    for message in consumer:
        topic = message.topic
        partition = message.partition
        offset = message.offset
        key = message.key
        value = message.value
        print("Topic:{},Partition:{},offset:{},key:{},value:{}".format(
            topic, partition, offset, key, value))
    
    print("Elasped: ", (time.time() - start))
  • kafka_producer.py
    from kafka import KafkaProducer
    from json import dumps
    import time
    
    # dict (key, value) -> object
    # str -> string
    
    producer = KafkaProducer(acks=0,
                             compression_type='gzip',
                             bootstrap_servers=['127.0.0.1:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    
    start = time.time()
    for i in range(10):
        # data = {'name': 'Yang-' + str(i)}
        data = {"schema": {"type": "struct", "fields": [{"type": "int32", "field": "id"}, {"type": "string", "field": "user_id"}, {"type": "string", "field": "pwd"}, {"type": "string", "field": "NAME"}, {
            "type": "int64", "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "created_at"}], "name": "users"}, "payload": {"id": 10, "user_id": "new_test10", "pwd": "new_pwd10", "NAME": "NEW TEST USER10", "created_at": 1615349727000}}
    
        producer.send('my_topic_users', value=data)
        producer.flush()
    
    print("Done, Elasped time: ", (time.time() - start))
  • 코드 실행

    • python kafka_consumer.py
    • python kafka_producer.py
profile
개발 경험 저장소

0개의 댓글