본 문서는 Kafka 도입에 대한 고찰을 기록한 문서입니다.
본 서비스를 구현하면서 Kafka가 도입을 고려하게 되었습니다.
이유는 "알림 기능", "닉네임 변경 기능" 때문에 도입을 고려하게 되었습니다.
"알림 기능"은 내가 쓴 댓글에 대댓글이 등록되면 사용자에게 알림을 주는 기능입니다.
소켓 통신을 고려해보았지만, 소켓 세션을 저장한다는 개념이 자원을 점유하기 때문에 소켓은 배제하였습니다.
알림 기능은 사용자 클라이언트에서 서버에 "알림 유무 조회" 요청을 하면 알림 내용이 사용자에게 응답됩니다.
서버 아키텍쳐는 사용자 서버, Board 서버로 분산화가 되어있습니다.
"댓글 등록"은 Board 서버로 요청이 들어갑니다. Board 서버에 댓글이 등록되었을 떄, 사용자 서버로 데이터를 전달하여야 했습니다.
데이터를 전달하는 방법으로 다음과 같은 방법을 고려하였습니다.
3가지 모두 좋은 방법이라고 생각이 됩니다. 하지만 장단점이 존재하여 3번 "이벤트 큐"를 적용하게되었습니다.
1번은 서버에서 다른 서버의 정보를 많이 알아야해서 너무나 복잡한 서버구조가 된다고 판단되어 사용하지 않았습니다.
2번은 메세지 큐의 메세지를 1번 소모하면 메세지가 사라지는 문제 때문에 사용하지 않았습니다.
Board 서버는 MySql을 사용하여 댓글 및 대댓글 정보를 저장하고 있습니다.
Member 라는 Entity를 따로 명시하지 않았습니다. 이유는 사용자 서버는 이미 구축되어 따로 사용자에 대한 데이터를 명시하지 않아도 판단하였습니다.
그래서 Reply(댓글) 에 따로 객체를 매핑하여 구성하지 않고, Reply 객체에 프로퍼티로 저장하도록 구성하였습니다.
사용자가 닉네임 변경을 "사용자 서버"로 요청을 하게 되면, Board 서버에 등록된 모든 댓글에 프로퍼티를 수정하여야 하였습니다.
이 때도 위와 같은 이유 때문에 Kafka를 통한 publish - consume 구조로 변경 처리를 구성하였습니다.
사용자 서버 Event Publish -> Board 서버 Event Consume
memberEventPublisher.sendEventUpdateProfile(member);
public void sendEventUpdateProfile(Member memberEntity){
DtoOfUpdateNicknameEvent eventDto = DtoOfUpdateNicknameEvent.builder()
.id(memberEntity.getId())
.nickname(memberEntity.getNickname())
.build();
ObjectMapper om = new ObjectMapper();
try {
String json = om.writeValueAsString(eventDto);
this.kafkaTemplate.send(TOPIC, json);
}catch (Exception e){
throw new RuntimeException();
}
}
@KafkaListener(topics = ["profile"], groupId = "foo")
fun nicknameEvent(dto : String){
var gson = Gson()
var event = gson.fromJson(dto, EventOfUpdateNickname::class.java);
commentService.updateProfile(event)
replyService.updateProfile(event)
}
위와 같이 String 을 직렬화 -> 역직렬화 과정을 통하여 Event를 구성하였습니다.
Board 서버 event publish -> 사용자 서버 event consume
eventPublisher.publishEvent(replyEntity)
@Async
@EventListener
fun alarmComment(reply: Reply){
var dto = EventOfPublishReplyAlarm(
replyId = reply.id,
replyUserNickname = reply.userNickname,
commentId = reply.comment.id,
commentContent = reply.comment.content,
boardId = reply.comment.board.isbn,
commentUserId = reply.comment.userId,
createdAt = reply.createdAt,
page = reply.comment.page,
replyContent = reply.content
)
var json = objectMapper.writeValueAsString(dto)
kafkaTemplate.send("reply", json)
}
코드에서 살펴볼 수 있듯이, "내부 이벤트"를 적용하였습니다.
이유는 코드 상에서 내부 이벤트를 publish 하게 된다면 다른 특정한 종속이 필요없어지고, 코드가 깔끔해지는 효과를 기대하였습니다.
@KafkaListener(topics = "reply", groupId = "foo")
public void consume(String json) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
EventOfPublishReplyNotification dto = objectMapper.readValue(json, EventOfPublishReplyNotification.class);
memberService.updateAlarm(dto);
}
위와 같이 Event가 publish 되면 해당 이벤트를 소모하도록 구성하였습니다.
최종적으로 보면 양방향으로 publisher - subscriber 구조를 가지고 있습니다.