[위드마켓 개발기] 리뷰 작성을 처리하는게 이렇게 어렵다고?!

Doccimann·2022년 8월 11일
0
post-thumbnail

🚀 이번에 소개할 주제는....!

이번에 소개할 주제는 ShopReview를 작성하면서 겪은 어려움 입니다.

사실 shopReview를 작성한다고 하면 고려할 사항은 여럿 있습니다.

  1. 실제 존재하는 shop에 대해서 리뷰를 쓰고있는게 맞아?
  2. 글 내용은 200자로 제한을 해야할 것 같은데?
  3. 리뷰 평점은 0 초과 10 미만으로 검증을 해야한다.
  4. review가 작성이 되면 dynamoDB, Redis에 모두 반영이 되어야한다.
  5. review가 작성이 되면 shop에 reviewCount, averageScore 까지 변화가 전파되어야한다.

위의 고려사항과, 아키텍처적으로 겪은 문제점들을 위주로 한번 저의 삽질(?)을 소개하겠습니다.


🤔 Validator를 어디에 위치시킬건가?

우선 저는 위드마켓 서버를 작성함에 있어 명확한 목표가 있습니다.

  1. 역할과 책임에 따라서 명확한 시스템 모듈화가 이뤄져야한다.
  2. 코드에는 응집도가 높아야한다. 즉, 한 클래스에 많은 참조가 발생하여 import가 뚱뚱해지는건 싫다.
  3. 도메인, 다시 말하면 테이블간에는 최대한 연결이 없어야한다. 즉, 도메인 간에 디커플링을 지향하여 추후에 도메인 별로 시스템을 분리할 필요가 있을 때 어려움이 없어야한다.

위의 사항을 모두 고려한 아키텍처를 소개해보겠습니다.

글씨가 쬐끄만건 좀 죄송합니다.

여기서 아키텍처에 대해서 설명하기에는 너무 읽는데 피로가 있을 것 같아, 제 프로젝트의 깃허브 주소를 공유하겠습니다.

위드마켓 가게 노출 시스템

우선 여기서 첫번째로 마주한 문제점이 발생합니다. review를 작성함에 있어 review의 대상이 있는 shop이 존재하는지를 검사해야하는데, 조회 로직은 모두 application-query에서 돌아가고, 이를 application-command에서 조회할 방법은 마땅치 않습니다.

저는 이를 webClient를 사용해서 조회하는 방식을 채택하였는데요, 코드는 아래와 같습니다.

private fun isExistsShop(shopId: String, shopName: String): Mono<Boolean> {
        return shopWebClient.get()
            .uri(
                UriComponentsBuilder
                    .fromUriString("/v2/shop/simple")
                    .queryParam("id", shopId)
                    .queryParam("name", shopName)
                    .toUriString()
            ).retrieve()
            .bodyToMono(Results.SingleResult::class.java)
            .doOnError {
                throw ShopNotFoundException("Shop is not found!!")
            }
            .map { result -> result.success }
    }

webClient를 이용해서 동작중인 query server에서 조회를 요청한 뒤, 이를 비동기적으로 처리를 해내는 모습입니다.

그리고 마주하는 두번째 문제인데요, review에 들어온 요청을 검증함에 있어 review도 들리고, shop도 들리고 검증 로직이 분산이 되는 현상이 벌어집니다. 이렇게 되면 검증 로직이 응집성을 가지지 못하고 흩어지는 효과를 보여서 코드적으로 좋지 못하기 때문에, 이를 한군데 모으기로 결정했습니다.

validator를 만드는 방식으로 노선을 결정하였는데요, 어차피 webflux의 functional endpoint는 request가 Mono 타입으로 들어오기 때문에 @Valid 어노테이션이 통하지도 않기 때문에 필요했던 클래스였을겁니다.

이제 문제는, 이러한 validator를 어디다가 둘거냐의 문제입니다. 저는 이를 domain module에 위치시키기로 결정을 하였는데요, 근거는 다음과 같습니다.

여기서 validator는 application logic은 모르지만 domain과 관련된 business logic에는 참여하기 때문에 domain module에 validator를 위치시키기로 결정하였습니다.

