자료구조에 락을 추가하여 쓰레드가 사용할 수 있도록 만들면, 그 구조는 쓰레드 사용에 안전(thread safe)하다고 할 수 있다.
핵심 질문: 자료구조에 락을 추가하는 방법
- 자료구조에 어떤 방식으로 락을 추가해야 정확하게 동작하게 만들 수 있을까?
- 자료구조에 락을 추가하여 여러 쓰레드가 그 자료구조를 동시에 사용이 가능하도록 하면(병행성), 고성능을 얻기 위해 무엇을 해야할까?
typedef struct _ _counter_t {
int value;
pthread_mutex_t lock;
} counter_t;
void init(counter_t *c) {
c−>value = 0;
Pthread_mutex_init(&c−>lock, NULL);
}
void increment(counter_t *c) {
Pthread_mutex_lock(&c−>lock);
c−>value++;
Pthread_mutex_unlock(&c−>lock);
}
void decrement(counter_t *c) {
Pthread_mutex_lock(&c−>lock);
c−>value−−;
Pthread_mutex_unlock(&c−>lock);
}
int get(counter_t *c) {
Pthread_mutex_lock(&c−>lock);
int rc = c−>value;
Pthread_mutex_unlock(&c−>lock);
return rc;
}
하나의 쓰레드가 하나의 CPU에서 작업을 끝내는 것처럼, 멀티프로세서에서 실행되는 쓰레드들도 빠르게 작업을 처리하고 싶을 것이다.
이를 완벽한 확장성(perfect scaling)이라고 한다.
완벽한 확장성(perfect scaling): 더 많은 작업을 처리하더라도 각 작업이 병렬적으로 처리되어 완료 시간이 늘어나지 않는 것
엉성한 카운터(sloppy counter)
CPU 코어마다 물리적인 지역 카운터와 전역 카운터로 구성되어 있다.
지역/전역 카운터들을 위한 락이 존재한다.
기본 개념
S
마다 전역 카운터로 전달한다.S
의 값: 확장성 X, 전역 카운터의 값이 정확해짐S
의 값: 확장성 O, 전역 카운터의 값이 부정확해짐엉성한 카운터의 흐름
엉성한 카운터의 성능
S=1024
인 경우의 성능이다.엉성한 카운터의 확장성
엉성한 카운터의 구현
typedef struct __counter_t {
int global; // 전역 카운터
pthread_mutex_t glock; // 전역 카운터
int local[NUMCPUS]; // 지역 카운터 (cpu당)
pthread_mutex_t llock[NUMCPUS]; // ... 그리고 락들
int threshold; // 갱신 빈도
} counter_t;
// init:한계치를 기록하고 락과 지역 카운터
// 그리고 전역 카운터의 값들을 초기화함
void init(counter_t *c, int threshold) {
c−>threshold = threshold;
c−>global = 0;
pthread_mutex_init(&c−>glock, NULL);
for (int i = 0; i < NUMCPUS; i++) {
c−>local[i] = 0;
pthread_mutex_init(&c−>llock[i], NULL);
}
}
// update: 보통은 지역 락을 획득한 후 지역 값을 갱신함
// '한계치' 까지 지역 카운터의 값이 커지면,
// 전역 락을 획득하고 지역 값을 전역 카운터에 전달함
void update(counter_t *c, int threadID, int amt) {
pthread_mutex_lock(&c−>llock[threadID]);
c−>local[threadID] += amt; // amt > 0을 가정
if (c−>local[threadID] >= c−>threshold) { // 전역으로 전달
pthread_mutex_lock(&c−>glock);
c−>global += c−>local[threadID];
pthread_mutex_unlock(&c−>glock);
c−>local[threadID] = 0;
}
pthread_mutex_unlock(&c−>llock[threadID]);
}
// get: 전역 카운터의 값을 리턴 (정확하지 않을 수 있음)
int get(counter_t *c) {
pthread_mutex_lock(&c−>glock);
int val = c−>global;
pthread_mutex_unlock(&c−>glock);
return val; // 대략의 값임!
}
// 기본 노드 구조
typedef struct __node_t {
int key;
struct __node_t *next;
} node_t;
// 기본 리스트 구조 (리스트마다 하나씩 사용)
typedef struct __list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;
void List_Init(list_t *L) {
L−>head = NULL;
pthread_mutex_init(&L−>lock, NULL);
}
int List_Insert(list_t *L, int key) {
pthread_mutex_lock(&L−>lock);
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
pthread_mutex_unlock(&L−>lock);
return −1; // 실패
}
new−>key = key;
new−>next = L−>head;
L−>head = new;
pthread_mutex_unlock(&L−>lock);
return 0; // 성공
}
int List_Lookup(list_t *L, int key) {
pthread_mutex_lock(&L−>lock);
node_t *curr = L−>head;
while (curr) {
if (curr−>key == key) {
pthread_mutex_unlock(&L−>lock);
return 0; // 성공
}
curr = curr−>next;
}
pthread_mutex_unlock(&L−>lock);
return −1; // 오류
}
그렇다면 삽입 연산이 병행하여 진행되는 상황에서 실패를 하더라도 락 해제를 호출하지 않으면서 삽입과 검색이 올바르게 동작하도록 수정할 수 있을까?
// malloc 포함 전부 감싸는 모습
pthread_mutex_lock(&L−>lock);
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
pthread_mutex_unlock(&L−>lock);
return −1; // 실패
}
new−>key = key;
new−>next = L−>head;
L−>head = new;
pthread_mutex_unlock(&L−>lock);
...
new−>key = key;
// Insert 전체를 감싸지 않고, 임계 영역만 락으로 보호
pthread_mutex_lock(&L−>lock);
new−>next = L−>head;
L−>head = new;
pthread_mutex_unlock(&L−>lock);
pthread_mutex_lock(&L−>lock);
node_t *curr = L−>head;
while (curr) {
if (curr−>key == key) {
pthread_mutex_unlock(&L−>lock); // 불안한 코드
return 0; // 성공
}
curr = curr−>next;
}
pthread_mutex_unlock(&L−>lock);
return −1; // 오류
int rv = -1;
pthread_mutex_lock(&L−>lock);
node_t *curr = L−>head;
while (curr) {
if (curr−>key == key) {
rv = 0;
break;
}
curr = curr−>next;
}
pthread_mutex_unlock(&L−>lock);
return rv; // 성공과 실패를 나타냄
void List_Init(list_t *L) {
L−>head = NULL;
pthread_mutex_init(&L−>lock, NULL);
}
void List_Insert(list_t *L, int key) {
// 동기화할 필요 없음
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return;
}
new−>key = key;
// 임계 영역만 락으로 보호
pthread_mutex_lock(&L−>lock);
new−>next = L−>head;
L−>head = new;
pthread_mutex_unlock(&L−>lock);
}
int List_Lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L−>lock);
node_t *curr = L−>head;
while (curr) {
if (curr−>key == key) {
rv = 0;
break;
}
curr = curr−>next;
}
pthread_mutex_unlock(&L−>lock);
return rv; // 성공과 실패를 나타냄
}
병행이 가능한 연결리스트를 갖게 되었지만 확장성이 좋지 않다는 문제가 있다. 이러한 병행성 문제를 개선하기 위해 hand-over-hand locking(또는 lock coupling) 기법을 개발했다.
hand-over-hand locking
결론: 큰 락을 사용하는 것이 병행 자료구조를 만들기에 표준이다.
typedef struct __node_t {
int value;
struct __node_t *next;
} node_t;
typedef struct __queue_t {
node_t *head;
node_t *tail;
pthread_mutex_t headLock;
pthread_mutex_t tailLock;
} queue_t;
void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp−>next = NULL;
q−>head = q−>tail = tmp;
pthread_mutex_init(&q−>headLock , NULL);
pthread_mutex_init(&q−>tailLock , NULL);
}
void Queue_Enqueue(queue_t *q , int value)
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp−>value = value;
tmp−>next = NULL;
pthread_mutex_lock(&q−>tailLock);
q−>tail−>next = tmp;
q−>tail = tmp;
pthread_mutex_unlock(&q−>tailLock);
}
int Queue_Dequeue(queue_t *q , int *value)
pthread_mutex_lock(&q−>headLock);
node_t *tmp = q−>head;
node_t *newHead = tmp−>next;
if (newHead == NULL) {
pthread_mutex_unlock(&q−>headLock);
return −1; // 큐가 비어있음
}
*value = newHead−>value;
q−>head = newHead;
pthread_mutex_unlock(&q−>headLock);
free(tmp);
return 0;
}
#define BUCKETS ()
typedef struct __hash_t {
list_t lists[BUCKETS];
} hash_t;
void Hash_Init(hash_t *H) {
for (int i = 0; i < BUCKETS; i++) {
List_Init(&H−>lists[i]);
}
}
int Hash_Insert(hash_t *H , int key) {
int bucket = key % BUCKETS;
return List_Insert(&H−>lists[bucket] , key);
}
int Hash_Lookup(hash_t *H , int key) {
int bucket = key % BUCKETS;
return List_Lookup(&H−>lists[bucket] , key);
}