실시간 데이터는 업비트 API를 이용해 최근거래 체결 데이터를 전송하는 예제를 만들어봤다.
토픽삭제 커맨드
./kafka-topics.sh --delete --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic-B
토픽 생성 커맨드
./kafka-topics.sh --create --replication-factor 1 --partithons 1 topic upbit --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092
토픽 리스트 커맨드
./kafka-topics.sh --list --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092
replication-factor : 생성할 Topic의 replication factor 수를 지정합니다.
Topic 단위로 replication factor를 설정한다 해서 Topic을 replication 하는 것이 아니라, Topic을 이루는 각각의 Partition을 replication 하는 것입니다.
partitions : 생성할 Topic의 Partition 수를 지정합니다.
하나의 토픽은 여러개의 파티션으로 나눌 수 있다.(병렬처리 -> 고성능)
이 부분을 이해 할 때 데이터를 분할 하는 방식이 elasticsearch와 유사한거 같다는 생각을 했다.
elasticsearch | kafka |
---|---|
Index | Topic |
primary shard | partitions |
replica shard | replication |
파티션은 늘릴 수 있지만 반대로 줄일 수는 없다.
프라이머리 샤드(elasticsearch)는 변경이 불가.
producer.py
from kafka import KafkaProducer
import requests
from json import dumps
producer = KafkaProducer(bootstrap_servers = ['kafka-01:9092','kafka-02:9092','kafka-03:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
server_url = 'https://api.upbit.com'
headers = {'Content-Type': 'application/json;'}
res = requests.get(server_url + '/v1/trades/ticks?market=KRW-BTC&count=1', headers=headers)
res_str = str(res.json()[0])
producer.send('upbit', value=res.json())
producer.flush()
최근 거래 체결 내역 API 결과를 upbit 토픽으로 전송.
https://docs.upbit.com/reference/%EC%B5%9C%EA%B7%BC-%EC%B2%B4%EA%B2%B0-%EB%82%B4%EC%97%AD
consumer.py
from kafka import KafkaConsumer
from json import loads
import time
import datetime
#from datetime import datetime, timezone, timedelta
consumer = KafkaConsumer("upbit",
bootstrap_servers=['kafka-01:9092','kafka-02:9092','kafka-03:9092'], auto_offset_reset="earliest", # 어디서부터 값을 읽어올지 (earlest >가장 처음 latest는 가장 최근)
enable_auto_commit=True, # 완료되었을 떄 문자 전송
value_deserializer=lambda x: loads(x.decode('utf-8')), # 역직렬화 ( >받을 떄 ) ; 메모리에서 읽어오므로 loads라는 함수를 이용한다. // 직렬화 (보낼 떄)
#consumer_timeout_ms=1000 # 1000초 이후에 메시지가 오지 않으면 없는 >것으로 취급.
)
start = time.time() # 현재 시간
print("START= ", start)
for message in consumer:
topic = message.topic
partition=message.partition
offset=message.offset
value=message.value
timestamp=message.timestamp
datetimeobj=datetime.datetime.fromtimestamp(timestamp/1000)
utc_timestamp = value[0]['timestamp']
utc_datetime = datetime.datetime.utcfromtimestamp(utc_timestamp/1000)
kst_datetime = utc_datetime + datetime.timedelta(hours=9)
print(kst_datetime.strftime('%Y-%m-%d %H:%M:%S'))
#datetimeobj = datetime.datetime.fromtimestamp(kst_timestamp/1000)
print("Topic:{}, partition:{}, offset:{}, value:{}, datetimeobj:{}".format(topic,partition,offset,value,datetimeobj))
print("Elapsed time= ",(time.time()-start)) # 걸리는 시간
producer.py 실행 시 API를 호출하고 그 결과를 upbit 토픽으로 보내고
consumer.py를 실행 하면 해당 메시지를 print 한다.
producer.py
producer.py