지금까지 락의 개념을 학습하였다. 불행히도 "락"만으로는 병행 프로그램을 제대로 작성할 수 없다.
쓰레드가 계속 진행하기 전에 어떤 조건이 참인지를 검사하는 경우가 많다.
volatile int done = 0;
void *child(void *arg) {
printf("child\n");
done = 1;
return NULL;
}
int main(int argc , char *argv[]) {
printf("parent: begin\n");
pthread_t c;
Pthread_create(&c, NULL, child, NULL); // 자식 생성
while (done == 0)
; // 회전
printf("parent: end\n");
return 0;
}
핵심 질문: 조건을 기다리는 법
멀티 쓰레드 프로그램에서는 특정 조건이 참이 되기를 기다리는 것이 유용할 때가 많이 있다. 그렇다면 쓰레드는 어떻게 조건을 기다려야 할까?
컨디션 변수(condition variable): 어떤 실행의 상태(조건)가 원하는 것과 다를 때 조건이 참이 되기를 기다리며 쓰레드가 대대기할 수 있는 큐.
일종의 큐 자료구조이다.
다른 쓰레드가 상태를 변경 -> 대기중인 쓰레드에 시그널을 보내 깨움
사용 방법
// 컨디션 변수 초기화
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
// wait/signal 연산
pthread_cond_wait(pthread_cond_t *c , pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c);
wait()
: 락을 해제하고 호출한 쓰레드를 재우는 함수wait()
에서 리턴하기 전에 락을 재획득해야 한다!signal()
: 조건이 참이 되길 기다리며 잠자고 있던 쓰레드를 깨우는 함수경쟁조건 방지 예제
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit() {
Pthread_mutex_lock(&m);
done = 1;
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&m);
}
void *child(void *arg) {
printf("child\n");
thr_exit();
return NULL;
}
void thr_join() {
Pthread_mutex_lock(&m);
while (done == 0) // if 문을 사용하지 않는다.
Pthread_cond_wait(&c , &m);
Pthread_mutex_unlock(&m);
}
int main(int argc , char *argv[]) {
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p , NULL , child , NULL);
thr_join();
printf("parent: end\n");
return 0;
}
thr_join()
을 호출한 후 자식 쓰레드가 끝나길 기다리는 경우thr_join()
을 실행하여 sleep 상태가 된다.wait()
을 호출하고 락을 반납함)thr_exit()
을 실행하여 done
을 1로 바꾸고 시그널을 보내 잠든 (부모) 쓰레드를 깨운다.parent:end
를 출력한다.done
이 바로 1로 설정되고, 쓰레드를 깨우기 위한 시그널은 보낸다.thr_join()
을 실행하고, done
이 1이므로 대기 없이 리턴한다.done
이 없다면 부모 쓰레드가 영원히 잠들게 된다. 부모를 깨워줄 자식 쓰레드도 없고, 자식 쓰레드가 이미 실행되었던 상태인지도 모르기 때문이다.참고: 시그널을 보내기 전에 항상 락을 획득하자
void thr_exit() {
done = 1;
Pthread_cond_signal(&c);
}
void thr_join() {
if (done == 0)
Pthread_cond_wait(&c);
}
wait()
를 수행하기 직전에 인터럽트가 걸렸다고 가정done
을 1로 바꾸고 시그널을 보낸다.wait()
를 수행하지만, 자식 쓰레드는 이미 실행된 상황이라 더이상 부모를 깨워줄 자식이 존재하지 않게 된다.다음으로 살펴볼 동기화 문제는 Dijkstra가 처음 제시한 생산자/소비자(producer/consumer) 문제이다. 유한 버퍼(bounded 버퍼) 문제로도 알려져 있다.
생산자/소비자 문제 (유한버퍼 문제)
생산자 쓰레드는 데이터를 만들어 버퍼에 넣고, 소비자 쓰레드는 데이터를 꺼내어 사용한다.
유한버퍼는 공유 자원이며, 경쟁 조건의 발생을 방지하기 위해 동기화가 필요하다.
get/put 함수 정의
// get/put 함수 정의
int buffer; // 일단 지금은 정수 하나로 정의
int count = 0; // 처음엔 버퍼 비어있음
void put(int value) {
assert(count == 0); // 비었는지 확인용
count = 1;
buffer = value;
}
int get() {
assert(count == 1); // 찼는지 확인용
count = 0;
return buffer;
}
생산자/소비자 쓰레드 v1
// 생산자/소비자 쓰레드 v1
void *producer(void *arg) {
int i;
int loops = (int) arg;
for (i = 0; i < loops; i++) {
put(i);
}
}
void *consumer(void *arg) {
int i;
while (1) { // 값을 무한히 추출
int tmp = get();
printf("%d\n", tmp);
}
}
buffer
에 대해 경쟁 조건이 발생한다.아래의 예에서 생산자와 소비자가 각각 하나씩 있고, 컨디션 변수 cond
와 그것과 연결된 mutex 락을 사용한다.
cond_t cond;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
if (count == 1) // p2
Pthread_cond_wait(&cond , &mutex); // p3
put(i); // p4
Pthread_cond_signal(&cond); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
if (count == 0) // c2
Pthread_cond_wait(&cond , &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&cond); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}
wait()
을 호출하여 락을 해제(c3)한다.put()
으로 넘어가서 버퍼를 채운다(p4).get()
을 실행(c4)시키려고 하지만 버퍼가 비어있다...위의 문제는 if 문을 while 문으로 바꾸는 것으로 쉽게 해결이 가능하다.
cond_t cond;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == 1) // p2
Pthread_cond_wait(&cond , &mutex); // p3
put(i); // p4
Pthread_cond_signal(&cond); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&cond , &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&cond); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}
Mesa semantic의 가장 기본적인 법칙은 언제나 while 문을 사용하는 것이다. 이게 안전하다.
그러나 위의 코드도 문제가 있다. , 이 둘 다 대기 상태에 있을 때 발생한다.
cond_t empty, fill;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 1)
Pthread_cond_wait(&empty , &mutex);
put(i);
Pthread_cond_signal(&fill);
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&fill , &mutex);
int tmp = get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
이제 제대로 동작하는 생산자/소비자 해법을 얻었지만 아직까지는 보편적인 방법은 아니다. 마지막 변경을 통해 병행성을 증가시키고 더 효율적으로 만들 수 있다.
int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;
void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
count−−;
return tmp;
}
// 동작하는 최종 해법
cond_t empty , fill;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == MAX) // p2
Pthread_cond_wait(&empty , &mutex); // p3
put(i); // p4
Pthread_cond_signal(&fill); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&fill , &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&empty); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}
// 몇 byte나 힙이 비어 있는가?
int bytesLeft = MAX_HEAP_SIZE;
// 락과 컨디션 변수가 필요함
cond_t c;
mutex_t m;
void *allocate(int size) {
Pthread_mutex_lock(&m);
while (bytesLeft < size)
Pthread_cond_wait(&c , &m);
void *ptr = . . . ; // 힙에서 메모리를 할당받음
bytesLeft −= size;
Pthread_mutex_unlock(&m);
return ptr;
}
void free(void *ptr , int size) {
Pthread_mutex_lock(&m);
bytesLeft += size;
Pthread_cond_signal(&c); // 시그널 전달 대상은?
Pthread_mutex_unlock(&m);
}
allocate(100)
실행allocate(10)
실행free(50)
호출pthread_cond_signal()
대신 pthread_cond_broadcast()
을 사용하여 대기중인 모든 쓰레드를 깨운다.