suspend fun check(): Response {
val tasks = taskService.query(...).asFlow().toList() // DB ์กฐํ
val deliveries = deliveryService.query(...).toList() // ๋ค๋ฅธ ์๋น์ค ์กฐํ
val editorResource = editorResourceClient.queryOne(...) // ์ธ๋ถ API ํธ์ถ
}
val text: String = "test"
// ๋ฉ๋ชจ๋ฆฌ: [0x1234] โ "test" (๋ฐ๋ก ์ ์ฅ๋จ)
val result = getDataFromDB()
// DB ์ฟผ๋ฆฌ ์ฆ์ ์คํ โ 3์ด ๋๊ธฐ โ ๋ฉ๋ชจ๋ฆฌ์ ๊ฒฐ๊ณผ ์ ์ฅ
val mono: Mono<String> = Mono.just("test")
// ๋ฉ๋ชจ๋ฆฌ: [0x5678] โ {๋ช
๋ น: "test ๋ฐํํ๊ธฐ"} (์คํ ์ ๋จ!)
val flux: Flux<Task> = taskRepository.findAll()
// ๋ฉ๋ชจ๋ฆฌ: [0x9999] โ {๋ช
๋ น: "DB์์ ์กฐํํ๊ธฐ"} (์ฟผ๋ฆฌ ์ ๋ ๋ฆผ!)
// Mono/Flux = ์คํ ๊ณํ์
Mono<Pizza> = "ํผ์ ๋ง๋ค ๊ณํ" (ํผ์ โ)
// subscribe() = ์คํ ๋ฒํผ
.subscribe() = "๊ณํ ์คํ!" (ํผ์ โ
)
// ๋ฉ๋ชจ๋ฆฌ ๊ด์
Mono = [๋ช
๋ น์ด ๋ฆฌ์คํธ] // ๊ฐ๋ฒผ์
subscribe() = ๋ช
๋ น์ด ์คํ โ ์ค์ ๋ฉ๋ชจ๋ฆฌ ํ ๋น
๋ฉ๋ชจ๋ฆฌ ์ํ:
[0xA000] taskMono = {
type: "Mono",
operation: "SELECT * FROM task WHERE uuid = ?",
params: [uuid],
executed: false ๐ ์์ง ์คํ ์ ๋จ
}
๋ฉ๋ชจ๋ฆฌ ์ํ:
[0xB000] result = {
type: "Mono",
chain: [
{op: "SELECT", params: [uuid]}, ๐ 1๋ฒ ๋ช
๋ น
{op: "MAP", fn: "state ๋ณ๊ฒฝ"}, ๐ 2๋ฒ ๋ช
๋ น
{op: "FLATMAP", fn: "UPDATE task..."} ๐ 3๋ฒ ๋ช
๋ น
],
executed: false ๐ ์ฌ์ ํ ์คํ ์ ๋จ!
}
@GetMapping("{uuid}")
fun sseScopeTask(@PathVariable taskUuid: UUID): Flux<ServerSentEvent<TaskUpdatedEvent>> {
return taskUpdatedEventChannel.events.asFlux()
.map { ServerSentEvent.builder(it).build() }
}
.subscribe() ํธ์ถsuspend fun complete(taskUuid: UUID, completedBy: UUID) {
val task = taskDomainQueryService.queryForUpdate(taskUuid)
// .awaitSingle()์ด ๋ด๋ถ์ ์ผ๋ก subscribe() + ๊ฒฐ๊ณผ ๋๊ธฐ
orderClient.queryOne(spec).awaitSingleOrNull()
}
| Publisher | Subscriber | ๊ตฌ๋ ์์ |
|---|---|---|
Mono<Task> (Repository) | Spring WebFlux | Controller ๋ฐํ ์ |
Flux<Event> (SSE) | HTTP ํด๋ผ์ด์ธํธ | ๋ธ๋ผ์ฐ์ ์ฐ๊ฒฐ ์ |
Mono/Flux | Coroutines | .await*() ํธ์ถ ์ |
| Event (Domain) | @EventListener | ์ ํ๋ฆฌ์ผ์ด์ ์์ ์ |
Flux<Message> | RabbitMQ Binder | ์คํธ๋ฆผ ๋ฐ์ธ๋ฉ ์ |
@RestController
class TaskController {
@GetMapping("/task/{id}")
suspend fun getTask(@PathVariable id: Long): Task {
return taskRepository.findById(id).awaitSingle()
}
}
// ๐ด Spring WebFlux ๋ด๋ถ (์ฐ๋ฆฌ๊ฐ ์ ๋ด)
class SpringWebFluxEngine {
fun processRequest(request: HttpRequest) {
// 1. Controller ๋ฉ์๋ ํธ์ถ
val mono = controller.getTask(123)
// 2. ์๋์ผ๋ก ๊ตฌ๋
๋ฐ ์คํ
mono.subscribe(object : Subscriber<Task> {
override fun onNext(task: Task) {
// โ
์ฌ๊ธฐ์ ์ค์ DB ์ฟผ๋ฆฌ ์คํ๋จ!
response.writeJson(task)
}
})
}
}
๋ฉ๋ชจ๋ฆฌ ์ํ:
[Thread-1]
โ ์์ฒญ ๋ฐ์
โ DB ์ฟผ๋ฆฌ (3์ด ๋๊ธฐ) ๐ซ Thread ๋ธ๋กํน!
โ ๊ฒฐ๊ณผ ๋ฐ์
โ JSON ๋ณํ
โ ์๋ต
[Thread-1 ํด์ ]
๋ฌธ์ : 3์ด ๋์ Thread-1์ด ๋๊ณ ์์
๋ฉ๋ชจ๋ฆฌ ์ํ:
[Thread-1]
โ ์์ฒญ ๋ฐ์
โ Mono ์์ฑ (๋ช
๋ น๋ง)
โ Spring์ Mono ์ ๋ฌ
โ Thread-1 ํด์ ! โ
(๋ค๋ฅธ ์์ฒญ ์ฒ๋ฆฌ ๊ฐ๋ฅ)
[EventLoop] (๋ณ๋ ์ค๋ ๋)
โ DB ์ฟผ๋ฆฌ ์์
โ (3์ด ํ) ๊ฒฐ๊ณผ ๋ฐ์
โ JSON ๋ณํ
โ ์๋ต
Spring MVC:
100๊ฐ ์์ฒญ โ 100๊ฐ Thread ํ์
Thread 1๊ฐ = 1MB ๋ฉ๋ชจ๋ฆฌ
์ด ๋ฉ๋ชจ๋ฆฌ = 100MB
Spring WebFlux:
100๊ฐ ์์ฒญ โ 4๊ฐ Thread๋ก ์ฒ๋ฆฌ (CPU ์ฝ์ด ์)
์ด ๋ฉ๋ชจ๋ฆฌ = 4MB
[์น ๋ธ๋ผ์ฐ์ ] โ HTTP POST /task/complete/123-456
suspend fun complete(taskUuid: UUID, completedBy: UUID) {
val task = taskRepository.findByUuid(taskUuid)
task.state = TaskState.COMPLETED
// ์ด๋ฒคํธ ๋ฐํ (Publisher)
task.publishTaskCompletedEvent(...)
taskRepository.save(task)
}
graph TB
A[์ฌ์ฉ์ ํด๋ฆญ] --> B[Controller]
B --> C[Service: Task ์ํ ๋ณ๊ฒฝ]
C --> D[์ด๋ฒคํธ ๋ฐํ: TaskCompletedEvent]
D --> E[Spring Event Bus]
E --> F[๊ตฌ๋
์1: SSE Handler]
E --> G[๊ตฌ๋
์2: RabbitMQ Handler]
E --> H[๊ตฌ๋
์3: External API Handler]
E --> I[๊ตฌ๋
์4: ํต๊ณ Handler]
// โ ๊ธฐ์กด ๋ฐฉ์ (์ง์ ํธ์ถ)
fun completeTask(task: Task) {
task.state = COMPLETED
taskRepository.save(task)
// ๋ชจ๋ ๊ณณ์ ์ง์ ํธ์ถํด์ผ ํจ ๐ฑ
sseService.notify(task)
rabbitMQ.send(task)
emailService.send(task)
statisticsService.update(task)
pivoApi.notify(task)
}
// โ
๋ฐํ-๊ตฌ๋
๋ฐฉ์
fun completeTask(task: Task) {
task.state = COMPLETED
taskRepository.save(task)
// ์ด๋ฒคํธ ํ๋๋ง ๋ฐํ! ๐
publishEvent(TaskCompletedEvent(task))
}
val cold = Mono.fromCallable { Random.nextInt() }
cold.subscribe() // ๊ฒฐ๊ณผ: 42
cold.subscribe() // ๊ฒฐ๊ณผ: 17 (๋ค๋ฅธ ๊ฐ!)
val hot = Sinks.many().multicast()
hot.emitNext(42)
hot.asFlux().subscribe() // 42 ๋ชป ๋ฐ์ (์ด๋ฏธ ์ง๋๊ฐ)
์ฐ๋ฆฌ๋ subscribe() ์ ์
// โ
์ด๋ ๊ฒ๋ง ํ๋ฉด ๋จ
@GetMapping("/data")
suspend fun getData(): Data {
return service.getData()
}
Spring์ด ์์์ ์คํ
๋ฉ๋ชจ๋ฆฌ ํจ์จ
๋ฐํ-๊ตฌ๋ = ๋ฐฉ์ก๊ตญ ์์คํ