따라서 domain module의 패키지는 아래 구조로 구성됩니다.

그리고 저는 ShopReviewValidator를 아래와 같이 작성을 해주었는데요, 코드만 간단하게 소개하고 지나가겠습니다.

🔨 ShopReviewValidator

@Component
class ShopReviewValidator(
    private val urlComponent: ServerUrlsInterface
) : Validator {

    private val baseUrl = urlComponent.SHOP_QUERY_SERVER_URL

    private val shopWebClient = WebClient
        .builder()
        .uriBuilderFactory(WebClientHelper.uriBuilderFactory(baseUrl))
        .baseUrl(baseUrl)
        .build()

    // 해당 리뷰가 생성 가능한지 검증하는 메소드
    suspend fun validateCreatable(shopReview: ShopReview) = with(shopReview) {
        validateFirst(this) // 우선 필드를 모두 검증한다

        // WebClient를 이용해서 해당 shop이 존재하는지 여부만 뽑아온다
        val shopResultMono: Mono<Boolean> = isExistsShop(shopId, shopName)
        val shopResultDeferred = CoroutinesUtils.monoToDeferred(shopResultMono)

        // shop이 존재하지 않는 경우 예외를 발생시킨다
        check(shopResultDeferred.await()) {
            throw ShopNotFoundException("shop review에 대응하는 shop이 존재하지 않습니다.")
        }
    }

    override fun supports(clazz: Class<*>): Boolean {
        return ShopReview::class.java.isAssignableFrom(clazz)
    }

    // reviewId, reviewTitle, shopId, shopName, reviewContent : 비어있는지 검증
    // reviewContent는 200자 이상으로는 못 쓰도록 검증한다
    override fun validate(target: Any, errors: Errors) {
        // reviewId, reviewTitle, shopId, shopName, reviewContent : 비어있는지 검증
        listOf("reviewId", "reviewTitle", "shopId", "shopName", "reviewContent").forEach { fieldName ->
            ValidationUtils.rejectIfEmpty(errors, fieldName, "field.required", "${fieldName}이 제공되지 않았습니다.")
        }

        val review = target as ShopReview

        // reviewContent의 길이를 200으로 제한한다
        if (review.reviewContent.length > 200) {
            errors.rejectValue("reviewContent", "field.max.length", "review의 내용은 200을 넘어서는 안됩니다.")
        }

        // reivewScore가 0점인 경우 제한한다
        if(review.reviewScore <= 0 || review.reviewScore > 10) {
            errors.rejectValue("reviewScore", "field.value.range", "review score은 무조건 0 초과 10 이하입니다.")
        }
    }

    // 기본적으로 검증해야하는 메소드
    private fun validateFirst(shopReview: ShopReview) = with(shopReview) {
        val errors = BeanPropertyBindingResult(this, ShopReview::class.java.name)
        validate(this, errors)

        // 기본 조건들을 만족하지 못하면 exception을 터뜨린다
        check(errors == null || errors.allErrors.isEmpty()) {
            throw RequestFieldException(errors.allErrors.toString())
        }
    }

    private fun isExistsShop(shopId: String, shopName: String): Mono<Boolean> {
        return shopWebClient.get()
            .uri(
                UriComponentsBuilder
                    .fromUriString("/v2/shop/simple")
                    .queryParam("id", shopId)
                    .queryParam("name", shopName)
                    .toUriString()
            ).retrieve()
            .bodyToMono(Results.SingleResult::class.java)
            .doOnError {
                throw ShopNotFoundException("Shop is not found!!")
            }
            .map { result -> result.success }
    }
}

🤔 이제 Repository는 어떻게 구성할 것인가?

검증이 끝났다면, 다음 차례는 repository를 통해 db에 접근을 해야합니다.

그런데 저는 아키텍처적으로 고민을 한가지 하였습니다.

👉 도메인 모듈은 도메인만을 보관해야지, DB에 직접 접근하는 로직을 여기에 두는건 도메인 주도 아키텍처의 정신에는 맞지 않을것 같다.

따라서 저는 domain module에는 DB 접근 로직을 두지 않고, infrastructure layer로 Repository 구현체를 두기로 결정하였습니다.

