Learning Go 정리 10일차 - Goroutine

0

Learning Go

목록 보기
11/12

Concurrency in go

동시성(concurrency)는 computer science 용어로 single process를 독립적인 컴포넌트로 나누고, 어떻게 이 컴포넌트가 안전하게 데이터를 공유하는 지에 대해서 구체화한 것을 의미한다.

대부분의 프로그래밍 언어들은 os level threads를 사용하는 라이브러리를 통해 concurrency를 제공하는데, os level threads는 보통 locks을 얻으려는 시도를 함으로서 데이터를 공유한다. 그러나 go는 다르다. go의 주된 동시성 모델은 CSP(Communicating Sequential Processes)이다. CSP 패턴은 기존의 동시성 방법처럼 강력하지만, 더 이해하기 더 쉽다는 장점을 가진다.

한 가지 명심해야할 것은, 모든 코드에 동시성 코드를 넣는다고 성능이 더 좋아지는 것은 아니다. 동시성 코드는 성능을 오히려 더 안좋게 만들 수 있고, 이해하기 어렵게 만들 수 있다. 핵심은 동시성(concurrency)병렬성(parallelism)은 다르다는 것이다. 동시성(concurrency)은 해결하려고 하고있는 문제를 더욱 좋게 구조화하기 위해 사용되는 툴이다. 동시성(concurrency)이든 아니든 코드가 병렬적(parallelism)으로 동작하려면 이는 알고리즘이 이를 허용하는 하드웨어에 달려있다. Amdahl’s Law는 얼마나 병렬 프로세싱(parallel processing)이 performance를 향상시키는 지 알려주는 공식이다.

Goroutines

process는 program의 인스턴스로 computer's operating system에 의해 동작한다. OS는 memory같은 일부 자원들을 process에 결부시키는데 다른 process들이 현재 process에 결부된 자원들에 접근하지 못하게 만든다. 또한 process는 하나 또는 그 이상의 thread들로 이루어져 있다. thread는 실행 단위로 OS에 의해 동작할 시간이 주어진다. process안의 thread들은 자원들을 서로 공유하기도 한다. CPU는 core수에 따라 동시간에 하나, 그 이상의 thread들로부터 명령어들을 수행할 수 있다. OS의 일 중 하나는 CPU에 할당된 thread들을 스케줄링하고 모든 process들, 또는 process안의 thread들이 실행될 기회를 주는 것이다.

goroutines는 lightweight processes로 GO runtime에 의해 관리된다. go 프로그램이 실행되면 go runtime은 수많은 thread들을 만들고, 개발자가 만든 프로그램을 실행시키기 위한 single goroutine을 launche시킨다. 개발자에 의해 만들어진 모든 goroutine들은 GO runtime scheduler에 의해 앞서 만들어진 thread들에 자동으로 할당되는 것이다. 마치 OS가 CPU cores에 따라 thread들을 스케줄링하듯이 말이다. 이는 추가적인(불필요헌) 작업 같아보인다. 기저에 있는 OS가 이미 thread들과 process들을 스케줄링하는 것을 포함하고 있기 때문이다. 그러나 이는 몇몇 이점을 갖는데 다음과 같다.

  1. goroutine creation은 thread creation보다 빠르다. 이는 OS-level 자원을 만들지 않기 때문이다. 즉, CPU, memory를 OS가 직접 할당하지 않는다는 것이다.
  2. goroutine 초기 스택 사이즈는 thread stack size보다 훨씬 작다. 그리고 goroutine이 구동되면 필요에 따라 스택 사이즈가 늘어난다. 이는 goroutine이 애무 효율적으로 메모리를 사용한다는 것을 알 수 있다.
  3. goroutine들 끼리의 switching은 thread들 간의 switching보다 훨씬 더 빠르다. 이는 goroutine간의 switching은 process안에서 전적으로 이루어지고(go runtime), 상대적으로 느린 OS call은 피하기 때문이다.
  4. go runtime scheduler는 scheduler의 의견(기능)을 최적화할 수 있다. 왜냐면 scheduler는 Go process의 부분이기 때문이다. scheduler는 network poller와 상호작용하는데, goroutine이 I/O작용 때문에 blocking되어 unscheduled될 수 있을 때를 탐지한다. scheduler는 또한 garbage collector와 통합되는데, 개발자의 go process에 할당된 OS thread에서 작업(work)이 적절하게 균형을 유지할 수 있도록 한다.

정리하자면, goroutine은 thread보다 create 속도도 빠르고 메모리 효율도 좋으며 context switching이 발생할 때 더욱 오버헤드가 적으며, scheduler 측면에서도 부하가 적다는 것을 알 수 있다.

