๐Ÿš€ Spring WebFlux์™€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ

์กด์Šค๋…ธ์šฐยท2025๋…„ 8์›” 13์ผ
0

๐Ÿ“Œ ๋ชฉ์ฐจ

  1. ์™œ Spring WebFlux๋ฅผ ์„ ํƒํ–ˆ๋Š”๊ฐ€?
  2. ๋ฉ”๋ชจ๋ฆฌ ๊ด€์ ์—์„œ Mono/Flux ์ดํ•ดํ•˜๊ธฐ
  3. Publisher-Subscriber ํŒจํ„ด์˜ ์‹ค์ œ
  4. Spring WebFlux ์‹คํ–‰ ํ™˜๊ฒฝ ์ดํ•ด
  5. ์‹ค์ œ ๋ฐ์ดํ„ฐ ํ๋ฆ„ ์ถ”์ ํ•˜๊ธฐ

1. ์™œ Spring WebFlux๋ฅผ ์„ ํƒํ–ˆ๋Š”๊ฐ€?

๐ŸŽฏ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์•„ํ‚คํ…์ฒ˜๊ฐ€ ํ•„์š”ํ•œ ์ด์œ 

๋†’์€ ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ ์š”๊ตฌ์‚ฌํ•ญ

  • ์ˆ˜์ฒœ~์ˆ˜๋งŒ ๊ฐœ์˜ ๋™์‹œ ์—ฐ๊ฒฐ ์ฒ˜๋ฆฌ ํ•„์š”
  • ์ „ํ†ต์ ์ธ ์Šค๋ ˆ๋“œ ๋ชจ๋ธ(1์š”์ฒญ=1์Šค๋ ˆ๋“œ)๋กœ๋Š” ๋ฉ”๋ชจ๋ฆฌ ๋ถ€์กฑ
  • ์‹ค์‹œ๊ฐ„ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆฌ๋ฐ (SSE) ์ง€์› ํ•„์š”

I/O ์ง‘์•ฝ์  ์ž‘์—… ํŠน์„ฑ

suspend fun check(): Response {
    val tasks = taskService.query(...).asFlow().toList()  // DB ์กฐํšŒ
    val deliveries = deliveryService.query(...).toList()   // ๋‹ค๋ฅธ ์„œ๋น„์Šค ์กฐํšŒ
    val editorResource = editorResourceClient.queryOne(...) // ์™ธ๋ถ€ API ํ˜ธ์ถœ
}
  • CPU ์—ฐ์‚ฐ๋ณด๋‹ค I/O ๋Œ€๊ธฐ ์‹œ๊ฐ„์ด ๋Œ€๋ถ€๋ถ„
  • ๋…ผ๋ธ”๋กœํ‚น I/O๋กœ ๋Œ€๊ธฐ ์‹œ๊ฐ„ ๋™์•ˆ ๋‹ค๋ฅธ ์š”์ฒญ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค ํ™˜๊ฒฝ

  • ์„œ๋น„์Šค ๊ฐ„ ๋„คํŠธ์›Œํฌ ํ˜ธ์ถœ์ด ๋นˆ๋ฒˆ
  • ๋™๊ธฐ ๋ฐฉ์‹์œผ๋กœ๋Š” cascading failure ์œ„ํ—˜
  • Circuit Breaker, Retry, Timeout์„ ๋ฆฌ์•กํ‹ฐ๋ธŒํ•˜๊ฒŒ ์ฒ˜๋ฆฌ

2. ๋ฉ”๋ชจ๋ฆฌ ๊ด€์ ์—์„œ Mono/Flux ์ดํ•ดํ•˜๊ธฐ

๐Ÿ“ฆ ์ผ๋ฐ˜ ๋ณ€์ˆ˜ vs Mono/Flux

์ผ๋ฐ˜ ๋ณ€์ˆ˜ - ์ฆ‰์‹œ ์‹คํ–‰, ์ฆ‰์‹œ ๋ฉ”๋ชจ๋ฆฌ ํ• ๋‹น

val text: String = "test"  
// ๋ฉ”๋ชจ๋ฆฌ: [0x1234] โ†’ "test" (๋ฐ”๋กœ ์ €์žฅ๋จ)

