from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['200.200.200.5:9092']
)
start = time.time()
for i in range(100):
producer.send('testing', value="받아랏".encode("utf-8")) # 토픽은 testing
producer.flush()
print("elapsed :", time.time() - start)
vi /etc/logstash/conf.d/kafka.conf
input {
kafka {
bootstrap_servers => "200.200.200.5:9092" # 파이썬이 카프카에게 보냈으니 여기서 받아야함
group_id => "logstash"
topics => ["testing"]
consumer_threads => 1
}
}
output {
elasticsearch {
hosts => ["http://200.200.200.70:9200"] # 엘라스틱여기에 저장할 것이고
index => "kafka-test-%{+YYYY-MM-dd}" # 인덱스는 kafka-test~ 으로 할 것임
}
}
/usr/share/elasticsearch/bin/elasticsearch-plugin install analysis-nori
systemctl restart elasticsearch
/usr/share/elasticsearch/bin/elasticsearch-plugin install file:///root/hanhinsam-0.1.zip
systemctl restart elasticsearch