producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['broker의 ip:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
routine_instance = Routine.objects.get(routine_id=response.data['routine_id'])
data = {'message': f'[{routine_instance.title}] 루틴 등록 완료'}
producer.send('WEB_BOARD_REGISTER', value=data)
producer.flush()
# consumer.py
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'WEB_BOARD_REGISTER',
bootstrap_servers=['broker의 ip:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
consumer_timeout_ms=10000
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value))
print('[end] get consumer list')
다운
압축 해제
설정
zookeeper.connect=주키퍼IP:2181
bootstrap.servers=PLAINTEXT://카프카IP:9092
access.control.allow.origin=*
access.control.allow.methos=GET,POST,PUT,DELETE
access.control.allow.headers=origin,content-type,accept,authorization
실행
# producer.py
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['200.200.200.10:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
for i in range(100):
data = {'str' : 'result'+str(i)}
producer.send('person', value=data)
producer.flush()
print("elapsed :", time.time() - start)