val result = getDataFromDB()  
// DB ์ฟผ๋ฆฌ ์ฆ‰์‹œ ์‹คํ–‰ โ†’ 3์ดˆ ๋Œ€๊ธฐ โ†’ ๋ฉ”๋ชจ๋ฆฌ์— ๊ฒฐ๊ณผ ์ €์žฅ

Mono/Flux - ์‹คํ–‰ ์•ˆ ํ•จ, ๋ช…๋ น์–ด๋งŒ ์ €์žฅ

val mono: Mono<String> = Mono.just("test")
// ๋ฉ”๋ชจ๋ฆฌ: [0x5678] โ†’ {๋ช…๋ น: "test ๋ฐ˜ํ™˜ํ•˜๊ธฐ"} (์‹คํ–‰ ์•ˆ ๋จ!)

val flux: Flux<Task> = taskRepository.findAll()
// ๋ฉ”๋ชจ๋ฆฌ: [0x9999] โ†’ {๋ช…๋ น: "DB์—์„œ ์กฐํšŒํ•˜๊ธฐ"} (์ฟผ๋ฆฌ ์•ˆ ๋‚ ๋ฆผ!)

๐ŸŽฏ ํ•ต์‹ฌ: Mono/Flux๋Š” "์‹คํ–‰ ๊ณ„ํš์„œ"

// Mono/Flux = ์‹คํ–‰ ๊ณ„ํš์„œ
Mono<Pizza> = "ํ”ผ์ž ๋งŒ๋“ค ๊ณ„ํš"  (ํ”ผ์ž โŒ)

// subscribe() = ์‹คํ–‰ ๋ฒ„ํŠผ  
.subscribe() = "๊ณ„ํš ์‹คํ–‰!"  (ํ”ผ์ž โœ…)

// ๋ฉ”๋ชจ๋ฆฌ ๊ด€์ 
Mono = [๋ช…๋ น์–ด ๋ฆฌ์ŠคํŠธ]  // ๊ฐ€๋ฒผ์›€
subscribe() = ๋ช…๋ น์–ด ์‹คํ–‰ โ†’ ์‹ค์ œ ๋ฉ”๋ชจ๋ฆฌ ํ• ๋‹น

๐Ÿ’พ ๋ฉ”๋ชจ๋ฆฌ์—์„œ ์‹ค์ œ๋กœ ์ผ์–ด๋‚˜๋Š” ์ผ

Mono/Flux ์ƒ์„ฑ ์‹œ

๋ฉ”๋ชจ๋ฆฌ ์ƒํƒœ:
[0xA000] taskMono = {
    type: "Mono",
    operation: "SELECT * FROM task WHERE uuid = ?",
    params: [uuid],
    executed: false  ๐Ÿ‘ˆ ์•„์ง ์‹คํ–‰ ์•ˆ ๋จ
}

์—ฐ์‚ฐ์ž ์ฒด์ธ ์‹œ

๋ฉ”๋ชจ๋ฆฌ ์ƒํƒœ:
[0xB000] result = {
    type: "Mono",
    chain: [
        {op: "SELECT", params: [uuid]},        ๐Ÿ‘ˆ 1๋ฒˆ ๋ช…๋ น
        {op: "MAP", fn: "state ๋ณ€๊ฒฝ"},         ๐Ÿ‘ˆ 2๋ฒˆ ๋ช…๋ น  
        {op: "FLATMAP", fn: "UPDATE task..."}  ๐Ÿ‘ˆ 3๋ฒˆ ๋ช…๋ น
    ],
    executed: false  ๐Ÿ‘ˆ ์—ฌ์ „ํžˆ ์‹คํ–‰ ์•ˆ ๋จ!
}

3. Publisher-Subscriber ํŒจํ„ด์˜ ์‹ค์ œ

๐ŸŽฌ Spring WebFlux์—์„œ ์‹ค์ œ Subscriber๋Š” ๋ˆ„๊ตฌ?

WebFlux Controller์˜ ๊ฒฝ์šฐ

