[kafka] 실습

최동혁·2023년 3월 21일
0

클라우드

목록 보기
15/18

자신의 장고 프로젝트에 입혀보기

  • 필자는 기본적인 루틴을 생성하는 API가 있기 때문에, 생성하는 곳에 밑의 코드를 넣었다.
  • 그래서 루틴을 생성하게 되면, 해당 broker에게 생성 메세지를 날리고, consumer로 받게끔 실습을 할 예정이다.
  • 물론 producer는 장고이다.

producer

producer = KafkaProducer(
            acks=0,
            compression_type='gzip',
            bootstrap_servers=['broker의 ip:9092'],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )

routine_instance = Routine.objects.get(routine_id=response.data['routine_id'])

data = {'message': f'[{routine_instance.title}] 루틴 등록 완료'}
producer.send('WEB_BOARD_REGISTER', value=data)
producer.flush()

  • 루틴 생성 API를 호출한다.
  • 제목은 유효성 테스트라는 루틴이다.

consumer

# consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'WEB_BOARD_REGISTER',
    bootstrap_servers=['broker의 ip:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=10000
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
    message.topic, message.partition, message.offset, message.key, message.value))

print('[end] get consumer list')

  • 유효성 테스트라는 제목을 가진 루틴 등록 완료라고 메세지를 broker에게서 빼온 것을 확인할 수 있다!

proxy 설치

  • 다운

  • 압축 해제

    • unzip confluent-community-5.5.0-2.12.zip
  • 설정

    • vi [압축푼 폴더]/etc/kafka-rest/kafka-rest.properties
    		zookeeper.connect=주키퍼IP:2181
    		bootstrap.servers=PLAINTEXT://카프카IP:9092
    
    		access.control.allow.origin=*
    		access.control.allow.methos=GET,POST,PUT,DELETE
    		access.control.allow.headers=origin,content-type,accept,authorization
  • 실행

    • [압축푼 폴더]/bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

proxy 서버 테스트

  • putty로 세션 하나 더 생성
  • curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data '{"name": "routine_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
    http://localhost:8082/consumers/routine

producer

# producer.py
from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['200.200.200.10:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

start = time.time()

for i in range(100):
    data = {'str' : 'result'+str(i)}
    producer.send('person', value=data)
    producer.flush()

print("elapsed :", time.time() - start)

다시 proxy 서버

profile
항상 성장하는 개발자 최동혁입니다.

0개의 댓글