goroutine은 일반 함수 invocation(호출)에 go 키워드 하나만 붙여도 만들어진다. 일반 함수처럼 파라미터도 넣어줄 수 있는데 다만, 리턴값은 무시된다.

Channcels

goroutine들은 channels을 사용하여 통신한다. slices, maps, channels 들은 build-in type으로 make function을 통해 만들어진다.

ch := make(chan int)

maps와 같이 channels도 reference type이다(즉, default가 pointer), 따라서 channel을 다른 함수에 넣어버리면 이는 channel에 대한 pointer가 넘어가는 것이다. 또한 maps, slice와 같이 channel의 zero-value는 nil이다.

Reading, Writing and Buffering

channel과 interact하고 싶다면 <- 연산자를 사용하면 된다. channel을 통해 데이터를 받으려면 채널 앞(왼쪽)에 <-를 넣고, channel에 데이터를 써주려면 channel 뒤(오른쪽)에 <-를 넣어주면 된다.

a := <- ch // read a value from ch and assign it to a
ch <- b // write the value in b to ch

하나의 채널에 하나의 값이 쓰여지고 여러 goroutine이 이 채널에서 데이터를 읽어가도 데이터를 받는 것은 하나의 goroutine이라는 것을 명심하자.

같은 channel에 하나의 goroutine이 read, write을 다하는 경우는 보기 드물다. channel을 변수나, 함수의 파라미터, 필드에 적용할 때는 chan 키워드 앞에 <-연산자를 붙이도록 하여 이것이 오직 channel로부터 값을 read하는 것이라는 것을 명시해주도록 하자. ex ch <- chan int. 또한 channel이 write로만 쓰인다면 channel 뒤에 <-연산자를 두어서 이 channel을 쓰는 goroutine은 해당 channel을 write용으로만 사용한다는 것을 명시해주도록 하자. 이렇게 함으로서 go compiler가 해당 channel이 함수에서 read 용인지, write 용인지 확신할 수 있기 때문이다.

기본적으로 default channels는 unbuffered이다. 모든 unbuffured channel에 어떤 goroutine이 쓰기 연산을 실행한다면 다른 goroutine들은 이 unbuffered channel에 대한 쓰기 연산을 pause하게 되는데, 이 unbuffered channel의 데이터를 가져오기 이전까지 멈추게 되는 것이다. 이와 유사하게 읽기 연산도 마찬가지인데 읽으려고 하는 goroutine들은 write연산으로 channel에 어떤 데이터를 넣어주기 이전까지 pause하게 된다. 이는 unbuffered channel을 가지고 하나의 goroutine 내에서 읽고 쓰기가 불가능하다는 것이다.

go는 또한 buffered channel을 지원하는데, 이러한 channel들은 제한된 수많은 write연산들을 blocking(pause)없이 버퍼링한다. 만약 buffer가 가득 채워지면, 해당 channel에 대한 read연산이 있기 전까지 channel에 대한 일련의 write연산을 하는 goroutine들은 멈추게된다. buffer에 아무것도 없어도 똑같이 멈추게 된다.

ch := make(chan int, 10)

len, cap 함수들도 channel에 대해서 동작하는데, 현재 channel에 몇개의 데이터가 버퍼링되어 있는 지를 알고 싶다면 len을 사용하고, 최대 용량을 알고 싶다면 cap을 사용해야한다. 참고로 unbuffered channellen, cap을 쓰면 0이 나오니 알아두도록 하자.

사실 대부분의 경우 unbuffered channel을 사용한다. 언제 unbuffered chan을 사용하고 buffered channel을 사용할 지는 추후에 알아보자.

for-range and channels

channel의 값을 for-range loop를 통해 얻을 수 있는데 다음과 같다.

for v := range ch {
    fmt.Println(v)
}

다른 for-range문법과는 다르게, 여기에는 channel안의 값을 의미하는 하나의 variable만 쓰인다. 이 loop는 channel이 closed되거나, 함수가 return, break될 때까지 반복된다.

Closing a Channel

channel에 더이상 쓰기 연산이 끝났다면 channel을 닫을 수 있는데, 이 때 close 함수를 사용하면 된다.

close(ch)

일단 channel이 닫히고 나서는 해당 channel에 대한 어떠한 write연산, close 연산은 거부되며 panic을 발생시킨다. 재밌게도 닫힌 channel에 대한 read연산은 항상 성공한다. 만약 channel을 닫았는데 해당 channel이 값들을 아직 buffering하고 있다면 read연산을 해도 값이 나온다. 만약 channel을 닫았는데 해당 channel에 값이 없다면 zero value가 반환된다.

그렇다면 channel이 닫혀서 zero value가 나온 것과 닫히지 않은 channel에서 zero value를 반환한 것을 어떻게 구분해야 할까?? 이는 map과 같은 문제고 해결 방법 역시도 같다.