어차피 Repository interface를 domain module에 두고, 구현체를 infrastructure에 둬도 문제가 될것은 없는게, 어차피 DIP 원칙에 의거하여 의존성은 domain -> 상위 레이어 방향으로 흐르기도 하고, 어차피 상위 레이어는 interface 타입을 참조하지, 구현체를 참조하지는 않기 때문입니다.

따라서 아래의 방식으로 구현을 하였습니다.

🔨 ShopReviewDynamoRepository.kt <- domain에 위치함

interface ShopReviewDynamoRepository {

    // review를 하나 생성하는 메소드
    fun createReviewAsync(shopReview: ShopReview): Mono<Void>
}

🔨 ShopReviewDynamoRepositoryImpl.kt <- infrastructure에 위치함

@Repository
class ShopReviewDynamoRepositoryImpl(
    private val dynamoDbEnhancedAsyncClient: DynamoDbEnhancedAsyncClient
): ShopReviewDynamoRepository {
    val asyncTable: DynamoDbAsyncTable<ShopReview> =
        dynamoDbEnhancedAsyncClient.table("shop_review", TableSchema.fromBean(ShopReview::class.java))

    // review를 하나 생성하는 메소드
    override fun createReviewAsync(shopReview: ShopReview): Mono<Void> {
        val reviewFuture = asyncTable.putItem(shopReview)

        return Mono.fromFuture(reviewFuture)
    }

    /** Key를 반환하는 private method
     * @param reviewId Partition Key of shop_review
     * @param reviewTitle Sort Key of shop_review
     * @return key of sjop_review table
     */
    private fun generateKey(reviewId: String, reviewTitle: String): Key = Key.builder()
        .partitionValue(reviewId)
        .sortValue(reviewTitle)
        .build()
}

여기까지만 했다면, review 생성 자체의 구현은 거의 끝났습니다. 상위 레이어에서는 이걸 차례대로 처리만 하면 되니까요.

Service logic에서는 review를 검증하고, 생성후 저장하고, 그 상위 레이어에서는 들어온 request 자체를 검증하고 service layer로 request를 내리고, 그런 일련의 과정만 거치면 끝나니까요. (이에 대해서는 깃허브 코드를 참조해주세요!)

하지만 진짜 문제는 이제 시작됩니다. 😭


🚀 이제 이벤트를 전파해보자. 그리고 처리해보자.

review가 생성됐습니다. 여기서 끝일까요? 절대 아닙니다. 위에서 언급한 고려사항 4, 5번도 만족해야합니다.

그런데 저는 위에서 언급했듯이, 도메인간에는 어떠한 커플링 현상도 용납하지 않을겁니다.

따라서 저는 review가 작성되어 shop에다가 reviewCount를 늘리고, averageScore를 조작하는데 있어서 shopDynamoRepository 라던가, 절대 참조하지 않고 이를 수행해야합니다.

따라서 저는 review가 생성이 되면 이에 관한 이벤트를 전파해서 consumer 측에서 처리를 하도록 구현을 해볼겁니다.

일단은 event를 produce까지 하는걸로 handler를 작성하였습니다.

🔨 ShopReviewCommandHandler

@Component
class ShopReviewCommandHandler(
    private val shopReviewCommandService: ShopReviewCommandService,
    private val shopReviewKafkaTemplate: KafkaTemplate<String, ShopReview>,
    private val reviewCountEventKafkaTemplate: KafkaTemplate<String, ShopCommand.ReviewCountEventDto>,
    private val resultFactory: ResultFactory
) {

    // shopReview를 하나 생성하는 메소드
    suspend fun createReview(request: ServerRequest): ServerResponse = coroutineScope {
        // 비동기적으로 reviewDto를 body로부터 뽑아온다
        val reviewCreateDto = request.bodyToMono(ShopReviewCommand.CreateDto::class.java)
            .awaitSingleOrNull()

        // body가 유실되어있는지 검증
        checkNotNull(reviewCreateDto) {
            throw RequestBodyLostException("Body is lost!!")
        }

        // TODO service의 createReview 로직 작성
        val createdReview = shopReviewCommandService.createReview(reviewCreateDto)

        // Kafka에 이벤트를 전파하는 로직
        with(createdReview) {
            // 생성된 review를 redis에서 처리하도록 이벤트 발행
            shopReviewKafkaTemplate.send(KafkaTopics.shopReviewCreateTopic, this)

            // review가 생성되었음을 shop table로 전파
            reviewCountEventKafkaTemplate.send(
                KafkaTopics.reviewCountEventTopic, ShopCommand.ReviewCountEventDto(
                    shopId, shopName, true, reviewScore
                )
            )
        }

        return@coroutineScope ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValueAndAwait(resultFactory.getSuccessResult())
    }
}

