[P1] 실시간 데이터 Kafka로 받기

ehwnghks·2023년 2월 27일
1

실시간 데이터는 업비트 API를 이용해 최근거래 체결 데이터를 전송하는 예제를 만들어봤다.

토픽삭제 커맨드

./kafka-topics.sh --delete --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic-B

토픽 생성 커맨드

./kafka-topics.sh --create --replication-factor 1 --partithons 1 topic upbit --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092

토픽 리스트 커맨드

./kafka-topics.sh --list --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092
  • replication-factor : 생성할 Topic의 replication factor 수를 지정합니다.
    Topic 단위로 replication factor를 설정한다 해서 Topic을 replication 하는 것이 아니라, Topic을 이루는 각각의 Partition을 replication 하는 것입니다.

  • partitions : 생성할 Topic의 Partition 수를 지정합니다.
    하나의 토픽은 여러개의 파티션으로 나눌 수 있다.(병렬처리 -> 고성능)

이 부분을 이해 할 때 데이터를 분할 하는 방식이 elasticsearch와 유사한거 같다는 생각을 했다.

elasticsearchkafka
IndexTopic
primary shardpartitions
replica shardreplication

파티션은 늘릴 수 있지만 반대로 줄일 수는 없다.
프라이머리 샤드(elasticsearch)는 변경이 불가.

[python] producer.py, consumer.py 작성

producer.py

from kafka import KafkaProducer
import requests
from json import dumps

producer = KafkaProducer(bootstrap_servers = ['kafka-01:9092','kafka-02:9092','kafka-03:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8')
                         )

server_url = 'https://api.upbit.com'
headers = {'Content-Type': 'application/json;'}
res = requests.get(server_url + '/v1/trades/ticks?market=KRW-BTC&count=1', headers=headers)

res_str = str(res.json()[0])

producer.send('upbit', value=res.json())
producer.flush()

최근 거래 체결 내역 API 결과를 upbit 토픽으로 전송.
https://docs.upbit.com/reference/%EC%B5%9C%EA%B7%BC-%EC%B2%B4%EA%B2%B0-%EB%82%B4%EC%97%AD

consumer.py

from kafka import KafkaConsumer
from json import loads
import time
import datetime
#from datetime import datetime, timezone, timedelta

consumer = KafkaConsumer("upbit",
                         bootstrap_servers=['kafka-01:9092','kafka-02:9092','kafka-03:9092'],                          auto_offset_reset="earliest", # 어디서부터 값을 읽어올지 (earlest >가장 처음 latest는 가장 최근)
                        enable_auto_commit=True, # 완료되었을 떄 문자 전송
                        value_deserializer=lambda x: loads(x.decode('utf-8')), # 역직렬화 ( >받을 떄 ) ; 메모리에서 읽어오므로 loads라는 함수를 이용한다. // 직렬화 (보낼 떄)
                        #consumer_timeout_ms=1000 # 1000초 이후에 메시지가 오지 않으면 없는 >것으로 취급.
                         )

start = time.time() # 현재 시간
print("START= ", start)
for message in consumer:
    topic = message.topic
    partition=message.partition
    offset=message.offset
    value=message.value
    timestamp=message.timestamp
    datetimeobj=datetime.datetime.fromtimestamp(timestamp/1000)
    utc_timestamp = value[0]['timestamp']
    utc_datetime = datetime.datetime.utcfromtimestamp(utc_timestamp/1000)
    kst_datetime = utc_datetime + datetime.timedelta(hours=9)
    print(kst_datetime.strftime('%Y-%m-%d %H:%M:%S'))
    #datetimeobj = datetime.datetime.fromtimestamp(kst_timestamp/1000)
    print("Topic:{}, partition:{}, offset:{}, value:{}, datetimeobj:{}".format(topic,partition,offset,value,datetimeobj))

print("Elapsed time= ",(time.time()-start)) # 걸리는 시간

producer.py 실행 시 API를 호출하고 그 결과를 upbit 토픽으로 보내고
consumer.py를 실행 하면 해당 메시지를 print 한다.

실행 결과 캡쳐

producer.py

producer.py


💡출처
https://jennana.tistory.com/165

profile
반갑습니다.

0개의 댓글