@GetMapping("{uuid}")
fun sseScopeTask(@PathVariable taskUuid: UUID): Flux<ServerSentEvent<TaskUpdatedEvent>> {
    return taskUpdatedEventChannel.events.asFlux()
        .map { ServerSentEvent.builder(it).build() }
}
  • Spring WebFlux ํ”„๋ ˆ์ž„์›Œํฌ๊ฐ€ ์ž๋™์œผ๋กœ ๊ตฌ๋…
  • HTTP ํด๋ผ์ด์–ธํŠธ(๋ธŒ๋ผ์šฐ์ €)๊ฐ€ ์ตœ์ข… ์†Œ๋น„์ž
  • ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์—ฐ๊ฒฐํ•˜๋ฉด โ†’ Spring์ด ์ž๋™์œผ๋กœ .subscribe() ํ˜ธ์ถœ

Coroutines ์‚ฌ์šฉ ์‹œ

suspend fun complete(taskUuid: UUID, completedBy: UUID) {
    val task = taskDomainQueryService.queryForUpdate(taskUuid)  
    // .awaitSingle()์ด ๋‚ด๋ถ€์ ์œผ๋กœ subscribe() + ๊ฒฐ๊ณผ ๋Œ€๊ธฐ
    orderClient.queryOne(spec).awaitSingleOrNull()  
}

๐Ÿ“Š Publisher-Subscriber ์‹ค์ œ ๊ตฌํ˜„ ์ •๋ฆฌ

PublisherSubscriber๊ตฌ๋… ์‹œ์ 
Mono<Task> (Repository)Spring WebFluxController ๋ฐ˜ํ™˜ ์‹œ
Flux<Event> (SSE)HTTP ํด๋ผ์ด์–ธํŠธ๋ธŒ๋ผ์šฐ์ € ์—ฐ๊ฒฐ ์‹œ
Mono/FluxCoroutines.await*() ํ˜ธ์ถœ ์‹œ
Event (Domain)@EventListener์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹œ์ž‘ ์‹œ
Flux<Message>RabbitMQ Binder์ŠคํŠธ๋ฆผ ๋ฐ”์ธ๋”ฉ ์‹œ

4. Spring WebFlux ์‹คํ–‰ ํ™˜๊ฒฝ ์ดํ•ด

๐Ÿƒ ๊ฐœ๋ฐœ์ž๋Š” subscribe()๋ฅผ ์“ฐ์ง€ ์•Š๋Š”๋‹ค!

์šฐ๋ฆฌ๊ฐ€ ์ž‘์„ฑํ•˜๋Š” ์ฝ”๋“œ

@RestController
class TaskController {
    @GetMapping("/task/{id}")
    suspend fun getTask(@PathVariable id: Long): Task {
        return taskRepository.findById(id).awaitSingle()
    }
}

Spring์ด ์ž๋™์œผ๋กœ ํ•˜๋Š” ์ผ

// ๐Ÿ”ด Spring WebFlux ๋‚ด๋ถ€ (์šฐ๋ฆฌ๊ฐ€ ์•ˆ ๋ด„)
class SpringWebFluxEngine {
    fun processRequest(request: HttpRequest) {
        // 1. Controller ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ
        val mono = controller.getTask(123)
        
        // 2. ์ž๋™์œผ๋กœ ๊ตฌ๋… ๋ฐ ์‹คํ–‰
        mono.subscribe(object : Subscriber<Task> {
            override fun onNext(task: Task) {
                // โœ… ์—ฌ๊ธฐ์„œ ์‹ค์ œ DB ์ฟผ๋ฆฌ ์‹คํ–‰๋จ!
                response.writeJson(task)
            }
        })
    }
}

๐Ÿ’พ ๋ธ”๋กœํ‚น vs ๋…ผ๋ธ”๋กœํ‚น ๋ฉ”๋ชจ๋ฆฌ ๋น„๊ต

Spring MVC (๋ธ”๋กœํ‚น)

๋ฉ”๋ชจ๋ฆฌ ์ƒํƒœ:
[Thread-1] 
    โ†“ ์š”์ฒญ ๋ฐ›์Œ
    โ†“ DB ์ฟผ๋ฆฌ (3์ดˆ ๋Œ€๊ธฐ) ๐Ÿšซ Thread ๋ธ”๋กœํ‚น!
    โ†“ ๊ฒฐ๊ณผ ๋ฐ›์Œ
    โ†“ JSON ๋ณ€ํ™˜
    โ†“ ์‘๋‹ต
[Thread-1 ํ•ด์ œ]

๋ฌธ์ œ: 3์ดˆ ๋™์•ˆ Thread-1์ด ๋†€๊ณ  ์žˆ์Œ

