
말 여러 개의 스레드(흐름)이 동시에 존재하는 상태.
추상적이라서 와닿지 않지만 멀티 태스킹을 떠올리면 이해가 쉽다.
더 구체적인 예를 들어 저녁에 국이 메뉴에 있다면. 싱글 스레드는 국이 완성될 때까지 아무것도 하지 못한다. 반면 멀티 스레드는 국을 불에 올려놓고 동시에 설거지, 다음 요리 준비등을 할 수 있다.
정확히 말하면 번갈아 가면서 하는건데 설거지 하다가 국이 넘치거나 하지 않는지 확인하고 다시 설거지를 하는 경우
설거지 -> 요리 -> 설거지로 실행 흐름이 빠르게 변경된 것으로 볼 수 있다.
순간에 어떤 행동을 할지는 나(OS 스케쥴러)의 판단에 달려있다. 나의 판단으로 우선 순위를 잘 조절해서 행동을 한다. (Batch, interactive, RealTime)
아까 저녁식사 비유에서 실제와 조금 다른것이 있다면. 각 행동, 즉 각 스레드는 서로의 상황을 알지 못한다. 요리를 하고 있다면 요리의 진행도만 알 수 있고, 설거지가 어디까지 되었는지는 모른다. 이 점을 유의하자.
이 예시에서 각 스레드(요리, 정리)는 소금, 설탕 이라는 공유 자원을 가지고 있다. 서로 이 공유 자원을 사용하다가 설탕 대신 소금을 넣는 원치 않는 결과가 나타났다.
이처럼 스레드가 공유 자원을 서로 사용하려는 상태를 Race Condition (경쟁 상태)라고 하며 경쟁 상태를 유발하는 코드 상의 구역(소금 넣기, 조미료 정리)을 Critical Section이라 한다.
Critical Section에 관련한 문제를 해결 하려면 다음의 조건들을 만족해야 한다
이를 기반으로 해결 방안을 살펴보자
소프트웨어적 방법 (사용자의 알고리즘을 통해)
( 위 2가지 방법은 실제 CPU에서는 잘 따라주지 않는다. 파이프라이닝, 캐싱, 코드 최적화 등)
하드웨어적 방법
Disabling Interrupts
Test, Set, Lock
기계어 수준에서 lock 변수를 만든다
기계어 수준에서 lock 변수의 값을 바꾼다 (원자적)
일단 잠근다
원래 값을 반환
bool TSL(bool *lock) { bool old = *lock; // 옛날 값 읽기 *lock = true; // 락을 걸어버림 return old; // 옛날 값 반환 }
LOCK XCHG reg, mem
CAS
function cas(p: pointer to int, old: int, new: int) is
if *p ≠ old
return false
*p ← new
return true운영체제에 의한 방법
// 바이너리, 카운팅(여러명) 세마포어 가능
wait(empty); // 빈 공간이 있는지 확인
wait(sem); // 버퍼 접근 락
put_item(); // 버퍼에 데이터 넣기
signal(sem); // 락 해제
signal(full); // 데이터 개수 증가수행 도중 중단될 수 없는 하나의 명령을 의미한다 좀 더 풀어서 말하자면 한번에 실행되어 중간 상태가 관찰되지 않는다고 할 수 있겠다. 이를 설명하기 위해 유명한 예시를 하나 들자면
int count = 0;
void Add(int number)
{
for (int i=0; i < 1000000; i++)
count++;
}
위의 코드를 여러 스레드로 실행하면 의도한 값이 나오지 않는다. 바로 count++가 원자적이지 않기 때문이다. 어셈블리를 통해 자세히 알아보자
00007FF7C2BD2545 mov eax,dword ptr [count (07FF7C2C165DCh)] 00007FF7C2BD254B inc eax 00007FF7C2BD254D mov dword ptr [count (07FF7C2C165DCh)],eax 1. count의 값을 eax에 이동시킴 2. eax의 값을 증가시킴 3. eax의 값을 count에 이동시킴
코드상에서 한번의 연산이지만 디스어셈블리에서는 3번의 연산을 진행한다. 즉 중간상태가 발견되기 때문에 멀티스레드 환경에서 문제가 발생한다.
간단한 예시로 t1이 3번을 시작하기 직전에 t2가 1번을 시작하면 t2는 증가되지 않은 값에 대해서 inc 명령어를 실행하기 때문에 의도한 값이 나오지 않는다.
CAS,TSL 등이 원자적 연산에 해당한디.
#include <iostream>
#include <thread>
using namespace std;
thread t(__FN&& fn, __Args&&... _Ax)
//or
t = thread (__FN&& fn, __Args&&... _Ax)
//가용 가능한 코어 개수 (정확하지 않음)
t.hardware_concurrency()
//thread의 id
t.get_id()
//실제로 실행된 스레드와 스래드 객체 t의 연결을 끊는다
//리눅스의 데몬프로세스도 알아보자
t.detach()
//t객체에 연결된 callable 객체가 있는지 확인
t.joinable()
//해당 스레드가 끝날 때 까지 기다림
t.join()
atomic<int> count = 0;
void Add(int number)
{
for (int i=0; i < 1000000; i++)
count++;
}
c++11부터 추가된 atomic을 통해서 원자적 연산을 할 수 있다.
00007FF636572705 xor edx,edx
00007FF636572707 lea rcx,[count (07FF6365B65DCh)]
00007FF63657270E call std::_Atomic_integral<int,4>::operator++ (07FF63656BAE0h)
1. edx의 값을 0으로 만든다
2. rcx에 count의 '주소'를 가져온다
3. 내부 함수를 호출한다.
2번에서 값이 아니라 주소를 가져오는 것이 인상적이다.
3번에 대해서 좀 찾아보니 CPU에 따라서 지원하는 명령어를 호출하여 한번에 처리한다.
(x86 아키텍쳐에서는 LOCK prefix를 통해서 이를 지원하는 듯)
따라서 여기서의 count++는 '원자적으로' 동작한다.
기존 stl은 멀티스레드 환경에서는 동작하지 않는다고 봐야한다.
#include <vector>
#include <iostream>
#include <thread>
unsing namespace std;
vector<int32> v;
void Push()
{
for (int i = 0; i < 10'000; i++)
v.push_back(i);
}
int main()
{
std::thread t1(Push);
std::thread t2(Push);
// 바로 크래시
// 기존거 날리기 하면서 문제 발생
// 그럼 reserve하면요
// 그래도 당신이 원하는 숫자는 안나옴
// 추가 할때 동일한 back을 참조하면 덮어쓰기 발생
v.reserve(20'000);
t1.join();
t2.join();
cout << v.size() << endl;
}
그러니 c++ 기본 라이브러리인 mutex를 사용하자.
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
vector<int32> v;
// 자물쇠처럼 사용
// 탈의실 사용중 같이 사용하고 있다는걸 알려줌
mutex m;
void Push()
{
for (int i = 0; i < 10'000; i++)
{
// 잠그기
m.lock();
v.push_back(i);
// 풀기
m.unlock();
}
}
int main()
{
std::thread t1(Push);
std::thread t2(Push);
t1.join();
t2.join();
cout << v.size() << endl;
}
자원의 획득은 초기화이다.
간단히 설명하면 객체가 생성될 때 자원을 획득하고 소멸할 때 자원을 해제
이걸 lock에 적용시켜보자
template<typename T>
class LockGuard
{
public:
LockGuard(T& m)
{
_mutex = &m;
_mutex->lock();
}
~LockGuard()
{
_mutex->unlock();
}
private:
T* _mutex;
};
for (int i = 0; i < 10'000; i++)
{
// 잠그기
// 단순 뮤텍스만 사용
//m.lock();
// RAII패턴 적용
//LockGuard<std::mutex> lockGuard(m);
//C++ 표준
//std::lock_guard<std::mutex> lockGuard(m);
std::unique_lock<std::mutex> uniqueLock(m, std::defer_lock);
uniqueLock.lock(); // 유니크 lock을 사용하면 원하는 타이밍에 lock 가능
v.push_back(i);
// 풀기
//m.unlock();
// LockGuard사용시 자동으로 풀림
}
1차적 으로 순서를 부여해서 피해보자
나중에는 그래프를 통한 판별을 해보자
mutex m1;
mutex m2;
// 내부의 일관적인 순서로 잠금
std::lock(m1,m2);
//이미 lock 된 상태이니 나중에 소멸될 떄 풀어주기만 해라
lock_guard<mutex> g1(m1, std::adapt_lock()
lock_guard<mutex> g2(m2, std::adapt_lock()
class SpinLock
{
public:
void lock()
{
//CAS (Compare And Swap)
bool expected=false;
bool desired=true;
// _locked랑 expected랑 비교
// 일치하면 _locked = desired
// expected = _locked;
// return true
// 다르면
// expected = _locked;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
}
}
void unlock()
{
_locked.store(false);
}
private:
atomic<bool> _locked = false;
};
bool expected=false;
bool desired=true;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
this_thread::sleep_for(std::chrono::milliseconds(100));
//this_thread::sleep_for(100ms);
//this_thread::yield();
}
// Handle은 일종의 번호표, 이벤트는 커널 오브젝트
// Usage Count
// Signal / UnSignal (bool)
// Auto / Manual (bool)
HANDLE handle = ::CreateEvent(NULL/*보안 속성*/, FALSE/*bmanual reset*/, FALSE/*b initial statte*/, NULL);
::CloseHandle(handle); <- 세트로 관리해줘야함
mutex m;
queue<int32> q;
HANDLE handle;
void Producer()
{
while (true)
{
{
unique_lock<mutex> lock(m);
q.push(100);
}
// 해당 핸들의 시그널을 true로 바꿔주세요
::SetEvent(handle);
this_thread::sleep_for(10000ms);
}
}
void Consumer()
{
while (true)
{
// 커널 오브젝트의 시그널을 확인하고
// nonsignal이면 바뀔때까지 기다림
::WaitForSingleObject(handle, INFINITE);
// auto 에서는 여기부터 non-Signal
//::ResetEvent(handle)
unique_lock<mutex> lock(m);
if (q.empty() == false)
{
int32 data = q.front();
q.pop();
cout << data << endl;
}
}
}
int main()
{
// 이번트는 커널 오브젝트
// Usage Count
// Signal / Non-Signal
// Auto / Manual
// Handle은 일종의 번호표
handle = ::CreateEvent(NULL/*보안 속성*/, FALSE/*bmanual reset*/, FALSE/*b initial statte*/, NULL);
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
::CloseHandle(handle);
}
condition_variable cv;
void Producer()
{
while (true)
{
// 1. Lock을 잡고
// 2. 공유 변수를 수정하고
// 3. Lock을 풀고
// 4. 조건변수를 통해 다른 쓰레드에게 통지
{
unique_lock<mutex> lock(m);
q.push(100);
}
//wait중인 쓰레드가 있으면 딱 1개를 깨운다
cv.notify_one();
}
}
void Consumer()
{
while (true)
{
unique_lock<mutex> lock(m);
cv.wait(lock, []() { return q.empty() == false; });
// 1. lock을 잡으려고 시도
// 2. 조건 확인
// - 만족하면 이어서 실행
// - 만족x 면 lock을 풀고 wait상태
{
int32 data = q.front();
q.pop();
cout << q.size() << endl;
}
// 그런데 notify_one을 했으면 항상 조건식을 만족하는거 아닐까?
// Superious WakeUp
// notify_one 할때는 lock을 잡을 상태가 아니니까
// notify_one 할때 lock을 잡으면? 동작은 하긴 하는데 별로 해결되지는 않음
}
}
int main()
{
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
}
#include <future>
int64 Calculate()
{
int64 sum = 0;
for (int32 i = 0; i < 100'000; i++)
{
sum += i;
}
return sum;
}
void PromiseWorker(std::promise<string>&& promise)
{
promise.set_value("Secret Message");
}
void TaskWorker(std::packaged_task<int64(void)>&& task)
{
task();
}
int main()
{
// 동기 방식 호출 (Synchronous) 실행
//int64 sum = Calculate();
//cout << sum << endl;
//이 방식은 값을 받아오려면 전역변수를 사용해야함
//thread t(Calculate);
//t.join();
//std::future
{
// 1) deferred -> lazy evaluation (커맨트 패턴에서과 유사)
// 2) async -> 별도의 쓰레드를 만들어서 실행하세요
// 3) defferd | async -> 둘중에 알아서 하쇼
std::future<int64> future = std::async(std::launch::async, Calculate);
//TODO
//결과가 완료 되었는지 궁금할 때
//파라미터를 넣지 않으면 get()이랑 똑같음
//get()은 한번만 호출해야한다.
std::future_status status = future.wait_for(1ms);
if (status == future_status::ready)
{
}
//실제로 결과물이 필요한 시점에 사용
int64 sum = future.get();
class Knight
{
public:
int64 GetHp() { return 100; }
};
Knight k;
std::future<int64> future2 = std::async(std::launch::async, &Knight::GetHp,k);
}
//std::promise
{
//미래의 결과물을 반환해줄 거라 약속해줘
std::promise<string> promise;
std::future<string> future = promise.get_future();
//promise의 소유권을 t스레드에게 넘겨줌
thread t(PromiseWorker, std::move(promise));
string message = future.get();
cout << message << endl;
t.join();
}
//std::packaged_task
//원하는 함수의 실행 결과를 package_task를 통해서 future로 받아줌
{
std::packaged_task<int64(void)> task(Calculate);
std::future<int64> future = task.get_future();
thread t(TaskWorker, std::move(task));
int64 sum = future.get();
cout << sum << endl;
t.join();
}
}