장고에 kafka 적용

최동혁·2023년 3월 21일
0

클라우드

목록 보기
16/18

로직 설명

  1. 게시글 작성하면 producer가 특정 topic으로 broker에게 메세지 전달
  2. broker는 큐에 메시지 저장
  3. consumer는 kafka와 연결되어 있는 rest_proxy에 자신의 ID 등록
  4. 자신의 저장 공간을 할당 받은 후, 특정 Topic 구독 등록
  5. 저장 공간 받은 url + records에 접근해서 구독한 Topic의 메시지 뽑아오기.

사전 준비

  • kafka(broker) 서버 구동
  • rest_proxy 서버 구동
  • pip install kafka-python
  • pip install requests

producer -> broker

게시글 작성

@login_required(login_url='/user/login')
def register(request):
    if request.method == "GET":
        boardForm = BoardForm()
        return render(request, 'board/register.html', {'boardForm': boardForm})
    elif request.method == "POST":
        boardForm = BoardForm(request.POST)
        if boardForm.is_valid():
            board = boardForm.save(commit=False)
            board.writer = request.user
            board.save()
            
            # 토픽 메시지 발행하는 코드
            producer = KafkaProducer(
                acks=0,
                compression_type='gzip',
                bootstrap_servers=['broker의 ip:9092'],
                value_serializer=lambda x: dumps(x).encode('utf-8')
            )

            data = {'message': request.user.username + ' 유저 : ' + board.title + ' 게시글 등록 완료'}
            producer.send('WEB_BOARD_REGISTER', value=data)
            producer.flush()

            return redirect('/')
  • kafka 모듈에서 지원하는 Kafkaproduer 클래스를 선언해서 produer에게 객체를 할당해준다.
  • 객체의 send 메소드에는 Topic과 data를 실어서 보낸다.

consumer -> rest_proxy

counsumer id 등록

result = requests.post("http://rest_proxy의 ip:8082/consumers/" + str(request.user.username),
                  data='{"name": "'+str(request.user.username)+'_instance", "format": "json", "auto.offset.reset": "earliest"}',
                  headers={'Content-Type': 'application/vnd.kafka.v2+json'})
  • rest_proxy 서버에 해당 user를 판별할 수 있는 값으로 consumer_id를 등록한다.
  • 일단 proxy 서버에서는 proxy의 ip:8082/consumers라는 url이 만들어진다.
  • 그리고 그 뒤에 유저를 판별할 수 있는 특정 id 값을 붙여서 data를 보내게된다면?
  • proxy의 ip:8082/consumers/판별 값/instances/판별값_instance 이 자동으로 만들어진다.

구독 등록

result = requests.post("http://rest_proxy의 ip:8082/consumers/"+str(request.user.username)+"/instances/"+str(request.user.username)+"_instance/subscription",
                  data='{"topics":["WEB_BOARD_REGISTER"]}',
                  headers={'Content-Type': 'application/vnd.kafka.v2+json'})
  • proxy에서 만들어준 할당 url에 subscription에 구독할 topic을 data로 실어서 보내면 해당 topic에 관한 메세지들을 받을 수 있다.

구독 후, 메세지 받기

  • 위에서 했던 과정들은 consumer로 등록을 한 후, 그곳에 WEB_BOARD_REGISTER라는 topic을 구독해서 주어진 특정 url에 메세지들을 받게 해준 것이다.
  • 그렇다면 특정 url로 접근을 해서 GET 메소드로 불러들여서 보여주는 html을 짜도록 하자.
{% extends 'base.html'%}
{% load static %}

{% block add_js %}
<script src="https://code.jquery.com/jquery-3.4.1.slim.min.js"
        integrity="sha384-J6qa4849blE2+poT4WnyKhv5vZF5SrPo0iEjwBvKU7imGFAV0wwj1yYfoRSJoZ+n"
        crossorigin="anonymous"></script>
{% endblock %}
{% block contents %}
<!-- Portfolio Grid-->
<section class="page-section bg-light" id="portfolio">
    <div class="container">
        <div class="text-center">
            <h2 class="section-heading text-uppercase">게시판 현황</h2>
            <h3 class="section-subheading text-muted">게시판 실시간 확인 페이지</h3>
        </div>
        <div class="row">

            <div id="content">
                <table id="topTable" class="table">
                    <thead>
                    <tr>
                        <th width="150">Topic</th>
                        <th width="100">Offset</th>
                        <th width="100">Message</th>
                    </tr>
                    </thead>
                    <tbody>

                    </tbody>
                </table>
            </div>

            <script type="text/javascript">

			function sendRequest() {
				var httpRequest = new XMLHttpRequest();
				httpRequest.onreadystatechange = function() {
					if (httpRequest.readyState == XMLHttpRequest.DONE && httpRequest.status == 200 ) {
						var result = JSON.parse(httpRequest.responseText);
						if (result.length != 0) {
							for(var i=0; i<result.length; i++) {
								var tableBody = $("#topTable").find("tbody");

								var row = $("<tr>").append(
										$("<td>").text(result[i]['topic']),
										$("<td>").text(result[i]['offset']),
										$("<td>").text(result[i]['value']['message'])
										);
								tableBody.append(row);
							}

						}
					}
				};
				// GET 방식으로 요청을 보내면서 데이터를 동시에 전달함.
				httpRequest.open("GET", "http://rest_proxy의 ip:8082/consumers/{{request.user}}/instances/{{request.user}}_instance/records", true);
				httpRequest.setRequestHeader("Accept", "application/vnd.kafka.json.v2+json");
				httpRequest.send();
			}


            function consume() {
				sendRequest();
				setTimeout(consume, 5000);

			};

			consume();

            </script>

        </div>
    </div>
</section>

{% endblock %}

과정

  • 아직 게시글을 작성하기 전이라 아무것도 메시지가 오지 않았다.

  • 게시글을 작성하면 위의 사진처럼 메시지가 온 것이 보여지게 된다.
profile
항상 성장하는 개발자 최동혁입니다.

0개의 댓글