Spring WebFlux (๋…ผ๋ธ”๋กœํ‚น)

๋ฉ”๋ชจ๋ฆฌ ์ƒํƒœ:
[Thread-1] 
    โ†“ ์š”์ฒญ ๋ฐ›์Œ
    โ†“ Mono ์ƒ์„ฑ (๋ช…๋ น๋งŒ)
    โ†“ Spring์— Mono ์ „๋‹ฌ
    โ†“ Thread-1 ํ•ด์ œ! โœ… (๋‹ค๋ฅธ ์š”์ฒญ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ)

[EventLoop] (๋ณ„๋„ ์Šค๋ ˆ๋“œ)
    โ†“ DB ์ฟผ๋ฆฌ ์‹œ์ž‘
    โ†“ (3์ดˆ ํ›„) ๊ฒฐ๊ณผ ๋ฐ›์Œ
    โ†“ JSON ๋ณ€ํ™˜
    โ†“ ์‘๋‹ต

๐Ÿ“Š 100๊ฐœ ์š”์ฒญ ๋™์‹œ ์ฒ˜๋ฆฌ ๋น„๊ต

Spring MVC:
100๊ฐœ ์š”์ฒญ โ†’ 100๊ฐœ Thread ํ•„์š”
Thread 1๊ฐœ = 1MB ๋ฉ”๋ชจ๋ฆฌ
์ด ๋ฉ”๋ชจ๋ฆฌ = 100MB

Spring WebFlux:
100๊ฐœ ์š”์ฒญ โ†’ 4๊ฐœ Thread๋กœ ์ฒ˜๋ฆฌ (CPU ์ฝ”์–ด ์ˆ˜)
์ด ๋ฉ”๋ชจ๋ฆฌ = 4MB

5. ์‹ค์ œ ๋ฐ์ดํ„ฐ ํ๋ฆ„ ์ถ”์ ํ•˜๊ธฐ

๐ŸŒŠ Task ์™„๋ฃŒ ์‹œ ๋ฐ์ดํ„ฐ ํ๋ฆ„

Step 1: ์‚ฌ์šฉ์ž ์š”์ฒญ

[์›น ๋ธŒ๋ผ์šฐ์ €] โ†’ HTTP POST /task/complete/123-456

Step 2: Service์—์„œ ์ด๋ฒคํŠธ ๋ฐœํ–‰

suspend fun complete(taskUuid: UUID, completedBy: UUID) {
    val task = taskRepository.findByUuid(taskUuid)
    task.state = TaskState.COMPLETED
    
    // ์ด๋ฒคํŠธ ๋ฐœํ–‰ (Publisher)
    task.publishTaskCompletedEvent(...)  
    taskRepository.save(task)
}

Step 3: ์—ฌ๋Ÿฌ ๊ตฌ๋…์ž๊ฐ€ ๋™์‹œ ์ฒ˜๋ฆฌ

graph TB
    A[์‚ฌ์šฉ์ž ํด๋ฆญ] --> B[Controller]
    B --> C[Service: Task ์ƒํƒœ ๋ณ€๊ฒฝ]
    C --> D[์ด๋ฒคํŠธ ๋ฐœํ–‰: TaskCompletedEvent]
    
    D --> E[Spring Event Bus]
    
    E --> F[๊ตฌ๋…์ž1: SSE Handler]
    E --> G[๊ตฌ๋…์ž2: RabbitMQ Handler]  
    E --> H[๊ตฌ๋…์ž3: External API Handler]
    E --> I[๊ตฌ๋…์ž4: ํ†ต๊ณ„ Handler]

โšก ๋ฐœํ–‰-๊ตฌ๋…์˜ ์žฅ์ 

// โŒ ๊ธฐ์กด ๋ฐฉ์‹ (์ง์ ‘ ํ˜ธ์ถœ)
fun completeTask(task: Task) {
    task.state = COMPLETED
    taskRepository.save(task)
    
    // ๋ชจ๋“  ๊ณณ์„ ์ง์ ‘ ํ˜ธ์ถœํ•ด์•ผ ํ•จ ๐Ÿ˜ฑ
    sseService.notify(task)
    rabbitMQ.send(task)
    emailService.send(task)
    statisticsService.update(task)
    pivoApi.notify(task)
}