v, ok := <-ch

만약 oktrue이면 channel은 열려있는 것이고, 만약 false이면 channel은 닫혀있는 것이다.

channel을 닫는 것은 오롯이 해당 channel에 대한 write연산을 하고있는 goroutine과 관련된 것이다. channel에서 데이터를 받기를 기대하고 있는 goroutine들(for-range을 사용하는)을 위해서라도 channel을 닫아주는 것이 좋다. 더 이상 사용되지 않는 channel들은 가비지 콜렉터의 대상이 되며 go runtime이 이를 잡아준다.

channels goroutine의 동시성 모델의 두 가지 특징을 가진다.
1. 개발자의 코드가 일련의 stage(순차적으로 동작)을 가지도록 생각하게 해주고,
2. Channels are one of the two things that set apart Go’s concurrency model. data 의존성을 clear하게 해준다.

이를 이해하기 위해서는 다른 언어의 동시성 모델과 비교하면 된다. 대부분의 경우 thread간의 동시성 문제를 해결하기위해서 전역 변수의 state를 이용한다. 이는 thread가 동작하는 코드를 얼핏봐서는 thread는 아무 변화가 없는데 코드의 동작이 달라지는 이상한 경험을 할 수 있다. 그러나 goroutine은 channel을 통해서 동기화를 진행하기 때문에 코드가 마치 순차적으로 동작하는 것처럼 보인다. 즉, goroutine 내에서의 로직만 보고도 상태에 따른 코드 변화를 알 수 있는 것이다. 더불어 전역 변수에 대한 의존성이 사라지게 됨으로 goroutine들은 channel만 보면된다. 즉, goroutine들의 상태를 변화시킬 수 있는 것은 전역 변수가 아니라 channel에 의존하도록 만들면 코드의 flow를 더 따라가기가 쉬울 것이다.

channel을 사용할 때 항상 조심해야 할 것 중 하나는 channel은 reference type이다 보니까, make로 생성해주지 않고 사용하면 다이나믹한 에러가 발생한다는 것이다. 만약 nilchannel에 계속해서 write, read연산을 하면 hang forever(계속 반복)되고, close연산을 하게되면 panic이 발생한다.

select

select 문법은 go concurrency를 구조화하고 제어하기 위한 방법으로, 만약 '두개의 동시성 연산이 실행되고 있다면 이들 중 어떤 연산이 먼저 올 지', 또는 '어떤 케이스에 대해 처리할 수 없다면'과 같은 문제에 대해 해결방법을 제시한다.

select 키워드는 goroutine이 여러 개의 채널들에서 하나에 write 또는 read 연산을 하도록 해준다. 이는 걸핏봐서는 switch-case와 비슷해보인다.

select {
    case v := <- ch:
        fmt.Println(v)
    case v := <- ch2:
        fmt.Println(v):
    case ch3 <- x:
        fmt.Println("wrote", x)
    case <- ch4:
        fmt.Println("got value on ch4, but ignored it")
}

select안의 각 case는 channel에 대한 read, write연산으로 만약 read 또는 write가case에 가능하다면 이는 case의 body문을 실행하도록 한다.

그렇다면 여러개의 채널에서 동시에 read 또는 write 연산이 발생한다면 어떻게 될가?? select 알고리즘은 굉장히 간단한데, 랜덤하게 case 중 하나를 뽑고 실행시킨다. case로 쓰여진 순서는 별로 중요하지 않다. 이것이 switch-case와 다른 점이다. 왜 랜덤하게 동작하도록 했냐면 이는 starvation을 해결하기 위함으로 어떤 case든 간에 더 선호하거나, 비 선호하는 경우를 없애기 위함이다.

랜덤하게 case를 선택하는 또 다른 select의 이점은 inconsistent한 순서로 lock들을 얻음으로서 deadlock 이슈를 해결하기 위함이다. 만약 두 개의 goroutine에서 두 개의 channel을 같이 사용하고 있다면 이들은 정해진 순서대로 동작해야 한다. 그렇지 않으면 deadlock이 발생하고 말것이다. 만약 go에서 deadlock이 발생하면 go는 프로그램을 종료시켜버린다.

  • deadlocking goroutines
func main(){
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        v := 1
        ch1 <- v
        v2 := <- ch2
        fmt.println(v, v2)
    }()
    v := 2
    ch2 <- v
    v2 := <- ch1
    fmt.Println(v, v2)
}

다음의 코드를 실행하면 아래의 에러를 얻게된다.

fatal error: all goroutines are asleep - deadlock!

main으로 동작하고 있는 goroutine과 func으로 만든 goroutine들 끼리 충돌이 나서 deadlock에 빠진 것이다.

만약 channel을 사용하는 main goroutine부분을 select로 감싸고 동작시키면 deadlock을 피할 수 있다.

