병행 프로그램의 근본적인 문제
여러 개의 명령어들을 원자적으로 실행하고 싶지만, 단일 프로세서의 인터럽트로 인해 불가능
이 장에서는 앞서 다룬 락(Lock)을 이용하여 임계 영역을 하나의 원자 단위 명령어인 것처럼 실행되도록 해볼 것이다.
락을 사용하여 임계 영역(balance = balance+1
)을 감쌀 수 있다.
lock_t mutex; // 글로벌 변수로 선언된 락
...
lock(&mutex);
balance = balance + 1;
unlock(&mutex);
락 변수: 락의 상태를 나타내는 변수
lock_t mutex;
)lock/unlock 루틴
lock()
: 락 획득을 시도한다.unlock()
: 락 소유자가 unlock을 호출하면 락은 이제 다시 사용 가능한 상태가 된다.
- 락은 프로그래머에게 스케줄링에 대한 최소한의 제어권을 제공한다.
- 락으로 코드를 감싸서 프로그래머는 그 코드 내에서는 하나의 쓰레드만 동작하도록 보장한다.
POSIX 라이브러리는 락을 mutex
라고 부른다.
(상호 배제, mutual exclusion)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
Pthread_mutex_lock(&lock); // pthread_mutex_lock()을 위한 래퍼
balance = balance + 1;
Pthread_mutex_unlock(&lock);
핵심 질문: 락은 어떻게 만들까
- 효율적인 락은 낮은 비용으로 상호 배제 기법을 제공하고, 하드웨어 및 운영체제 지원이 필요하다. 그렇다면...
- 효율적인 락은 어떻게 만들어야 하는가?
- 어떤 하드웨어 지원 필요?
- 어떤 운영체제 지원 필요?
정교한 락 라이브러리를 제작하는 데 있어 운영체제가 관여하는 것은 무엇인지 알아보자.
락의 효율을 어떻게 평가해야 할까?
초창기 단일 프로세스 시스템에서는 상호 배제 지원을 위해 임계 영역 내에서는 인터럽트를 비활성화하는 방법을 사용했다.
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
따라서 상호 배제를 위해 인터럽트를 비활성화하는 것은 제한된 범위에서만 사용되어야 한다.
Test-And-Set 기법 (원자적 교체, atomic exchange)
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *mutex) {
// 0 −> 락이 사용 가능함, 1 −> 락 사용 중
mutex−>flag = 0;
}
void lock(lock_t *mutex) {
while (mutex−>flag == 1) // flag 변수를 검사(TEST) 함
; // spin−wait (do nothing)
mutex−>flag = 1; // 이제 설정(SET) 한다!
}
void unlock(lock_t *mutex) {
mutex−>flag = 0;
}
lock()
이 호출되었을 때, 다른 쓰레드에서 락을 사용하고 있으면(mutex의 flag가 1이면) 락을 소유한 쓰레드의 unlock()
기다린다. (spin-wait)unlock()
을 호출하면, 이제 해당 쓰레드가 락을 가지게 된다.flag=1
이 될 수도 있다.int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // old_ptr 의 이전 값을 가져옴
*old_ptr = new; // old_ptr 에 new' 의 값을 저장함
return old; // old의 값을 반환함
}
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0 은 락이 획득 가능한 상태를 표시, 1 은 락을 획득했음을 표시
lock−>flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock−>flag, 1) == 1)
; // 스핀(아무 일도 하지 않음)
}
void unlock(lock_t *lock) {
lock−>flag = 0;
}
TestAndSet()
은 0을 반환하고 flag는 1이 되어 락을 획득한다.TestAndSet()
은 1을 반환하므로 while문을 계속해서 스핀한다.스핀 락은 임의의 시간에 단 하나의 쓰레드만이 임계 영역에 진입할 수 있도록 한다. 👍
스핀 락은 어떠한 공정성도 보장해 줄 수 없다. 👎
(while 문을 회전 중인 쓰레드는 경쟁에 밀려서 계속 그 상태에 남아 있을 수 있다.)
Compare-And-Swap 기법
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}
ptr
이 가리키고 있는 주소의 값이 expected
변수와 일치하는지 검사하는 것이다.ptr
을 새 값으로 변경하고, 그렇지 않다면 아무 것도 하지 않는다.void lock(lock_t *lock) {
while (CompareAndSwap(&lock−>flag, 0, 1) == 1)
; // 스핀(아무 일도 하지 않음)
}
MIPS 구조에서는 load-linked와 store-conditional 명령어를 앞뒤로 사용하여 락이나 기타 병행 연산을 위한 자료 구조를 만들 수 있다.
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if (LoadLinked 후 ptr이 갱신되지 않은 경우) {
*ptr = value;
return 1; // 성공!
} else {
return 0; // 갱신을 실패함
}
}
LoadLinked()
: 일반 로드 명령어와 같이 메모리 값을 레지스터에 저장한다.StoreConditional()
: ptr 갱신이 없는 경우에만 저장을 성공하고, 성공 시 load-linked가 탑재했던 값을 갱신한다.void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock−>flag) == 1)
; // 0이 될 때까지 스핀
if (StoreConditional(&lock−>flag, 1) == 1)
return; // 1로 변경하는 것이 성공하였다면: 완료
// 아니라면: 처음부터 다시 시도
}
}
void unlock(lock_t *lock) {
lock−>flag = 0;
}
StoreConditional()
을 호출하여 락 획득을 시도하고, 성공 시 flag를 1이 되고 임계 영역 내로 진입한다.StoreConditional()
에 실패하는 경우lock()
을 호출하여 LoadLinked()
를 실행, 이후 락이 사용 가능한 상태가 되어 0을 반환한다.StoreConditional()
을 실행하기 직전에 인터럽드에 걸렸고, 다른 쓰레드가 lock()
코드를 호출하여 똑같이 LoadLinked()
의 반환값으로 0을 받았다고 하자.LoadLinked
명령어를 실행하였고, 둘 다 StoreConditional
를 부르려고 하는 상황이다.StoreConditional()
은 LoadLinked
후 ptr이 갱신되지 않은 경우에만 값을 갱신하므로, 오직 하나의 쓰레드만 flag 값을 1로 설정함을 보장한다.StoreConditional()
를 실행하는 쓰레드는 락 획득에 실패한다.Fetch-And-Add: 원자적으로 특정 주소의 예전 값을 반환하면서 값을 증가시키는 하드웨어 기법
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
// 티켓 락
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock−>ticket = 0;
lock−>turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock−>ticket);
while (lock−>turn != myturn)
; // 회전
void unlock(lock_t *lock) {
FetchAndAdd(&lock−>turn);
}
myturn
)를 나타낸다. lock->turn
을 사용하여 어느 쓰레드의 차례인지 판단한다.myturn == turn
) 이라는 조건에 부합하면 그 쓰레드가 임계 영역에 진입할 차례인 것이다.모든 쓰레드들이 각자의 순서에 따라 진행한다는 것이 특징이다.
즉, 쓰레드가 티켓 값을 할당받았다면, 미래의 어느 때에 실행되는 것이 보장되는 것이다.
하드웨어 기반 락은 간단하고 잘 동작하지만, 때론 비효율적이기도 하다.
핵심 질문: 회전을 피하는 방법
어떻게 하면 스핀에 CPU 시간을 낭비하지 않는 락을 만들 수 있을까?
이제부터는 운영체제로부터의 지원이 추가로 필요하다.
무조건 양보
락이 해제되기를 기다리며 스핀해야 하는 경우, 자신에개 할당된 CPU를 다른 쓰레드에게 양보하는 전략
void init() {
flag = 0;
}
void lock() {
while (TestAndSet(&flag, 1) == 1) // 만약 다른 쓰레드가 락 보유하면,
yield(); // CPU를 양보함
}
void unlock() {
flag = 0;
}
yield()
기법이 있다고 가정한다.그러나 이 방법도 한계가 있다.
lock()
호출하고 CPU 양도이전 방법들은 너무 많을 부분을 운에 맡긴다는 문제점이 있었고, 따라서 어떤 쓰레드가 다음으로 락을 획득할지를 명시적으로 제어할 수 있어야 한다.
이를 위해서는 운영체제로부터 적절한 지원과 큐를 이용하여 대기 쓰레드를 관리하는 방법에 대해 알아볼 것이다.
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m−>flag = 0;
m−>guard = 0;
queue_init(m−>q);
}
void lock(lock_t *m) {
while (TestAndSet(&m−>guard, 1) == 1)
; // 회전하면서 guard 락을 획득
if (m−>flag == 0) {
m−>flag = 1; // 락을 획득함
m−>guard = 0;
} else {
queue_add(m−>q, gettid());
m−>guard = 0;
park();
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m−>guard, 1) == 1)
; // 회전하면서 guard 락을 획득
if (queue_empty(m−>q))
m−>flag = 0; // 락을 포기함: 누구도 락을 원치 않음
else
unpark(queue_remove(m−>q)); // 락을 획득함 (다음 쓰레드를 위해!)
m−>guard = 0;
}
park()
: 호출하는 쓰레드를 잠재우는 함수unpark(threadID)
: threadID로 명시된 특정 쓰레드를 깨우는 함수guard
: flag
와 큐의 삽입/삭제 동작을 스핀 락으로부터 보호하는데 사용lock()
을 호출하여 락 획득을 시도하였는데 다른 쓰레드가 이미 락을 보유한 경우를 가정하자.guard
변수를 0으로 변경하고 park()
를 호출하여 CPU를 양보한다 (sleep 상태).unlock()
이 호출되어 큐에서 나오게 되었을 때 잠자고 있던 쓰레드를 깨운다.park()
직전에 경쟁 조건이 발생된다.setspark()
라는 코드를 추가하여 해결하였다.queue_add(m−>q, gettid());
setpark();// 추가
m−>guard=0;
setspark()
: park()
를 호츌하기 직전이라는 것을 표시park()
가 실제로 호출되기 전에 다른 쓰레드가 unpark()
를 먼저 호출한다면, 추후 park()
문은 sleep 하지 않고 바로 return이 된다.Linux: futex 지원
futex_wait(address, expected)
futex_wake(address, expected)
void mutex_lock (int *mutex) {
int v;
/* 31번째 비트가 이미 초기화되어 있다. mutex를 이미획득했다. 바로 리턴한다. (이것이 빠르게 실행하는 방법이다) */
if (atomic_bit_test_set(mutex, 31) == 0)
return;
atomic_increment(mutex);
while (1) {
if (atomic_bit_test_set(mutex, 31) == 0) {
atomic_decrement(mutex);
return;
}
/* 이제 대기해야 한다. 먼저, 우리가 관찰 중인 futex 값이 실제로 음수 인지 확인해야 한다(잠겨있는 상태인지). */
v = *mutex;
if (v >= 0)
continue;
futex_wait(mutex, v);
}
}
void mutex_unlock (int *mutex) {
/* 필요충분 조건으로 관심 대상의 다른 쓰레드가 없는 경우에 한해서
0x80000000를 카운터에 더하면 0을 얻는다. */
if (atomic_add_zero(mutex, 0x80000000))
return;
/* 이 mutex를 기다리는 다른 쓰레드가 있다면 그 쓰레드들을 깨운다. */
futex_wake(mutex);
2단계 락(two-phase lock)