동시성(concurrency)는 computer science 용어로 single process를 독립적인 컴포넌트로 나누고, 어떻게 이 컴포넌트가 안전하게 데이터를 공유하는 지에 대해서 구체화한 것을 의미한다.
대부분의 프로그래밍 언어들은 os level threads를 사용하는 라이브러리를 통해 concurrency를 제공하는데, os level threads는 보통 locks을 얻으려는 시도를 함으로서 데이터를 공유한다. 그러나 go는 다르다. go의 주된 동시성 모델은 CSP(Communicating Sequential Processes)이다. CSP 패턴은 기존의 동시성 방법처럼 강력하지만, 더 이해하기 더 쉽다는 장점을 가진다.
한 가지 명심해야할 것은, 모든 코드에 동시성 코드를 넣는다고 성능이 더 좋아지는 것은 아니다. 동시성 코드는 성능을 오히려 더 안좋게 만들 수 있고, 이해하기 어렵게 만들 수 있다. 핵심은 동시성(concurrency)와 병렬성(parallelism)은 다르다는 것이다. 동시성(concurrency)은 해결하려고 하고있는 문제를 더욱 좋게 구조화하기 위해 사용되는 툴이다. 동시성(concurrency)이든 아니든 코드가 병렬적(parallelism)으로 동작하려면 이는 알고리즘이 이를 허용하는 하드웨어에 달려있다. Amdahl’s Law는 얼마나 병렬 프로세싱(parallel processing)이 performance를 향상시키는 지 알려주는 공식이다.
process는 program의 인스턴스로 computer's operating system에 의해 동작한다. OS는 memory같은 일부 자원들을 process에 결부시키는데 다른 process들이 현재 process에 결부된 자원들에 접근하지 못하게 만든다. 또한 process는 하나 또는 그 이상의 thread들로 이루어져 있다. thread는 실행 단위로 OS에 의해 동작할 시간이 주어진다. process안의 thread들은 자원들을 서로 공유하기도 한다. CPU는 core수에 따라 동시간에 하나, 그 이상의 thread들로부터 명령어들을 수행할 수 있다. OS의 일 중 하나는 CPU에 할당된 thread들을 스케줄링하고 모든 process들, 또는 process안의 thread들이 실행될 기회를 주는 것이다.
goroutines는 lightweight processes로 GO runtime에 의해 관리된다. go 프로그램이 실행되면 go runtime은 수많은 thread들을 만들고, 개발자가 만든 프로그램을 실행시키기 위한 single goroutine을 launche시킨다. 개발자에 의해 만들어진 모든 goroutine들은 GO runtime scheduler에 의해 앞서 만들어진 thread들에 자동으로 할당되는 것이다. 마치 OS가 CPU cores에 따라 thread들을 스케줄링하듯이 말이다. 이는 추가적인(불필요헌) 작업 같아보인다. 기저에 있는 OS가 이미 thread들과 process들을 스케줄링하는 것을 포함하고 있기 때문이다. 그러나 이는 몇몇 이점을 갖는데 다음과 같다.
정리하자면, goroutine은 thread보다 create 속도도 빠르고 메모리 효율도 좋으며 context switching이 발생할 때 더욱 오버헤드가 적으며, scheduler 측면에서도 부하가 적다는 것을 알 수 있다.
goroutine은 일반 함수 invocation(호출)에 go
키워드 하나만 붙여도 만들어진다. 일반 함수처럼 파라미터도 넣어줄 수 있는데 다만, 리턴값은 무시된다.
goroutine들은 channels을 사용하여 통신한다. slices, maps, channels 들은 build-in type으로 make
function을 통해 만들어진다.
ch := make(chan int)
maps와 같이 channels도 reference type이다(즉, default가 pointer), 따라서 channel을 다른 함수에 넣어버리면 이는 channel에 대한 pointer가 넘어가는 것이다. 또한 maps, slice와 같이 channel의 zero-value는 nil
이다.
channel과 interact하고 싶다면 <-
연산자를 사용하면 된다. channel을 통해 데이터를 받으려면 채널 앞(왼쪽)에 <-
를 넣고, channel에 데이터를 써주려면 channel 뒤(오른쪽)에 <-
를 넣어주면 된다.
a := <- ch // read a value from ch and assign it to a
ch <- b // write the value in b to ch
하나의 채널에 하나의 값이 쓰여지고 여러 goroutine이 이 채널에서 데이터를 읽어가도 데이터를 받는 것은 하나의 goroutine이라는 것을 명심하자.
같은 channel에 하나의 goroutine이 read, write을 다하는 경우는 보기 드물다. channel을 변수나, 함수의 파라미터, 필드에 적용할 때는 chan
키워드 앞에 <-
연산자를 붙이도록 하여 이것이 오직 channel
로부터 값을 read
하는 것이라는 것을 명시해주도록 하자. ex ch <- chan int
. 또한 channel이 write
로만 쓰인다면 channel 뒤에 <-
연산자를 두어서 이 channel
을 쓰는 goroutine은 해당 channel을 write
용으로만 사용한다는 것을 명시해주도록 하자. 이렇게 함으로서 go compiler가 해당 channel이 함수에서 read 용인지, write 용인지 확신할 수 있기 때문이다.
기본적으로 default channels는 unbuffered이다. 모든 unbuffured channel에 어떤 goroutine이 쓰기 연산을 실행한다면 다른 goroutine들은 이 unbuffered channel에 대한 쓰기 연산을 pause하게 되는데, 이 unbuffered channel의 데이터를 가져오기 이전까지 멈추게 되는 것이다. 이와 유사하게 읽기 연산도 마찬가지인데 읽으려고 하는 goroutine들은 write연산으로 channel에 어떤 데이터를 넣어주기 이전까지 pause하게 된다. 이는 unbuffered channel을 가지고 하나의 goroutine 내에서 읽고 쓰기가 불가능하다는 것이다.
go는 또한 buffered channel을 지원하는데, 이러한 channel들은 제한된 수많은 write연산들을 blocking(pause)없이 버퍼링한다. 만약 buffer가 가득 채워지면, 해당 channel에 대한 read연산이 있기 전까지 channel에 대한 일련의 write연산을 하는 goroutine들은 멈추게된다. buffer에 아무것도 없어도 똑같이 멈추게 된다.
ch := make(chan int, 10)
len
, cap
함수들도 channel에 대해서 동작하는데, 현재 channel에 몇개의 데이터가 버퍼링되어 있는 지를 알고 싶다면 len
을 사용하고, 최대 용량을 알고 싶다면 cap
을 사용해야한다. 참고로 unbuffered channel에 len
, cap
을 쓰면 0이 나오니 알아두도록 하자.
사실 대부분의 경우 unbuffered channel을 사용한다. 언제 unbuffered chan을 사용하고 buffered channel을 사용할 지는 추후에 알아보자.
channel의 값을 for-range
loop를 통해 얻을 수 있는데 다음과 같다.
for v := range ch {
fmt.Println(v)
}
다른 for-range
문법과는 다르게, 여기에는 channel안의 값을 의미하는 하나의 variable만 쓰인다. 이 loop는 channel이 closed되거나, 함수가 return, break될 때까지 반복된다.
channel에 더이상 쓰기 연산이 끝났다면 channel을 닫을 수 있는데, 이 때 close
함수를 사용하면 된다.
close(ch)
일단 channel이 닫히고 나서는 해당 channel에 대한 어떠한 write연산, close 연산은 거부되며 panic
을 발생시킨다. 재밌게도 닫힌 channel에 대한 read연산은 항상 성공한다. 만약 channel을 닫았는데 해당 channel이 값들을 아직 buffering하고 있다면 read연산을 해도 값이 나온다. 만약 channel을 닫았는데 해당 channel에 값이 없다면 zero value
가 반환된다.
그렇다면 channel이 닫혀서 zero value
가 나온 것과 닫히지 않은 channel에서 zero value
를 반환한 것을 어떻게 구분해야 할까?? 이는 map
과 같은 문제고 해결 방법 역시도 같다.
v, ok := <-ch
만약 ok
가 true
이면 channel은 열려있는 것이고, 만약 false
이면 channel은 닫혀있는 것이다.
channel을 닫는 것은 오롯이 해당 channel에 대한 write연산을 하고있는 goroutine
과 관련된 것이다. channel에서 데이터를 받기를 기대하고 있는 goroutine들(for-range
을 사용하는)을 위해서라도 channel
을 닫아주는 것이 좋다. 더 이상 사용되지 않는 channel
들은 가비지 콜렉터의 대상이 되며 go runtime
이 이를 잡아준다.
channels goroutine의 동시성 모델의 두 가지 특징을 가진다.
1. 개발자의 코드가 일련의 stage(순차적으로 동작)을 가지도록 생각하게 해주고,
2. Channels are one of the two things that set apart Go’s concurrency model. data 의존성을 clear하게 해준다.
이를 이해하기 위해서는 다른 언어의 동시성 모델과 비교하면 된다. 대부분의 경우 thread간의 동시성 문제를 해결하기위해서 전역 변수의 state를 이용한다. 이는 thread가 동작하는 코드를 얼핏봐서는 thread는 아무 변화가 없는데 코드의 동작이 달라지는 이상한 경험을 할 수 있다. 그러나 goroutine은 channel을 통해서 동기화를 진행하기 때문에 코드가 마치 순차적으로 동작하는 것처럼 보인다. 즉, goroutine 내에서의 로직만 보고도 상태에 따른 코드 변화를 알 수 있는 것이다. 더불어 전역 변수에 대한 의존성이 사라지게 됨으로 goroutine
들은 channel
만 보면된다. 즉, goroutine
들의 상태를 변화시킬 수 있는 것은 전역 변수가 아니라 channel
에 의존하도록 만들면 코드의 flow를 더 따라가기가 쉬울 것이다.
channel
을 사용할 때 항상 조심해야 할 것 중 하나는 channel
은 reference type이다 보니까, make
로 생성해주지 않고 사용하면 다이나믹한 에러가 발생한다는 것이다. 만약 nil
인 channel
에 계속해서 write, read연산을 하면 hang forever
(계속 반복)되고, close
연산을 하게되면 panic
이 발생한다.
select
문법은 go concurrency를 구조화하고 제어하기 위한 방법으로, 만약 '두개의 동시성 연산이 실행되고 있다면 이들 중 어떤 연산이 먼저 올 지', 또는 '어떤 케이스에 대해 처리할 수 없다면'과 같은 문제에 대해 해결방법을 제시한다.
select
키워드는 goroutine이 여러 개의 채널들에서 하나에 write 또는 read 연산을 하도록 해준다. 이는 걸핏봐서는 switch-case
와 비슷해보인다.
select {
case v := <- ch:
fmt.Println(v)
case v := <- ch2:
fmt.Println(v):
case ch3 <- x:
fmt.Println("wrote", x)
case <- ch4:
fmt.Println("got value on ch4, but ignored it")
}
select
안의 각 case
는 channel에 대한 read, write연산으로 만약 read 또는 write가case
에 가능하다면 이는 case
의 body문을 실행하도록 한다.
그렇다면 여러개의 채널에서 동시에 read 또는 write 연산이 발생한다면 어떻게 될가?? select
알고리즘은 굉장히 간단한데, 랜덤하게 case 중 하나를 뽑고 실행시킨다. case
로 쓰여진 순서는 별로 중요하지 않다. 이것이 switch-case
와 다른 점이다. 왜 랜덤하게 동작하도록 했냐면 이는 starvation
을 해결하기 위함으로 어떤 case
든 간에 더 선호하거나, 비 선호하는 경우를 없애기 위함이다.
랜덤하게 case
를 선택하는 또 다른 select
의 이점은 inconsistent한 순서로 lock들을 얻음으로서 deadlock
이슈를 해결하기 위함이다. 만약 두 개의 goroutine에서 두 개의 channel을 같이 사용하고 있다면 이들은 정해진 순서대로 동작해야 한다. 그렇지 않으면 deadlock
이 발생하고 말것이다. 만약 go에서 deadlock
이 발생하면 go는 프로그램을 종료시켜버린다.
func main(){
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
v := 1
ch1 <- v
v2 := <- ch2
fmt.println(v, v2)
}()
v := 2
ch2 <- v
v2 := <- ch1
fmt.Println(v, v2)
}
다음의 코드를 실행하면 아래의 에러를 얻게된다.
fatal error: all goroutines are asleep - deadlock!
main
으로 동작하고 있는 goroutine과 func
으로 만든 goroutine들 끼리 충돌이 나서 deadlock에 빠진 것이다.
만약 channel을 사용하는 main goroutine부분을 select
로 감싸고 동작시키면 deadlock
을 피할 수 있다.
func main(){
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
v := 1
ch1 <- v
v2 := <- ch2
fmt.Println(v, v2)
}()
v := 2
var v2 int
select {
case ch2 <- v:
case v2 = <- ch1:
}
fmt.Println(v, v2)
}
2 1
이라는 답이 나온다.
이러한 일이 가능한 이유는, select
문이 만약 case들 중에 실행 가능한 것이 있는 지 check해주어 deadlock을 피할 수 있기 때문이다.
select
문이 channel들 간의 통신을 담당하다보니 select
문은 종종 for
loop안에 임베딩되는 경우가 많다. 즉, select-case
문은 무한 루프를 제공하지 않는다는 것이다.
for {
select {
case <- done:
return
case v := <- ch:
fmt.Println(v)
}
}
for-select
loop 구조는 종종 언급되는 조합으로 많이 사용된다. for-select
loop를 사용할 때는 exit loop를 반드시 두어야 한다. 나중에 the done channel pattern
을 보도록 하자.
switch
문과 같이 select
문도 default
절을 가질 수 있는데, 이는 읽거나 쓸 수 있는 채널들에 대한 case들이 없는 경우 동작한다. 만약 어떠한 channel에 대한 nonblocking read 또는 write를 구현하고 싶다면 select
을 default
와 함께 쓰는 것이 좋다.
select {
case v := <-ch:
fmt.Println(v)
default:
fmt.Println("no value written to ch")
}
실행하면 ch
에서 읽을 데이터가 없어도 blocking되는 것이 아니라 default
절로 빠지게 되어 종료된다.
만약, for-select
loop에 default
절을 사용한다면 거의 항상 뭔가를 잘못하고 있는 것이다. 이는 매번 triggering되기 때문이다. 심지어 read, write를 하지않을 때도 말이다. 이는 for loop
를 영원히 동작하게 만들며 엄청난 양의 CPU자원을 소모한다.
기본적인 goroutine과 channel에 대한 설명은 끝났으니 어떻게 사용되는 지 pattern을 알아보도록 하자
동시성은 세부사항이 구현되므로, 좋은 API 디자인은 가능한 구현의 자세한 사항을 감춰야 한다. 이는 개발자가 어떻게 이 코드가 '불리는 지'를 변경하는 것 없이 어떻게 이 코드가 '작동하는 지'를 변경할 수 있도록 해준다.
실질적으로, 이는 개발자가 절대로 channels
, mutex
들을 API 타입, 함수, 메서드로 노출시키면 안된다는 것을 의미한다. 만약 channel
이 노출되면 해당 API를 사용하는 유저에게 channel
관리 책임을 전달하는 것이다. 즉 이는 유저가 channel가 buffered되었는 지, closed되었는 지 nil인지와 같은 concerns에 대해서 걱정(생각)을 해야한다는 것을 의미한다. 이는 또한 유저들이 생각지도 못한 순서로 channel들과 mutex에 접근하여 deadlock을 발생시킬 수 있다.
오해하지 말아야 할 것이, 이는 절대 함수의 파라미터나 구조체 필드로 channel, mutex를 사용하지 말라는 것이 아니다. 다만 이들을 절대로 외부에 노출시키지 말라는 것이다.
대부분, goroutine으로 만들 클로저(the closure)는 어떠한 파라미터들이 없다. 대신에 해당 goroutine이 실행되는 환경으로부터 value들을 capture한다. 그런데, 이러한 로직이 실행되지 않는 한 가지 경우가 있다. for-loop
의 index
, value
를 capture할 때가 그렇다.
package main
import "fmt"
func main() {
a := []int{2, 4, 6, 8, 10}
ch := make(chan int, len(a))
for _, v := range a {
go func() {
ch <- v * 2
}()
}
for i := 0; i < len(a); i++ {
fmt.Println(<-ch)
}
}
우리가 원하는 정답은 다음과 같다.
4
8
12
16
20
그러나 결과는 다음과 같다.
20
20
20
20
20
왜 ch
에 20이 계속해서 쓰이냐면, 모든 goroutine들의 closure가 같은 variable을 capture하기 때문이다. for
에 사용되는 index, value는 각 iteration에서 계속해서 재사용된다. v
에 마지막 값 10이 할당되고, goroutine이 동작할 때 goroutine이 v
를 보게되는 것이다. 이러한 문제는 for
에만 해당하는 것이 아니라, 매 순간 goroutine이 어떠한 변수에 의존하는데, 이 변수 값이 변화된다면 똑같은 문제가 발생한다. 이를 해결하기 위해서는 goroutine에 해당 변수 값을 파라미터로 넘겨주어야 한다.
두 가지 방법이 있는데 첫번째는 변수를 shadow
하는 방법이다.
for _, v := range a {
v := v
go func(){
ch <- v * 2
}()
}
그러나 shadowing하는 방법은 그렇게 좋은 방법이 아니다. 좀 더 코드를 깔끔하게 만들고 싶다면 goroutine의 파리미터로 값을 넘겨주면 된다.
for _, v := range a {
go func(val int){
ch <- val * 2
}(v)
}
가장 좋은 방법은 goroutine함수에 인자로 변경되는 값을 넘겨주는 것이다.
goroutine function을 시작시키면, 개발자는 항상 이 goroutine이 종료되도록 만들어야 한다. 변수와는 달리 go runtime은 goroutine이 더 이상 사용 중이 아니라는 것을 감지할 수 없다. 만약 goroutine이 종료되지 않으면 스케줄러는 주기적으로 아무것도 하지 않는 시간을 할당할 것이고 이는 프로그램의 성능을 다운시키게 된다. 이러한 것을 goroutine leak라고 한다.
goroutine이 종료되었다는 것을 보장하는 것을 확인하기는 어려운 일이다. 가령 다음과 같이 goroutine이 하나의 generator로 쓰이는 경우를 보자.
func countTo(max int) <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < max; i++ {
ch <- i
}
close(ch)
}()
return ch
}
func main() {
for i := range countTo(10) {
fmt.Println(i)
}
}
예제를 들려고 만든 코드이지, goroutine을 절대로 generator로 쓰지말도록 하자.
일반적인 경우, goroutine에서 채널로 데이터를 다 보내면 goroutine은 종료된다. 그러나 만약 다음과 같은 경우는 어떨까??
func main() {
for i := range countTo(10) {
if i > 5 {
break
}
fmt.Println(i)
}
}
코드는 문제없이 동작하지만, for
이 끝나버리면서 goroutine은 모든 값을 채널로 전송하지 못하고 채널에 넣은 다음 block상태(unbuffered 이기 때문에)에 빠지게 된다. 이러면 goroutine leak이 발생한 것이다.
done channel pattern
이는 goroutine에게 이제는 processing을 멈출 시간이라는 신호를 던져준다. 신호를 던져주기 위해 channel을 사용한다. 다음의 예제를 확인해보자.
func searchData(s string, searchers []func(string) []string) []string {
done := make(chan struct{})
result := make(chan []string)
for _, searcher := range searchers {
go func(searcher func(string) []string) {
select {
case result <- searcher(s):
case <-done:
}
}(searcher)
}
r := <-result
close(done)
return r
}
위 함수에서 channel
done
을 사용하였는데 데이터 타입으로 struct{}
을 사용한다. 빈 구조체를 타입으로 사용한 이유는 별로 값이 중요하지 않기 때문이다. done
채널에는 어떠한 값도 사용하지 않을 것이고 단지 close()
함수를 실행할 것이다.
searcher
함수가 값을 반환하거나 또는 done
채널에서 값을 읽어올 때까지 goroutine에서의 select
문은 계속해서 실행된다. 열려있는 channel
에서 값을 읽어오는 것은 값이 들어오거나, 채널이 닫혀 zero value
가 들어오지 않는한 계속해서 goroutine
이 멈춰있는다.
이는 done
channel
은 done
이 닫히기 전까지 멈춘다는 것이다. 만약 우리가 result
로 부터 첫번째 데이터를 얻는다면 done
채널은 close
되게되고 이 신호가 모든 goroutine
에 퍼지게되어 goroutine
은 종료된다. 이러한 방법으로 goroutine leaking
을 막을 수 있다.
다른 done
패턴으로 channel
에 대한 cancellation
함수를 반환함으로서 구현할 수 있다. 다음의 예제를 확인해보자.
func countTo(max int) (<-chan int, func()) {
ch := make(chan int)
done := make(chan struct{})
cancel := func() {
close(done)
}
go func() {
for i := 0; i < max; i++ {
select {
case <-done:
return
case ch<-i:
}
}
close(ch)
}()
return ch, cancel
}
func main() {
ch, cancel := countTo(10)
for i := range ch {
if i > 5 {
break
}
fmt.Println(i)
}
cancel()
}
countTo
함수는 두 개의 channel
들을 만든다. 하나는 done
신호를 호출하고자하는 값을 반환하고 하는 값을 반환하는 channel
을 반환한다. done
channel을 직접 반환하기보다는
done
channel을 닫는
closure(함수)를 반환하는 방법을 사용했다.
closure`를 이용한 방법은 추가적인 clean-up 로직을 추가할 수 있다는 장점을 가지고 있다.
언제 Buffered channel
을 사용하고 언제 Unbuffered channel
을 사용하는 가??
Buffered channel
은 개발자가 얼마나 많은 goroutine
들을 만들었고, 몇 개의 goroutine
들을 제한하고 싶은 지를 알 때, 또는 queue에 있는 작업들의 양을 제한하고 싶을 때 사용하는 것이 좋다.
Buffered channel
는 개발자가 일련의 goroutine
들로 부터 데이터를 얻기를 원하거나, concurrency usage를 제한하기를 원할 때 효과가 좋다. Buffered channel
는 또한 시스템이 queueing하고 있는 작업의 양을 관리하는데 도움을 주고, 개발자의 프로그램이 너무 어려워지고 복잡해지는 것을 막아준다.
다음의 예제를 보자. 10개의 goroutine
들을 동작시키고, channel
에서 10개의 결과들을 처리하고 있다. 즉, 각 goroutine
들은 Buffered channel
에 결과를 써주고 있는 것이다.
package main
import "fmt"
func process(data int) int {
return data+1
}
func processChannel(ch chan int) []int{
const conc = 10
results := make(chan int , conc)
for i := 0; i < conc; i++ {
go func() {
v := <-ch
results <- process(v)
}()
}
var out []int
for i := 0; i < conc; i++ {
out = append(out, <-results)
}
return out
}
func main(){
pass := make(chan int, 20)
for i := 0; i < 10; i++ {
pass <- i
}
ret := processChannel(pass)
fmt.Println(ret)
}
우리는 정확히 몇개의 goroutine
들이 시작되었는 지 알고, 각 goroutine
이 각자의 작업을 완료하고 나서 바로 종료되기를 원한다. 이것은 우리가 각 goroutine
들을 위한 하나의 buffered channel
를 만들 수 있다는 것을 의미하고, 각 goroutine
들은 데이터를 blocking
없이 해당 채널에 쓸 수 있다는 것을 의미한다. 또한, buffered channel
에 대한 loop를 돌며, 각 goroutine들이 결과를 반환하는 것을 기다리는데 사용할 수 있다. 모든 결과들이 쓰여지면 그제서야 out
slice를 반환하는데, 이러한 패턴으로 우리는 여러 개의 goroutine들이 누수없이 사용된다는 것을 확인할 수 있다.
buffered channel
을 사용하는 또 다른 technique은 Backpressure
이다.
backpressure
의 정의는 다음과 같다.
소프트웨어 개발에서 원하는 데이터 flow의 저항성이다. 대표적으로 computational speed가 있다. 컴퓨터팅 속도가 너무 빨라진 것에 비해 메모리 양이 적으면, 속도는 빨라졌지만 오히려 처리하지 못하는 데이터가 많아지게 된다. 가령, 컨베이어 벨트에서 물건들을 포장한다고 할 때 컨베이어 벨트의 속도가 더 빨라진다해도, 물건을 포장하는 사람들의 속도가 이전과 같으면 결국에 처리하는 양인 이전과 같다. 그래서 처리하지 못한 데이터를 버퍼링하거나 드랍해야한다.
https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7
정리하자면, 컴퓨터 시스템의 성능이 전반적으로 좋아진 것에 맞춰서 양을 계속 늘릴 것이 아니라, 충분히 처리할 양을 제한하는 것이 오히려 더 성능이 좋다는 것이다. 우리는 buffered channel
을 사용하고 select
문으로 동시에 들어오는 요청들의 수를 제한하는데 사용할 수 있다.
type PressureGauge struct {
ch chan struct{}
}
func New(limit int) *PressureGauge {
ch := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
ch <- struct{}{}
}
return &PressureGauge{
ch: ch,
}
}
func (pg *PressureGauge) Process(f func()) error {
select {
case <-pg.ch:
f()
pg.ch <- struct{}{}
return nil
default:
return errors.New("no more capacity")
}
}
PressureGauge
구조체는 다량의 token(struct{}{})
을 포함한 buffered channel
을 가진다. Process
함수를 실행하면 channel
로 부터 token
을 읽는다. 만약 token
을 읽어오지 못하면 default
케이스로 넘어가고 에러가 반환된다. 즉, 정리하자면 Process
함수를 실행할 때 pg.ch
에 token(struct{}{})
이 있다면 입력받은 함수를 실행하고, token(struct{}{})
이 없다면 default
로 가서 에러를 반환한다는 것이다.
왜 이렇게 해야하는가? 그건 backpressure
때문이다. 한 번에 너무 많은 요청이 들어오면 이것을 처리해야 할 시스템에서 문제가 생길 수 있다. 그래서 PressureGauge
는 limit
을 두어 이를 관리하는 것이다. 이 때 Buffered channel
은 하나의 통신 도구일 뿐이다. 어떠한 의미있는 데이터를 전달하는 것이 아니라, 특정 함수를 실행해도 된다.
안된다.
라는 신호를 줄 뿐이다.
다음의 예제를 보자, 다음의 예제는 http
서버를 구동하고 request
라는 url
에 너무 많은 요청이 들어오면 이를 제한한다.
package main
import (
"errors"
"net/http"
"time"
)
type PressureGauge struct {
ch chan struct{}
}
func New(limit int) *PressureGauge {
ch := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
ch <- struct{}{}
}
return &PressureGauge{
ch: ch,
}
}
func (pg *PressureGauge) Process(f func()) error {
select {
case <-pg.ch:
f()
pg.ch <- struct{}{}
return nil
default:
return errors.New("no more capacity")
}
}
func doThingThatShouldBeLimited() string {
time.Sleep(2 * time.Second)
return "done"
}
func main() {
pg := New(10)
http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
err := pg.Process(func() {
w.Write([]byte(doThingThatShouldBeLimited()))
})
if err != nil {
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte("Too many requests"))
}
})
http.ListenAndServe(":8080", nil)
}
다음의 코드를 실행하고 추가적으로 다음의 코드를 실행하도록 하자.
package main
import (
"fmt"
"net/http"
"sync"
)
var wait sync.WaitGroup
func request(){
resp, err := http.Get("http://localhost:8080/request")
if err != nil {
fmt.Println(err)
}
fmt.Println(resp)
wait.Done()
}
func main(){
wait.Add(20)
for i := 0; i < 20; i++ {
go request()
}
wait.Wait()
}
request.go
는 실행하자마자 20개의 GET요청을 /request
url로 한번에 보낸다. 결과는 다음과 같다.
&{429 Too Many Requests 429 HTTP/1.1 1 1 map[Content-Length:[17] Content-Type:[text/plain; charset=utf-8] Date:[Sun, 09 Oct 2022 08:11:55 GMT]] 0xc0002981c0 17 [] false false map[] 0xc00031a000 <nil>}
...반복x9
&{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[4] Content-Type:[text/plain; charset=utf-8] Date:[Sun, 09 Oct 2022 08:11:57 GMT]] 0xc000398380 4 [] false false map[] 0xc00031a100 <nil>}
...반복x9
즉, 맨 처음에는 Too Many Requests
가 나오게 된다. 즉, 요청이 한꺼번에 너무 많이 들어오게 되어 이를 처리할 때 pg.ch
에 남은 token
이 없기 때문이다. 그리고, 함수들이 차례대로 처리됨에 따라 10개의 Status:200
이 오게 된다. 이는 pg.ch
에 token
있기 때문에 처리에 성공한 것이다.
이렇게 Buffered channel
을 이용하여 Backpressure
문제를 해결할 수 있다.
closed
상태인 channel
에 대한 select
문의 reading
은 항상 성공이다. 즉, zero value
가 오게되고 이를 처리하는 로직(validation, dropping)에 수많은 자원을 사용할 수 밖에 없다.
그래서 nil
channel
을 두는 것이다. nil
channel
에 데이터를 읽거나, 데이터를 쓰는 작업은 해당 코드를 영원히 hang하게 한다. 이러한 점을 이용하여 버그가 발생할 때, nil
channel
을 이용하여 select
문의 case
를 막을 수 있다. channel
이 닫힌 것을 감지하면 channel
의 값에 nil
을 넣는 것으면 해당 channel
과 관련된 모든 case
들은 닫히게 되어 더 이상 동작하지 않는다. 왜냐면 select
문에서 nil
channel
로 부터 데이터를 읽어오는 것은 절대 값을 반환하지 않기 때문이다.
// in and in2 are channels, done is a done channel.
for {
select {
case v, ok := <-in:
if !ok {
in = nil // the case will never succeed again!
continue
}
// process the v that was read from in
case v, ok := <-in2:
if !ok {
in2 = nil // the case will never succeed again!
continue
}
// process the v that was read from in2
case <-done:
return
}
}
인터랙티브한 프로그램은 반드시 일정 시간 내에 응답을 돌려주어야 한다. go의 동시성 프로그램에서 우리가 할 수 있는 것은 요청에 얼마나 많은 시간이 소요되었나를 관리하는 것이다. 다른 언어의 경우는 promise
나 future
의 최상위에 추가적인 feature를 넣어준다. 그러나 go의 타임아웃은 기존 부분에 추가적으로 복잡한 로직을 추가해주어야 한다.
func timeLimit() (int, error) {
var result int
var err error
done := make(chan struct{})
go func() {
result, err = doSomeWork()
close(done)
}()
select {
case <-done:
return result, err
case <-time.After(2 * time.Second):
return 0, errors.New("work timed out")
}
}
위의 코드는 두가지 case
문으로 이루어져 있다. 첫번째는 done
channel 패턴으로 우리가 이전에 다루었던 이야기이다. go의 closure
를 이용하여 result, err
를 가져와 처리하고 결과를 select
의 case
로 반환하는 방법이다.
두 번째는 channel
이 After
함수의 반환된 값을 case
로 받아내는 방법이다. After
함수는 특정 시간이 지난 후에 case
에 반환값을 써준다. 이를 통해 timeout
을 구현할 수 있다.
단일 goroutine
의 종료를 다루기 위해서 done
채널 패턴을 사용할 수 있는 것을 확인하였다. 그러나, 여러 개의 goroutine
들의 실행이 종료됨을 인지하여 프로그램을 구동하려면 어떻게 해야할까??
이럴 때 사용하는 것이 바로 WaitGroups
이다.
func main() {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
doThing1()
}()
go func() {
defer wg.Done()
doThing2()
}()
go func() {
defer wg.Done()
doThing3()
}()
wg.Wait()
}
sync.WaitGroup
는 초기화할 필요없이 그냥 선언만해도된다. 내부적으로 counter
를 가지고 있고 이를 통해 goroutine
이 몇 개 실행되고, 종료되었는 지를 확인할 수 있다. sync.WaitGroup
는 3가지 메서드들로 이루어져 있는데 이들 모두 내부의 counter
에 의존한다. 이 3가지 메서들은 Add
, Done
, Wait
이다.
Add
는 기다릴 goroutine
들의 수를 넣어주는 것이다. 즉, 내부 counter
에 +
연산을 해주는 것이다. Done
은 counter
를 감소시키는 것으로, 기다릴 goroutine
이 하나 줄어들었다는 것을 알릴 때 쓰므로 goroutine
이 종료되면 쓴다. Wait
은 내부 counter
가 0이 될 때까지 멈추어 있겠다는 것이다. 즉, 기다릴 goroutine
이 0개가 될 때까지 멈추어 있겠다는 것이다.
조심해야할 것은 Done
이 불리지 않는다면 영원히 block
상태에 빠져 waiting하고 있다는 것이다. 만약, goroutine
이 panic
에 빠진다면?? Done
를 수행하지않고 종료될 것이다. 이를 해결하기위해서 defer
로 반드시 Done
을 수행해야한다.
sync.WaitGroup
을 명시적으로 pass하지 않는 이유가 있는데, 첫번째는 언제 어디서든 sync.WaitGroup
을 쓸 때는 같은 인스턴스를 공유하고 있어야 하기 때문이다. 만약 sync.WaitGroup
을 포인터가 아닌 값으로 goroutine
함수에 전달하여 사용한다면 함수는 copy
를 할 뿐이고 같은 counter
를 쓰고 있지않아 문제가 발생할 것이다. 그래서 closure
를 사용하여 sync.WaitGroup
을 잡아두면 같은 인스턴스를 사용할 수 있어 문제가 없어진다.
두번째 이유는 설계(design)
때문이다. 동시성은 개발자의 API 밖에서도 유지되어야 한다. 이전부터 계속해서 보았던 것처럼 비지니스 로직을 담고 있는 goroutine
closure
를 만들어서 사용했다. closure
는 동시성의 이슈를 다루고 알고리즘을 제공하는 함수를 관리한다.
여러 개의 goroutine
들이 같은 channel
에 대해서 값을 쓰고 있다면, channel
이 오직 한 번만 close
되어야 한다는 것을 보장해야한다. 이때 sync.WaitGroup
을 쓰는 것이 좋다.
다음의 예제는, 하나의 channel
에 동시적으로 값을 처리하는 함수에서 어떻게 작동하는 지를 보여준다. 결과들을 slice
로 모아주고 slice
를 반환한다.
func processAndGather(in <-chan int, processor func(int) int, num int) []int {
out := make(chan int, num)
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func() {
defer wg.Done()
for v := range in {
out <- processor(v)
}
}()
}
go func() {
wg.Wait()
close(out)
}()
var result []int
for v := range out {
result = append(result, v)
}
return result
}
우리의 예제에서는 하나의 monitoring goroutine
을 실행시켰다. monitoring goroutine
는 처리중인 모든 goroutine
들이 종료될까지 기다린다. 처리중인 모든 goroutine
들이 종료되면 monitoring goroutine
는 close
를 호출하여 output channel을 닫는다. output channel이 닫히면 for-range
문에서 out
이 닫히게 되고 버퍼는 비게된다. 따라서 for-range
문이 종료된다.
WaitGroups
이 아무리 편하다고해도, goroutine
들을 조율할 첫번째 방법이 될 순 없다.
(추후작성)