func main(){
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        v := 1 
        ch1 <- v
        v2 := <- ch2
        fmt.Println(v, v2)
    }()
    v := 2
    var v2 int
    select {
        case ch2 <- v:
        case v2 = <- ch1:
    }
    fmt.Println(v, v2)
}
2 1

이라는 답이 나온다.

이러한 일이 가능한 이유는, select문이 만약 case들 중에 실행 가능한 것이 있는 지 check해주어 deadlock을 피할 수 있기 때문이다.

select문이 channel들 간의 통신을 담당하다보니 select문은 종종 for loop안에 임베딩되는 경우가 많다. 즉, select-case문은 무한 루프를 제공하지 않는다는 것이다.

for {
    select {
        case <- done:
            return
        case v := <- ch:
            fmt.Println(v)
    }
}

for-select loop 구조는 종종 언급되는 조합으로 많이 사용된다. for-select loop를 사용할 때는 exit loop를 반드시 두어야 한다. 나중에 the done channel pattern을 보도록 하자.

switch문과 같이 select문도 default절을 가질 수 있는데, 이는 읽거나 쓸 수 있는 채널들에 대한 case들이 없는 경우 동작한다. 만약 어떠한 channel에 대한 nonblocking read 또는 write를 구현하고 싶다면 selectdefault와 함께 쓰는 것이 좋다.

select {
case v := <-ch:
    fmt.Println(v)
default:
    fmt.Println("no value written to ch")
}

실행하면 ch에서 읽을 데이터가 없어도 blocking되는 것이 아니라 default절로 빠지게 되어 종료된다.

만약, for-select loop에 default절을 사용한다면 거의 항상 뭔가를 잘못하고 있는 것이다. 이는 매번 triggering되기 때문이다. 심지어 read, write를 하지않을 때도 말이다. 이는 for loop를 영원히 동작하게 만들며 엄청난 양의 CPU자원을 소모한다.

기본적인 goroutine과 channel에 대한 설명은 끝났으니 어떻게 사용되는 지 pattern을 알아보도록 하자

Concurrency Practices and Patterns

Keep Your APIs Concurrency-Free

동시성은 세부사항이 구현되므로, 좋은 API 디자인은 가능한 구현의 자세한 사항을 감춰야 한다. 이는 개발자가 어떻게 이 코드가 '불리는 지'를 변경하는 것 없이 어떻게 이 코드가 '작동하는 지'를 변경할 수 있도록 해준다.

실질적으로, 이는 개발자가 절대로 channels, mutex들을 API 타입, 함수, 메서드로 노출시키면 안된다는 것을 의미한다. 만약 channel이 노출되면 해당 API를 사용하는 유저에게 channel관리 책임을 전달하는 것이다. 즉 이는 유저가 channel가 buffered되었는 지, closed되었는 지 nil인지와 같은 concerns에 대해서 걱정(생각)을 해야한다는 것을 의미한다. 이는 또한 유저들이 생각지도 못한 순서로 channel들과 mutex에 접근하여 deadlock을 발생시킬 수 있다.

오해하지 말아야 할 것이, 이는 절대 함수의 파라미터나 구조체 필드로 channel, mutex를 사용하지 말라는 것이 아니다. 다만 이들을 절대로 외부에 노출시키지 말라는 것이다.

Goroutines, for Loops, and Varying Variables

대부분, goroutine으로 만들 클로저(the closure)는 어떠한 파라미터들이 없다. 대신에 해당 goroutine이 실행되는 환경으로부터 value들을 capture한다. 그런데, 이러한 로직이 실행되지 않는 한 가지 경우가 있다. for-loopindex, value를 capture할 때가 그렇다.

  • bug code
package main

import "fmt"

func main() {
	a := []int{2, 4, 6, 8, 10}
	ch := make(chan int, len(a))
	for _, v := range a {
		go func() {
			ch <- v * 2
		}()
	}
	for i := 0; i < len(a); i++ {
		fmt.Println(<-ch)
	}
}

우리가 원하는 정답은 다음과 같다.

4
8
12
16
20

그러나 결과는 다음과 같다.

20
20
20
20
20

ch에 20이 계속해서 쓰이냐면, 모든 goroutine들의 closure가 같은 variable을 capture하기 때문이다. for에 사용되는 index, value는 각 iteration에서 계속해서 재사용된다. v에 마지막 값 10이 할당되고, goroutine이 동작할 때 goroutine이 v를 보게되는 것이다. 이러한 문제는 for에만 해당하는 것이 아니라, 매 순간 goroutine이 어떠한 변수에 의존하는데, 이 변수 값이 변화된다면 똑같은 문제가 발생한다. 이를 해결하기 위해서는 goroutine에 해당 변수 값을 파라미터로 넘겨주어야 한다.