// โœ… ๋ฐœํ–‰-๊ตฌ๋… ๋ฐฉ์‹
fun completeTask(task: Task) {
    task.state = COMPLETED
    taskRepository.save(task)
    
    // ์ด๋ฒคํŠธ ํ•˜๋‚˜๋งŒ ๋ฐœํ–‰! ๐ŸŽ‰
    publishEvent(TaskCompletedEvent(task))
}

๐Ÿ”ฅ Hot vs Cold Publisher

Cold Publisher (๊ตฌ๋…ํ•  ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ ์‹œ์ž‘)

val cold = Mono.fromCallable { Random.nextInt() }
cold.subscribe()  // ๊ฒฐ๊ณผ: 42
cold.subscribe()  // ๊ฒฐ๊ณผ: 17 (๋‹ค๋ฅธ ๊ฐ’!)

Hot Publisher (๊ตฌ๋…๊ณผ ๋ฌด๊ด€ํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ ๋ฐœ์ƒ)

val hot = Sinks.many().multicast()
hot.emitNext(42)
hot.asFlux().subscribe()  // 42 ๋ชป ๋ฐ›์Œ (์ด๋ฏธ ์ง€๋‚˜๊ฐ)

๐ŸŽฏ ํ•ต์‹ฌ ์ •๋ฆฌ

Spring WebFlux ์‹ค๋ฌด ์š”์•ฝ

  1. ์šฐ๋ฆฌ๋Š” subscribe() ์•ˆ ์”€

    // โœ… ์ด๋ ‡๊ฒŒ๋งŒ ํ•˜๋ฉด ๋จ
    @GetMapping("/data")
    suspend fun getData(): Data {
        return service.getData()
    }
  2. Spring์ด ์•Œ์•„์„œ ์‹คํ–‰

    • Controller๊ฐ€ Mono/Flux ๋ฐ˜ํ™˜
    • Spring์ด ์ž๋™์œผ๋กœ subscribe()
    • ๊ฒฐ๊ณผ๋ฅผ HTTP Response๋กœ ๋ณ€ํ™˜
  3. ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ

    • MVC: 1์š”์ฒญ = 1Thread = 1MB
    • WebFlux: N์š”์ฒญ = 4Thread = 4MB
  4. ๋ฐœํ–‰-๊ตฌ๋… = ๋ฐฉ์†ก๊ตญ ์‹œ์Šคํ…œ

    • ๋ฐœํ–‰์ž(๋ฐฉ์†ก๊ตญ): ์ด๋ฒคํŠธ ์ƒ์„ฑ๋งŒ ๋‹ด๋‹น
    • ๊ตฌ๋…์ž(์‹œ์ฒญ์ž): ์›ํ•˜๋Š” ์ฑ„๋„ ๊ตฌ๋…ํ•˜์—ฌ ์ฒ˜๋ฆฌ
    • Spring(์ผ€์ด๋ธ”ํšŒ์‚ฌ): ์—ฐ๊ฒฐ ์ž๋™ ๊ด€๋ฆฌ

์™œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์ธ๊ฐ€?

  • ๋Šฆ์€ ์‹คํ–‰: ํ•„์š”ํ•  ๋•Œ๋งŒ ์‹คํ–‰
  • ์žฌ์‚ฌ์šฉ: ๊ฐ™์€ ๊ณ„ํš์œผ๋กœ ์—ฌ๋Ÿฌ ๋ฒˆ ์‹คํ–‰ ๊ฐ€๋Šฅ
  • ์กฐํ•ฉ: ์—ฌ๋Ÿฌ ๊ณ„ํš์„ ํ•ฉ์ณ์„œ ํฐ ๊ณ„ํš ๋งŒ๋“ค๊ธฐ
  • ๋น„๋™๊ธฐ: ๊ณ„ํš๋งŒ ์งœ๊ณ  ๋‹ค๋ฅธ ์ผ ํ•˜๊ธฐ
  • ๋ฆฌ์†Œ์Šค ํšจ์œจ: ์ ์€ ์Šค๋ ˆ๋“œ๋กœ ๋งŽ์€ ์š”์ฒญ ์ฒ˜๋ฆฌ
profile
์–ด์ œ์˜ ๋‚˜๋ณด๋‹ค ํ•œ๊ฑธ์Œ ๋”

0๊ฐœ์˜ ๋Œ“๊ธ€