여기서 주목할 부분은, 이 부분이 되겠습니다.

with(createdReview) {
    // 생성된 review를 redis에서 처리하도록 이벤트 발행
    shopReviewKafkaTemplate.send(KafkaTopics.shopReviewCreateTopic, this)

    // review가 생성되었음을 shop table로 전파
    reviewCountEventKafkaTemplate.send(
    KafkaTopics.reviewCountEventTopic, ShopCommand.ReviewCountEventDto(
                    shopId, shopName, true, reviewScore
        )
    )
}

여기서 저는 Kafka로 이벤트를 전파하였는데요, 저는 Kafka로 이벤트를 전송할 때, 최대한 zero payload 정책에 맞춰서 이벤트를 전파하고 싶었습니다. 따라서 저는 새로운 DTO를 선언해서 이벤트를 전송하였습니다. 바로 ReviewCountEventDto 인데요, 클래스 숫자가 늘어나는 것을 방지하기 위해서 이를 ShopCommand 라는 Sealed Class로 감싸서 처리하였습니다.

🔨 ShopCommand

sealed class ShopCommand {
    /** Shop을 생성하는데 사용하는 dto class
     * @since 22.07.24
     */
    data class ShopCreateDto(
        @JsonProperty("shop_name") var shopName: String,
        @JsonProperty("open_time") var openTime: LocalTime,
        @JsonProperty("close_time") var closeTime: LocalTime,
        @JsonProperty("lot_number_address") var lotNumberAddress: String,
        @JsonProperty("road_name_address") var roadNameAddress: String,
        @JsonProperty("latitude") var latitude: Double,
        @JsonProperty("longitude") var longitude: Double,
        @JsonProperty("shop_description") var shopDescription: String,
        @JsonProperty("is_branch") var isBranch: Boolean,
        @JsonProperty("branch_name") var branchName: String? = null,
        @JsonProperty("shop_category") var shopCategory: Category,
        @JsonProperty("shop_detail_category") var shopDetailCategory: DetailCategory,
        @JsonProperty("main_image_url") var mainImageUrl: String?,
        @JsonProperty("representative_image_url") var representativeImageUrlList: List<String>
    )

    /** shop에 대한 review가 작성되거나, 혹은 삭제되었을 때의 이벤트를 처리하는 dto
     * @param shopId shop의 id
     * @param shopName shop의 name
     * @param isGenerated shop의 리뷰가 작성되었는지, 아니면 삭제되었는지 여부를 저장하는 변수
     */
    data class ReviewCountEventDto(
        var shopId: String,
        var shopName: String,
        var isGenerated: Boolean,
        var reviewScore: Double
    )
}

event도 producing하였으니, 이제는 consume만 하면 됩니다.

여기서 저는 문제에 봉착했습니다. Spring Kafka에 Coroutines를 이용해 비동기적으로 event를 컨슘하려 했더니, 아래의 에러메시지를 마주하였습니다.

🚨 Error message

Cannot convert from [team.bakkas.domaindynamo.entity.ShopReview] to [kotlin.coroutines.Continuation] for GenericMessage 
[payload=ShopReview(reviewId=2d2b89fa-0e47-4643-bdff-92a9c0d99f1d, reviewTitle=꾸덕꾸덕한게 맛있네요!, shopId=85485be6-f065-4305-a8c6-ff23997ae9f1, shopName=Hash, reviewContent=아주아주 추천해요!, reviewScore=10.0, reviewPhotoList=[], createdAt=2022-08-11T20:27:36.023015, updatedAt=null), 
headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@43963b4d, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=withmarket.shopReview.create, kafka_receivedTimestamp=1660217256882, ...]