두 가지 방법이 있는데 첫번째는 변수를 shadow하는 방법이다.

for _, v := range a {
    v := v
    go func(){
        ch <- v * 2
    }()
}

그러나 shadowing하는 방법은 그렇게 좋은 방법이 아니다. 좀 더 코드를 깔끔하게 만들고 싶다면 goroutine의 파리미터로 값을 넘겨주면 된다.

for _, v := range a {
    go func(val int){
        ch <- val * 2
    }(v)
}

가장 좋은 방법은 goroutine함수에 인자로 변경되는 값을 넘겨주는 것이다.

Always Clean Up Your Goroutines

goroutine function을 시작시키면, 개발자는 항상 이 goroutine이 종료되도록 만들어야 한다. 변수와는 달리 go runtime은 goroutine이 더 이상 사용 중이 아니라는 것을 감지할 수 없다. 만약 goroutine이 종료되지 않으면 스케줄러는 주기적으로 아무것도 하지 않는 시간을 할당할 것이고 이는 프로그램의 성능을 다운시키게 된다. 이러한 것을 goroutine leak라고 한다.

goroutine이 종료되었다는 것을 보장하는 것을 확인하기는 어려운 일이다. 가령 다음과 같이 goroutine이 하나의 generator로 쓰이는 경우를 보자.

func countTo(max int) <-chan int {
	ch := make(chan int)
	go func() {
		for i := 0; i < max; i++ {
			ch <- i
		}
		close(ch)
	}()
	return ch
}

func main() {
	for i := range countTo(10) {
		fmt.Println(i)
	}
}

예제를 들려고 만든 코드이지, goroutine을 절대로 generator로 쓰지말도록 하자.

일반적인 경우, goroutine에서 채널로 데이터를 다 보내면 goroutine은 종료된다. 그러나 만약 다음과 같은 경우는 어떨까??

