로직 설명
- 게시글 작성하면 producer가 특정 topic으로 broker에게 메세지 전달
- broker는 큐에 메시지 저장
- consumer는 kafka와 연결되어 있는 rest_proxy에 자신의 ID 등록
- 자신의 저장 공간을 할당 받은 후, 특정 Topic 구독 등록
- 저장 공간 받은 url + records에 접근해서 구독한 Topic의 메시지 뽑아오기.
사전 준비
- kafka(broker) 서버 구동
- rest_proxy 서버 구동
- pip install kafka-python
- pip install requests
producer -> broker
게시글 작성
@login_required(login_url='/user/login')
def register(request):
if request.method == "GET":
boardForm = BoardForm()
return render(request, 'board/register.html', {'boardForm': boardForm})
elif request.method == "POST":
boardForm = BoardForm(request.POST)
if boardForm.is_valid():
board = boardForm.save(commit=False)
board.writer = request.user
board.save()
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['broker의 ip:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
data = {'message': request.user.username + ' 유저 : ' + board.title + ' 게시글 등록 완료'}
producer.send('WEB_BOARD_REGISTER', value=data)
producer.flush()
return redirect('/')
- kafka 모듈에서 지원하는 Kafkaproduer 클래스를 선언해서 produer에게 객체를 할당해준다.
- 객체의 send 메소드에는 Topic과 data를 실어서 보낸다.
consumer -> rest_proxy
counsumer id 등록
result = requests.post("http://rest_proxy의 ip:8082/consumers/" + str(request.user.username),
data='{"name": "'+str(request.user.username)+'_instance", "format": "json", "auto.offset.reset": "earliest"}',
headers={'Content-Type': 'application/vnd.kafka.v2+json'})
- rest_proxy 서버에 해당 user를 판별할 수 있는 값으로 consumer_id를 등록한다.
- 일단 proxy 서버에서는 proxy의 ip:8082/consumers라는 url이 만들어진다.
- 그리고 그 뒤에 유저를 판별할 수 있는 특정 id 값을 붙여서 data를 보내게된다면?
- proxy의 ip:8082/consumers/판별 값/instances/판별값_instance 이 자동으로 만들어진다.
구독 등록
result = requests.post("http://rest_proxy의 ip:8082/consumers/"+str(request.user.username)+"/instances/"+str(request.user.username)+"_instance/subscription",
data='{"topics":["WEB_BOARD_REGISTER"]}',
headers={'Content-Type': 'application/vnd.kafka.v2+json'})
- proxy에서 만들어준 할당 url에 subscription에 구독할 topic을 data로 실어서 보내면 해당 topic에 관한 메세지들을 받을 수 있다.
구독 후, 메세지 받기
- 위에서 했던 과정들은 consumer로 등록을 한 후, 그곳에 WEB_BOARD_REGISTER라는 topic을 구독해서 주어진 특정 url에 메세지들을 받게 해준 것이다.
- 그렇다면 특정 url로 접근을 해서 GET 메소드로 불러들여서 보여주는 html을 짜도록 하자.
{% extends 'base.html'%}
{% load static %}
{% block add_js %}
<script src="https://code.jquery.com/jquery-3.4.1.slim.min.js"
integrity="sha384-J6qa4849blE2+poT4WnyKhv5vZF5SrPo0iEjwBvKU7imGFAV0wwj1yYfoRSJoZ+n"
crossorigin="anonymous"></script>
{% endblock %}
{% block contents %}
<section class="page-section bg-light" id="portfolio">
<div class="container">
<div class="text-center">
<h2 class="section-heading text-uppercase">게시판 현황</h2>
<h3 class="section-subheading text-muted">게시판 실시간 확인 페이지</h3>
</div>
<div class="row">
<div id="content">
<table id="topTable" class="table">
<thead>
<tr>
<th width="150">Topic</th>
<th width="100">Offset</th>
<th width="100">Message</th>
</tr>
</thead>
<tbody>
</tbody>
</table>
</div>
<script type="text/javascript">
function sendRequest() {
var httpRequest = new XMLHttpRequest();
httpRequest.onreadystatechange = function() {
if (httpRequest.readyState == XMLHttpRequest.DONE && httpRequest.status == 200 ) {
var result = JSON.parse(httpRequest.responseText);
if (result.length != 0) {
for(var i=0; i<result.length; i++) {
var tableBody = $("#topTable").find("tbody");
var row = $("<tr>").append(
$("<td>").text(result[i]['topic']),
$("<td>").text(result[i]['offset']),
$("<td>").text(result[i]['value']['message'])
);
tableBody.append(row);
}
}
}
};
httpRequest.open("GET", "http://rest_proxy의 ip:8082/consumers/{{request.user}}/instances/{{request.user}}_instance/records", true);
httpRequest.setRequestHeader("Accept", "application/vnd.kafka.json.v2+json");
httpRequest.send();
}
function consume() {
sendRequest();
setTimeout(consume, 5000);
};
consume();
</script>
</div>
</div>
</section>
{% endblock %}
과정
- 아직 게시글을 작성하기 전이라 아무것도 메시지가 오지 않았다.
- 게시글을 작성하면 위의 사진처럼 메시지가 온 것이 보여지게 된다.