위의 에러메시지는, suspend 함수 생성 과정에서 Continuation을 생성하지 못한다 라는 내용입니다.

이 소리가 무엇이냐면, 너는 Spring Kakfa랑 Coroutines를 같이 써서는 안된다! 라는 소리와 같습니다.

이유는 다음과 같습니다.

👉 Kotlin Coroutines는 Continuation을 생성하여 중단점을 생성합니다. 이 Continuation을 이용해서 Coroutines는 CPS 방식으로 비동기 처리를 수행하는데요, 이러한 Continuation이 생성되지 않는다는 것은, Coroutine을 이용한 비동기 처리는 못 한다는 소리입니다.

따라서 저는 이벤트를 처리하기 위해서 아래의 방식을 채택하였습니다.

👉 이가 없으면 잇몸으로. Coroutines 대신에 Webflux의 Mono를 이용해서 이벤트를 처리한다.

따라서 저는 이벤트 구독 로직을 아래와 같이 구성했습니다.

🔨 ShopEventListener

@Component
class ShopEventListener(
    private val shopDynamoRepository: ShopDynamoRepository,
    private val shopRedisRepository: ShopRedisRepository
) {
    // shop에 대해서 리뷰가 작성되면 카운트를 증가시켜주는 리스너 메소드
    @KafkaListener(
        topics = [KafkaTopics.reviewCountEventTopic],
        groupId = KafkaConsumerGroups.updateShopReviewCountGroup
    )
    fun updateReviewCount(reviewCountEventDto: ShopCommand.ReviewCountEventDto) {
        /*
        1. Shop을 DynamoDB로부터 가져온다
        2. DynamoDB로부터 가져온 Shop에 대해서 averageScore, reviewCount를 조작한다.
        3. 해당 Shop을 DynamoDB에 갱신하고, 동시에 Redis에도 갱신한다.
         */
        val shopMono = with(reviewCountEventDto) {
            shopDynamoRepository.findShopByIdAndNameAsync(shopId, shopName)
        }.map { it!! }
            .map { changeShopInfo(it, reviewCountEventDto) }

        // 비동기적으로 dynamo, redis에 해당 정보 저장
        shopMono.flatMap { shopDynamoRepository.createShopAsync(it) }.subscribe()
        shopMono.flatMap { shopRedisRepository.cacheShop(it) }.subscribe()
    }

    // shop의 변화를 반영해주는 메소드
    private fun changeShopInfo(shop: Shop, reviewCountEventDto: ShopCommand.ReviewCountEventDto): Shop {
        return when (reviewCountEventDto.isGenerated) {
            true -> applyGenerateReview(shop, reviewCountEventDto.reviewScore)
            false -> applyDeleteReview(shop, reviewCountEventDto.reviewScore)
        }
    }

    // review가 삭제되었을 때 해당 리뷰 삭제를 shop에 반영해주는 메소드
    private fun applyDeleteReview(shop: Shop, reviewScore: Double): Shop = with(shop) {
        val newTotalScore = averageScore * reviewNumber - reviewScore // 새로 반영될 총점 계산
        averageScore = newTotalScore / (reviewNumber - 1)
        reviewNumber -= 1

        return@with this
    }

    // review가 생성되었을 때 해당 리뷰 생성을 shop에 반영해주는 메소드
    private fun applyGenerateReview(shop: Shop, reviewScore: Double): Shop = with(shop) {
        val newTotalScore = averageScore * reviewNumber + reviewScore
        averageScore = newTotalScore / (reviewNumber + 1)
        reviewNumber += 1

        return@with this
    }
}

위와 같이 webflux를 통한 비동기 처리를 통해서 coroutines를 이용하지 않고 이벤트를 성공적으로 처리를 할수있게 되었습니다.


🌲 마치며

다음에도 위드마켓 개발을 하면서 겪은 삽질을 추가적으로 소개해드리겠습니다.

긴 글 읽어주셔서 감사합니다 😁

profile
Hi There 🤗! I'm college student majoring Mathematics, and double majoring CSE. I'm just enjoying studying about good architectures of back-end system(applications) and how to operate the servers efficiently! 🔥

0개의 댓글