func main() {
    for i := range countTo(10) {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
}

코드는 문제없이 동작하지만, for이 끝나버리면서 goroutine은 모든 값을 채널로 전송하지 못하고 채널에 넣은 다음 block상태(unbuffered 이기 때문에)에 빠지게 된다. 이러면 goroutine leak이 발생한 것이다.

The Done Channel Pattern

done channel pattern이는 goroutine에게 이제는 processing을 멈출 시간이라는 신호를 던져준다. 신호를 던져주기 위해 channel을 사용한다. 다음의 예제를 확인해보자.

func searchData(s string, searchers []func(string) []string) []string {
    done := make(chan struct{})
    result := make(chan []string)
    for _, searcher := range searchers {
        go func(searcher func(string) []string) {
            select {
            case result <- searcher(s):
            case <-done:
            }
        }(searcher)
    }
    r := <-result
    close(done)
    return r
}

위 함수에서 channel done을 사용하였는데 데이터 타입으로 struct{}을 사용한다. 빈 구조체를 타입으로 사용한 이유는 별로 값이 중요하지 않기 때문이다. done채널에는 어떠한 값도 사용하지 않을 것이고 단지 close()함수를 실행할 것이다.

searcher 함수가 값을 반환하거나 또는 done채널에서 값을 읽어올 때까지 goroutine에서의 select문은 계속해서 실행된다. 열려있는 channel에서 값을 읽어오는 것은 값이 들어오거나, 채널이 닫혀 zero value가 들어오지 않는한 계속해서 goroutine이 멈춰있는다.

이는 done channeldone이 닫히기 전까지 멈춘다는 것이다. 만약 우리가 result로 부터 첫번째 데이터를 얻는다면 done채널은 close되게되고 이 신호가 모든 goroutine에 퍼지게되어 goroutine은 종료된다. 이러한 방법으로 goroutine leaking을 막을 수 있다.

Using a Cancel Function to Terminate a Goroutine

다른 done 패턴으로 channel에 대한 cancellation 함수를 반환함으로서 구현할 수 있다. 다음의 예제를 확인해보자.

func countTo(max int) (<-chan int, func()) {
    ch := make(chan int)
    done := make(chan struct{})
    cancel := func() {
        close(done)
    }
    go func() {
        for i := 0; i < max; i++ {
            select {
            case <-done:
                return
            case ch<-i:

            }
        }
        close(ch)
    }()
    return ch, cancel
}

func main() {
    ch, cancel := countTo(10)
    for i := range ch {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
    cancel()
}

countTo함수는 두 개의 channel들을 만든다. 하나는 done신호를 호출하고자하는 값을 반환하고 하는 값을 반환하는 channel을 반환한다. done channel을 직접 반환하기보다는 done channel을 닫는 closure(함수)를 반환하는 방법을 사용했다. closure`를 이용한 방법은 추가적인 clean-up 로직을 추가할 수 있다는 장점을 가지고 있다.

When to Use Buffered and Unbuffered Channels

언제 Buffered channel을 사용하고 언제 Unbuffered channel을 사용하는 가??

Buffered channel은 개발자가 얼마나 많은 goroutine들을 만들었고, 몇 개의 goroutine들을 제한하고 싶은 지를 알 때, 또는 queue에 있는 작업들의 양을 제한하고 싶을 때 사용하는 것이 좋다.

Buffered channel는 개발자가 일련의 goroutine들로 부터 데이터를 얻기를 원하거나, concurrency usage를 제한하기를 원할 때 효과가 좋다. Buffered channel는 또한 시스템이 queueing하고 있는 작업의 양을 관리하는데 도움을 주고, 개발자의 프로그램이 너무 어려워지고 복잡해지는 것을 막아준다.

다음의 예제를 보자. 10개의 goroutine들을 동작시키고, channel에서 10개의 결과들을 처리하고 있다. 즉, 각 goroutine들은 Buffered channel에 결과를 써주고 있는 것이다.

package main

import "fmt"

func process(data int) int {
	return data+1
}

func processChannel(ch chan int) []int{
	const conc = 10
	results := make(chan int , conc)
	for i := 0; i < conc; i++ {
		go func() {
			v := <-ch 
			results <- process(v)
		}()
	}
	var out []int
	for i := 0; i < conc; i++ {
		out = append(out, <-results)
	}
	return out
}

func main(){
	pass := make(chan int, 20)
	for i := 0; i < 10; i++ {
		pass <- i
	}
	ret := processChannel(pass)
	fmt.Println(ret)
}

우리는 정확히 몇개의 goroutine들이 시작되었는 지 알고, 각 goroutine이 각자의 작업을 완료하고 나서 바로 종료되기를 원한다. 이것은 우리가 각 goroutine들을 위한 하나의 buffered channel를 만들 수 있다는 것을 의미하고, 각 goroutine들은 데이터를 blocking없이 해당 채널에 쓸 수 있다는 것을 의미한다. 또한, buffered channel에 대한 loop를 돌며, 각 goroutine들이 결과를 반환하는 것을 기다리는데 사용할 수 있다. 모든 결과들이 쓰여지면 그제서야 out slice를 반환하는데, 이러한 패턴으로 우리는 여러 개의 goroutine들이 누수없이 사용된다는 것을 확인할 수 있다.

Backpressure

buffered channel을 사용하는 또 다른 technique은 Backpressure이다.

backpressure의 정의는 다음과 같다.

소프트웨어 개발에서 원하는 데이터 flow의 저항성이다. 대표적으로 computational speed가 있다. 컴퓨터팅 속도가 너무 빨라진 것에 비해 메모리 양이 적으면, 속도는 빨라졌지만 오히려 처리하지 못하는 데이터가 많아지게 된다. 가령, 컨베이어 벨트에서 물건들을 포장한다고 할 때 컨베이어 벨트의 속도가 더 빨라진다해도, 물건을 포장하는 사람들의 속도가 이전과 같으면 결국에 처리하는 양인 이전과 같다. 그래서 처리하지 못한 데이터를 버퍼링하거나 드랍해야한다.
https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7

정리하자면, 컴퓨터 시스템의 성능이 전반적으로 좋아진 것에 맞춰서 양을 계속 늘릴 것이 아니라, 충분히 처리할 양을 제한하는 것이 오히려 더 성능이 좋다는 것이다. 우리는 buffered channel을 사용하고 select 문으로 동시에 들어오는 요청들의 수를 제한하는데 사용할 수 있다.

type PressureGauge struct {
    ch chan struct{}
}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)
    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }
    return &PressureGauge{
        ch: ch,
    }
}

func (pg *PressureGauge) Process(f func()) error {
    select {
    case <-pg.ch:
        f()
        pg.ch <- struct{}{}
        return nil
    default:
        return errors.New("no more capacity")
    }
}

PressureGauge 구조체는 다량의 token(struct{}{})을 포함한 buffered channel을 가진다. Process함수를 실행하면 channel로 부터 token을 읽는다. 만약 token을 읽어오지 못하면 default케이스로 넘어가고 에러가 반환된다. 즉, 정리하자면 Process함수를 실행할 때 pg.chtoken(struct{}{})이 있다면 입력받은 함수를 실행하고, token(struct{}{})이 없다면 default로 가서 에러를 반환한다는 것이다.

왜 이렇게 해야하는가? 그건 backpressure 때문이다. 한 번에 너무 많은 요청이 들어오면 이것을 처리해야 할 시스템에서 문제가 생길 수 있다. 그래서 PressureGaugelimit을 두어 이를 관리하는 것이다. 이 때 Buffered channel은 하나의 통신 도구일 뿐이다. 어떠한 의미있는 데이터를 전달하는 것이 아니라, 특정 함수를 실행해도 된다. 안된다.라는 신호를 줄 뿐이다.

다음의 예제를 보자, 다음의 예제는 http 서버를 구동하고 request라는 url에 너무 많은 요청이 들어오면 이를 제한한다.

  • main.go
package main

import (
	"errors"
	"net/http"
	"time"
)

type PressureGauge struct {
    ch chan struct{}
}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)
    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }
    return &PressureGauge{
        ch: ch,
    }
}

func (pg *PressureGauge) Process(f func()) error {
    select {
    case <-pg.ch:
        f()
        pg.ch <- struct{}{}
        return nil
    default:
        return errors.New("no more capacity")
    }
}

func doThingThatShouldBeLimited() string {
    time.Sleep(2 * time.Second)
    return "done"
}

func main() {
    pg := New(10)
    http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
        err := pg.Process(func() {
            w.Write([]byte(doThingThatShouldBeLimited()))
        })
        if err != nil {
            w.WriteHeader(http.StatusTooManyRequests)
            w.Write([]byte("Too many requests"))
        }
    })
    http.ListenAndServe(":8080", nil)
}

다음의 코드를 실행하고 추가적으로 다음의 코드를 실행하도록 하자.

  • request.go
package main

import (
	"fmt"
	"net/http"
	"sync"
)


var wait sync.WaitGroup
func request(){
	resp, err := http.Get("http://localhost:8080/request")
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(resp)
	wait.Done()
}

func main(){
	wait.Add(20)
	for i := 0; i < 20; i++ {
		go request()
	}
	wait.Wait()
}

request.go는 실행하자마자 20개의 GET요청을 /request url로 한번에 보낸다. 결과는 다음과 같다.

&{429 Too Many Requests 429 HTTP/1.1 1 1 map[Content-Length:[17] Content-Type:[text/plain; charset=utf-8] Date:[Sun, 09 Oct 2022 08:11:55 GMT]] 0xc0002981c0 17 [] false false map[] 0xc00031a000 <nil>}
...반복x9
&{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[4] Content-Type:[text/plain; charset=utf-8] Date:[Sun, 09 Oct 2022 08:11:57 GMT]] 0xc000398380 4 [] false false map[] 0xc00031a100 <nil>}
...반복x9

즉, 맨 처음에는 Too Many Requests가 나오게 된다. 즉, 요청이 한꺼번에 너무 많이 들어오게 되어 이를 처리할 때 pg.ch에 남은 token이 없기 때문이다. 그리고, 함수들이 차례대로 처리됨에 따라 10개의 Status:200이 오게 된다. 이는 pg.chtoken있기 때문에 처리에 성공한 것이다.

이렇게 Buffered channel을 이용하여 Backpressure 문제를 해결할 수 있다.

Turning Off a case in a select

closed상태인 channel에 대한 select문의 reading은 항상 성공이다. 즉, zero value가 오게되고 이를 처리하는 로직(validation, dropping)에 수많은 자원을 사용할 수 밖에 없다.

그래서 nil channel을 두는 것이다. nil channel에 데이터를 읽거나, 데이터를 쓰는 작업은 해당 코드를 영원히 hang하게 한다. 이러한 점을 이용하여 버그가 발생할 때, nil channel을 이용하여 select문의 case를 막을 수 있다. channel이 닫힌 것을 감지하면 channel의 값에 nil을 넣는 것으면 해당 channel과 관련된 모든 case들은 닫히게 되어 더 이상 동작하지 않는다. 왜냐면 select문에서 nil channel로 부터 데이터를 읽어오는 것은 절대 값을 반환하지 않기 때문이다.

// in and in2 are channels, done is a done channel.
for {
    select {
    case v, ok := <-in:
        if !ok {
            in = nil // the case will never succeed again!
            continue
        }
        // process the v that was read from in
    case v, ok := <-in2:
        if !ok {
            in2 = nil // the case will never succeed again!
            continue
        }
        // process the v that was read from in2
    case <-done:
        return
    }
}

How to Time Out Code

인터랙티브한 프로그램은 반드시 일정 시간 내에 응답을 돌려주어야 한다. go의 동시성 프로그램에서 우리가 할 수 있는 것은 요청에 얼마나 많은 시간이 소요되었나를 관리하는 것이다. 다른 언어의 경우는 promisefuture의 최상위에 추가적인 feature를 넣어준다. 그러나 go의 타임아웃은 기존 부분에 추가적으로 복잡한 로직을 추가해주어야 한다.

func timeLimit() (int, error) {
    var result int
    var err error
    done := make(chan struct{})
    go func() {
        result, err = doSomeWork()
        close(done)
    }()
    select {
    case <-done:
        return result, err
    case <-time.After(2 * time.Second):
        return 0, errors.New("work timed out")
    }
}

위의 코드는 두가지 case문으로 이루어져 있다. 첫번째는 done channel 패턴으로 우리가 이전에 다루었던 이야기이다. go의 closure를 이용하여 result, err를 가져와 처리하고 결과를 selectcase로 반환하는 방법이다.

두 번째는 channelAfter함수의 반환된 값을 case로 받아내는 방법이다. After함수는 특정 시간이 지난 후에 case에 반환값을 써준다. 이를 통해 timeout을 구현할 수 있다.

Using WaitGroups

단일 goroutine의 종료를 다루기 위해서 done채널 패턴을 사용할 수 있는 것을 확인하였다. 그러나, 여러 개의 goroutine들의 실행이 종료됨을 인지하여 프로그램을 구동하려면 어떻게 해야할까??

이럴 때 사용하는 것이 바로 WaitGroups이다.

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        doThing1()
    }()
    go func() {
        defer wg.Done()
        doThing2()
    }()
    go func() {
        defer wg.Done()
        doThing3()
    }()
    wg.Wait()
}

sync.WaitGroup는 초기화할 필요없이 그냥 선언만해도된다. 내부적으로 counter를 가지고 있고 이를 통해 goroutine이 몇 개 실행되고, 종료되었는 지를 확인할 수 있다. sync.WaitGroup는 3가지 메서드들로 이루어져 있는데 이들 모두 내부의 counter에 의존한다. 이 3가지 메서들은 Add, Done, Wait이다.

Add는 기다릴 goroutine들의 수를 넣어주는 것이다. 즉, 내부 counter+ 연산을 해주는 것이다. Donecounter를 감소시키는 것으로, 기다릴 goroutine이 하나 줄어들었다는 것을 알릴 때 쓰므로 goroutine이 종료되면 쓴다. Wait은 내부 counter가 0이 될 때까지 멈추어 있겠다는 것이다. 즉, 기다릴 goroutine이 0개가 될 때까지 멈추어 있겠다는 것이다.

조심해야할 것은 Done이 불리지 않는다면 영원히 block상태에 빠져 waiting하고 있다는 것이다. 만약, goroutinepanic에 빠진다면?? Done를 수행하지않고 종료될 것이다. 이를 해결하기위해서 defer로 반드시 Done을 수행해야한다.

sync.WaitGroup을 명시적으로 pass하지 않는 이유가 있는데, 첫번째는 언제 어디서든 sync.WaitGroup을 쓸 때는 같은 인스턴스를 공유하고 있어야 하기 때문이다. 만약 sync.WaitGroup을 포인터가 아닌 값으로 goroutine 함수에 전달하여 사용한다면 함수는 copy를 할 뿐이고 같은 counter를 쓰고 있지않아 문제가 발생할 것이다. 그래서 closure를 사용하여 sync.WaitGroup을 잡아두면 같은 인스턴스를 사용할 수 있어 문제가 없어진다.

두번째 이유는 설계(design) 때문이다. 동시성은 개발자의 API 밖에서도 유지되어야 한다. 이전부터 계속해서 보았던 것처럼 비지니스 로직을 담고 있는 goroutine closure를 만들어서 사용했다. closure는 동시성의 이슈를 다루고 알고리즘을 제공하는 함수를 관리한다.

여러 개의 goroutine들이 같은 channel에 대해서 값을 쓰고 있다면, channel이 오직 한 번만 close되어야 한다는 것을 보장해야한다. 이때 sync.WaitGroup을 쓰는 것이 좋다.

다음의 예제는, 하나의 channel에 동시적으로 값을 처리하는 함수에서 어떻게 작동하는 지를 보여준다. 결과들을 slice로 모아주고 slice를 반환한다.

func processAndGather(in <-chan int, processor func(int) int, num int) []int {
    out := make(chan int, num)
    var wg sync.WaitGroup
    wg.Add(num)
    for i := 0; i < num; i++ {
        go func() {
            defer wg.Done()
            for v := range in {
                out <- processor(v)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    var result []int
    for v := range out {
        result = append(result, v)
    }
    return result
}

우리의 예제에서는 하나의 monitoring goroutine을 실행시켰다. monitoring goroutine는 처리중인 모든 goroutine들이 종료될까지 기다린다. 처리중인 모든 goroutine들이 종료되면 monitoring goroutineclose를 호출하여 output channel을 닫는다. output channel이 닫히면 for-range문에서 out이 닫히게 되고 버퍼는 비게된다. 따라서 for-range문이 종료된다.

WaitGroups이 아무리 편하다고해도, goroutine들을 조율할 첫번째 방법이 될 순 없다.

Running Code Exactly Once

(추후작성)

0개의 댓글