이 글의 목적: 쓰레드를 더 깊게 이해하고 row level에서 다뤄본것을 기록.
지난 포스트들은 거의 javascript 기반으로 작성이 되었는데, OS나 Network 처럼 깊게 파야하는 주제들은 C++ 로 다뤄보겠다.
싱글코어를 가지는 CPU가 여러개의 프로그램을 동시에 띄워서 실행시킬 수 있는 방법은 아주 짧은 시간동안 여러 프로세스를 옮겨다니면서 실행을 한다. 이로 인해 동시에 여러 프로세스들이 동작하는것 처럼 보인다.
CPU의 특정코어의 성능을 높히는 방법이 아니라 코어의 개수를 늘리는 식으로 발전을 해왔다. 가령 딱 코어 수 만큼의 쓰레드를 실행시키는게 가장 이상적이기 때문에 코어의 개수를 늘리는게 코어의 스펙을 올리는것 보다 훨씬 효율적일 수 있겠다.
각 쓰레드들은 각자의 고유한 스택영역을 받아서 사용한다.
쓰레드들은 힙 영역(new)과 데이터영역(static 변수)를 공유해서 사용하게 된다.
여기서의 문제는 공용으로 사용하는 영역에 접근하는 것이 문제가 될 수 있다는 것이다.
예를 들어 특정쓰레드가 A라는 영역에 쓰기 행위를 하고 있는데, 어떤 쓰레드는 A라는 영역에 읽기를 하게 되면 교착상태에 빠지게 된다.
이 부분들을 효율적으로 (mutex등) 처리하는 방법이 필요할듯 하다.
#include "pch.h"
#include <thread>
#include <iostream>
void HelloThread()
{
cout << "Hello Thread" << endl;
}
int main()
{
std::thread(HelloThread);
}
위와 같이 쓰레드를 사용할 수 있다. 하지만 이렇게 사용을 하면 오류가 날건데 이 오류는 t 라는 객체로 쓰레드를 관리하는데 Main Thread가 먼저 끝나고, t 라는 쓰레드는 계속 살아있기 때문에 오류가 발생한다. 이 때는 join 이라는 함수를 사용하면 되는데 이 함수를 쓰면 내가 생성한 쓰레드가 끝날 때까지 기다려진다.
그래서 보통은 쓰레드를 만들 때
if (t.joinable())
t.join();
이런식으로 joinable이라는 함수를 사용해서 thread의 객체가 살아있는지 아닌지를 판단한 다음 thread에 join을 해준다.
만약에 여러개의 매개변수가 필요한 쓰레드를 생성하려면 이렇게 하면된다.
void HelloThread(int32 num)
{
cout << num << endl;
}
int main()
{
vector<std::thread> v;
for(int32 i = 0; i < 10; i++ )
{
v.push_back(std::thread(HelloThread, i));
}
for(int32 i = 0; i < 10; i++)
{
if (v[i].joinable())
v[i].join();
}
}
만약에 아래의 코드를 실행하면 어떻게 될까?
#include <iostream>
#include <thread>
#include <atomic>
int sum = 0;
void Add()
{
for (int32 i = 0; i < 100'0000; i++)
{
sum++
}
}
void Sub()
{
for (int32 i = 0; i < 100'0000; i++)
{
sum--;
}
}
int main()
{
Add();
Sub();
cout << sum << endl;
}
당연히 0이 나온다. 100만번 더해주고 100만번 빼주기 때문이다. 하지만 만약에 Add 와 Sub가 새로운 쓰레드로 폴링되서 실행되면 어떻게 될까?
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
cout << sum << endl;
랜덤한 값들로 이상하게 나올 것이다.
왜냐하면 sum을 공유해서 사용하기 때문인데, t1, t2 두개의 쓰레드가 sum의 값에 동시에 접근해서 이런 현상이 나타난다.
그럼 왜 이런현상이 나타날까?
sum++을 어셈블리어로 까보면 다음과 같이 동작한다.
00007FF7E25824F5 mov eax,dword ptr [sum (07FF7E258F440h)]
00007FF7E25824FB inc eax
00007FF7E25824FD mov dword ptr [sum (07FF7E258F440h)],eax
간단히 설명을 하자면 sum 을 eax 라는 변수로 옮겨서 eax 를 증가시키고 다시 eax를 sum으로 옮기는 작업을 한다. 왜 이렇게 동작을 하냐 라는 질문을 가질 수 있는데 CPU 설계상 어떤 메모리에 값을 꺼내오고 거기에 연산을 하는 2가지 작업을 동시에 할수없기 때문이다.
그러면 sum의 공유자원이 결국엔
int eax = sum;
eax = eax + 1;
sum = eax;
이렇게 실행이 되는것이였다. 이것이 두개의 쓰레드로 돌아가다 보니 sum의 t1 쓰레드에서 1을 쓸때 t2 쓰레드에서 -1 을 써버리면 sum은 결국에 -1 이 되어버리는 것이다.
이것을 해결하기 위한 방법중 하나가 Atmoic을 사용하는 것인데,
#include "pch.h"
#include "CorePch.h"
#include <iostream>
#include <thread>
#include <atomic>
// atomic All-Or-Nothing
atomic<int> sum = 0;
void Add()
{
for (int32 i = 0; i < 100'0000; i++)
{
sum.fetch_add(1);
}
}
void Sub()
{
for (int32 i = 0; i < 100'0000; i++)
{
sum.fetch_add(-1);
}
}
int main()
{
Add();
Sub();
cout << sum << endl;
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
cout << sum << endl;
}
이렇게 사용하면 atomic이란 라이브러리가 다른 하나가 실행이 될때 다른 하나가 실행이 되지 못하도록 막아주는 역활을 한다. 그러면 결국에 Lock이 되긴하지만 atmoic이 생각보다 많이 느려서 남발은 하면 안된다.
다음과 같은 코드가 있다고 가정을 해보자
#include <iostream>
#include <thread>
#include <mutex>
vector<int32> v;
void Push()
{
for (int32 i = 0; i < 10000; i++)
{
v.push_back(i);
}
}
int main()
{
std::thread t1(Push);
std::thread t2(Push);
t1.join();
t2.join();
cout << v.size() << endl;
}
쓰레드 t1, t2를 통해 vector로 구성된 v에 동시에 접근하려고 한다. 이럴 경우엔 실행이 될때 애초에 오류가 난다. c++ 의 vector는 크기가 유동적으로 바뀌는데 서로 다른 쓰레드가 push_back을 하게되면 유동적으로 범위를 늘리려하는 vector가 크리티컬한 버그를 낼수 있어서이다.
그렇다면 우리는 이를 어떻게 해결할수 있을까?
#include <iostream>
#include <thread>
#include <mutex>
vector<int32> v;
mutex m;
void Push()
{
for (int32 i = 0; i < 10000; i++)
{
//자물쇠 잠그기
m.lock();
v.push_back(i);
//자물쇠 풀기
m.unlock();
}
}
int main()
{
std::thread t1(Push);
std::thread t2(Push);
t1.join();
t2.join();
cout << v.size() << endl;
}
뮤텍스를 사용하여 lock과 unlock을 사용해주면 되는 것이다.
하지만 이 부분도 문제가 조금 있는게 예외처리 같은 부분이 있을 때 깜빡하고 unlock을 안해주면 계속 lock 상태로 대기가 된다. (복잡한 함수의 경우엔 더욱 관리가 어려워짐)
그래서 우리는 다음과 같이 수정할 수 있다.
template<typename T>
class LockGuard
{
public:
LockGuard(T &m)
{
_mutex = &m;
_mutex->lock();
}
~LockGuard()
{
_mutex->unlock();
}
private:
T* _mutex;
};
vector<int32> v;
mutex m;
void Push()
{
for (int32 i = 0; i < 10000; i++)
{
LockGuard<std::mutex> lockGuard(m)
v.push_back(i);
}
}
바로 생성자를 통해 mutex의 lock을 걸어주고 소멸자를 통해 unlock을 시켜주는 것이다. 그렇게 되면 한줄만 처리해놓으면 바로 사용이 가능하다.
이 부분은 STL에서 이미 구현 되어 있으며 다음과 같이 사용 가능하다.
std::lock_guard<std::mutex> lockGuard(m);
lock_guard는 앞에서 구현한 LockGuard 클래스와 동일한 역할을한다.
std::unique_lock <std::mutex> uniqueLock(m, std::defer_lock);
uniqueLock.lock();
unique_lock도 있는데 unique_lock은 내가 lock을 할 시점을 지정 할 수 있다.
A와 B라는 클래스가 있을 때 A에선 B 클래스를 lock을 걸어 호출하고, B클래스 에선 A클래스를 호출하면 어떻게 될까?
먼저 코드로 예를 들어보면 다음과 같다.
void UserManager::ProccessSave() {
// userLock
lock_guard(mutex) guard(_mutex)
//accountLock
Account* account = AccountManager::Instance()->GetAccount(100);
}
void AccountManager::ProcessLogin() {
// accountLock
lock_guard(mutex) guard(_mutex)
//userLock
Account* account = AccountManager::Instance()->GetAccount(100);
}
이런 코드에 UserManager에서는 Account에 락을걸고 AccountManger에서는 User에 락을 걸게 되는데, 쓰레드1이 ProccessSave에 접근하고 쓰레드2가 ProcessLogin에 접근하게 되면 데드락이 걸린다.
데드락이 걸리는 이유는 약간 복잡 할수도 있다. 쓰레드1 에선 ProccessSave에 접근하여 User에 락을 걸고 그 다음 Account에 락을 걸려고하는 상황이다. 쓰레드2 에선 ProcessLogin에 접근하여 Account에 락을 걸고 User에 락을 걸려고하는 상황인데 쓰레드 1은 User에 락을 획득하고 쓰레드2는 Account의 락을 획득한 상황인데 쓰레드 1,2가 서로 먼저 풀겠지 라고 생각하면서 교착 상태에 빠진 것이다.
해결방법은 의외로 간단하다. User나 Account 둘중 하나에 먼저 lock 을 걸어야 한다 라고 하면 해결이 되는데, 나는 Account에 lock 을 먼저 걸어보겠다.
void UserManager::ProccessSave() {
//accountLock
Account* account = AccountManager::Instance()->GetAccount(100);
// userLock
lock_guard(mutex) guard(_mutex)
}
void AccountManager::ProcessLogin() {
// accountLock
lock_guard(mutex) guard(_mutex)
//userLock
Account* account = AccountManager::Instance()->GetAccount(100);
}
이렇게 되면 항상 account에 lock이 먼저 걸려서 교착상태에 빠지지 않는다.
Spin Lock은 쓰레드 1에서 락이 걸렸을 때 쓰레드 2가 쓰레드1의 락이 풀릴 때까지 대기를 하고 락이 풀리면 쓰레드1에서 락을 걸어 처리하는 방법이다.
스핀락을 코드로 구현 해보면 어떻게 할 수 있을까?
#include <iostream>
#include <thread>
#include <mutex>
class SpinLock
{
public:
void lock()
{
while(_locked)
{
}
_locked = true;
}
void unlock()
{
_locked = false;
}
private:
_locked = false;
};
아주 간단히 구현 해보면 이렇다. locked라는 변수에 flag를 저장해두고 lock 함수에선 락이 풀릴때까지 대기하고 locked를 true로 만들어서 현재 쓰레드의 상태가 lock임을 알리고 또 다른 쓰레드가 접근하면 대기하도록 한다.
unlock 함수는 간단하게 locked의 상태를 풀어준다. 이 코드를 기반으로 실행을 하면 어떻게 될까?
int32 sum = 0;
mutex m;
SpinLock spinLock;
void Add()
{
for (int32 i = 0; i < 10'0000; i++)
{
lock_guard<SpinLock> guard(spinLock);
sum++;
}
}
void Sub()
{
for(int32 i = 0; i< 10'0000; i++)
{
lock_guard<SpinLock> guard(spinLock);
sum--;
}
}
int main()
{
thread t1(Add);
thread t2(Sub);
t1.join();
t2.join();
cout << sum << endl;
}
예상치 못한 값이 나온다. 왜냐하면 locked는 결국에 공유자원으로써 쓰레드1, 쓰레드2가 동시에 접근하면 값이 덮힐수도 있기 때문이다.
그러면 결국에 SpinLock 클래스를 수정해줘야 한다.
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while(_locked.compare_exchange_strong(expected, desired) == false)
// 만약에 lock 획득에 실패했으면 성공할때 까지 무한정으로 돌림
{
expected = false;
}
}
void unlock()
{
_locked.store(false);
}
private:
atomic<bool >_locked = false;
};
여기서 재미있는건 atomic을 쓴다는건데 저번에 lock을 처리하기위해 atomic을 활용했다. 이번에도 atomic을 활용하여 원자단위로 task가 제대로 처리되던지 아니면 task가 전부 롤백되던지 둘중 하나의 상태로 처리가 된다.
여기서 compare_exchange_strong라는 함수가 있는데 이 함수는 expected, desired라는 매개변수를 가지도록 되어있다. expected는 내가 예상하는 locked의 값 desired는 내가 원하는 locked의 상태이다. 수도코드를 간단히 적어보자면 다음과 같다.
if(_locked == expected)
{
expected = _locked;
_locked = desired;
return true; => 내가 lock 을 획득했을 경우.
}else
{
expected = _locked;
return false; => 다른 사람이 lock을 획득했을 경우.
}
locked의 상태와 예상값이 다르면 결국에 lock이 걸린 상태이고 그렇지 않으면 내가 lock을 획득할 수 있는 경우이다.
앞에서 Spin Lock은 쓰레드1의 lock이 끝날때 까지 쓰레드2가 무한정 대기 상태였다면, Sleep은 쓰레드1의 lock이 걸린 상태면 쓰레드2는 우리가 지정한 시간만큼 커널모드로 돌아간다.(무한정 기다리는게 아니고 지정한 시간뒤에 다시 lock이 걸렸는지 확인 한다는 뜻)
어떻게 보면 무한정 대기하는건 CPU 측면에서 낭비일 수 있다. lock이 언제 끝날지 모르는 상태에서 무한정 대기를 계속하니 비효율적일 수도 있다는 것이다.
사실 구현 방법은 SpinLock에서 한줄만 더 추가하면 된다.
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while(_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
this_thread::sleep_for(1ms);
}
}
void unlock()
{
_locked.store(false);
}
private:
atomic<bool>_locked = false;
};
sleep_for 함수를 통해 1ms 뒤에 쓰레드의 락이 걸려있는지 다시 확인을 한다.
Event는 쓰레드1이 락이 걸린 상태에서 쓰레드2가 리소스에 접근을 해야할 때 쓰레드2가 쓰레드1의 락이 끝날때 누군가(커널)에게 알려달라 하는 패턴이다.
SpinLock 같은 경우에는 쓰레드1의 락이 끝날때까지 계속 기다렸고 Sleep 같은 경우에는 쓰레드1의 락이 끝났는지 특정 시간후에 다시와서 체크했었고 Event는 락이 끝났을 때 누군가에게 쓰레드1의 락이 끝났다 라고 알려달라 하는 패턴이다.
#include <iostream>
#include <thread>
#include <mutex>
#include <windows.h>
mutex m;
queue<int32> q;
void Producer()
{
while(true)
{
{
unique_lock<mutex> lock(m);
q.push(100);
}
this_thread::sleep_for(100ms);
}
}
void Consumer()
{
while(true)
{
unique_lock<mutex> lock(m);
if(q.empty() == false)
{
int32 data = q.front();
q.pop();
cout << data << endl;
}
}
}
int main()
{
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
}
위의 코드는 Producer는 락을 걸어 queue에 데이터를 계속 푸쉬한다. Consumer는 queue에서 데이터를 하나씩 빼와서 pop 시킨뒤 출력을 한다.
실행을 시키면 제대로 동작을한다. 여기서 문제점이 있는데, 만약에 sleep 이 100ms 가 아니라 10000000ms 이면 Consumer 작업의 오버헤드는 너무 커진다는 단점이 있다. 그래서 이걸 Event 기반으로 바꿔보면 다음과 같다.
#include <iostream>
#include <thread>
#include <mutex>
#include <windows.h>
mutex m;
queue<int32> q;
HANDLE handle;
void Producer()
{
while(true)
{
{
unique_lock<mutex> lock(m);
q.push(100);
}
::SetEvent(handle);
this_thread::sleep_for(100ms);
}
}
void Consumer()
{
while(true)
{
::WaitForSingleObject(handle, INFINITE);
unique_lock<mutex> lock(m);
if(q.empty() == false)
{
int32 data = q.front();
q.pop();
cout << data << endl;
}
}
}
int main()
{
handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*ManualReset*/, FALSE/*initalState*/, NULL);
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
::CloseHandle(handle);
}
Handle 이라는 커널 오브젝트를 생성하여 사용을 하는데, CreateEvent 함수를 통해서 이벤트를 만든다.
여기서 커널 오브젝트는 Usage Count라는 오브젝트를 몇명이 사용하고 있는지를 공유하고, Signal과 Non-Signal 을 관리한다 켜져있으면 Signal 꺼져있으면 Non-Signal 이다.
두번째 매개변수로 들어가는 ManualReset 이 FALSE로 셋팅 되면 꺼져있는 상태로 시작을 하는것이다.
이렇게 Event를 하나 만들고 Consumer쪽에서 WaitForSingleObject라는 함수를 사용했는데, 이 함수는 Signal 상태가 켜지기 전까진 Thread를 잠시 꺼놓는것이다.
그리고 Producer쪽에는 SetEvent 함수를 사용하는데, queue에 데이터를 밀어넣고 커널 오브젝트에 Lock이 끝났으니 Signal 값으로 바꿔달라고 하는 함수이다.
하지만 문제는 여전히 있다.
이 방법은 커널이 Event가 되어버려서 사용이된다.
만약에 Produce 함수가 sleep 이 걸리지 않고 계속 데이터를 push 하게 되면 결국에 데이터가 꼬이게 된다.
이럴경우 데이터를 좀더 실시간적으로 처리할수 있게되는 condition_variable 을 쓰면 된다.
condition_variable은 유저레벨에서 실행되는 객체이기에 실시간성으로 사용할수 있어서 좀더 효율적으로 처리할수 있다.
코드는 다음과 같다.
#include "pch.h"
#include "CorePch.h"
#include <iostream>
#include <thread>
#include <mutex>
#include <windows.h>
mutex m;
queue<int32> q;
HANDLE handle;
// CV는 커널오브젝트가 아니라 User-Level Object이다.
condition_variable cv;
void Producer()
{
while(true)
{
//1 락을 잡고
//2 공유 변수 값을 수정
//3 락을 풀고
//4 조건변수를 통해 다른 쓰레드에게 통지
{
unique_lock<mutex> lock(m);
q.push(100);
}
cv.notify_one(); // wait 중인 쓰레드가 있으면 1를 깨운다.
}
}
void Consumer()
{
while(true)
{
unique_lock<mutex> lock(m);
cv.wait(lock, []() { return q.empty() == false; });
//1 락을 잡고
//2 조건 확인
// 만족 O => 빠져나와서 코드를 진행
// 만족 X => Lock을 풀어주고 대기 상태
int32 data = q.front();
q.pop();
cout << q.size() << endl;
}
}
int main()
{
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
}
이렇게 쓰레드에 대해서 알아봤는데 아직 다 담지 못해서 2편에 마무리 하도록 하겠다.