C++ Server - 멀티스레드 프로그래밍 (1)

창고지기·2025년 8월 18일
0

Cpp Server

목록 보기
1/6
post-thumbnail

1. 스레드(Thread)

  • 프로그램은 프로세스의 형태로 실행되며 하나 이상의 흐름을 가진다. 이러한 흐름의 단위를 스레드(Thread)라 부른다.
  • 컴퓨터 과학에서 스레드(Thread)는 스케쥴러가 독립적으로 관리할 수 있는 최소의 명령어 흐름이다. (출처)

2. 멀티스레드(Muilti Thread)

(1) 멀티스레드?

말 여러 개의 스레드(흐름)이 동시에 존재하는 상태.
추상적이라서 와닿지 않지만 멀티 태스킹을 떠올리면 이해가 쉽다.

  • 프로세스의 목표 -> 저녁 식사 완료하기
    • 싱글 스레드 (다음 단계를 순차적으로 진행)
      • 재료 준비
      • 요리
      • 식사
      • 설거지, 정리
    • 멀티 스레드
      동시에 수행 가능한 작업들을 같이 처리
      • 요리를 하면서 재료 손질을 할 수 있다.
      • 요리를 하면서 다른 요리를 할 수 있다.
      • 요리/손질을 하면서 설거지를 할 수 있다.

더 구체적인 예를 들어 저녁에 국이 메뉴에 있다면. 싱글 스레드는 국이 완성될 때까지 아무것도 하지 못한다. 반면 멀티 스레드는 국을 불에 올려놓고 동시에 설거지, 다음 요리 준비등을 할 수 있다.

정확히 말하면 번갈아 가면서 하는건데 설거지 하다가 국이 넘치거나 하지 않는지 확인하고 다시 설거지를 하는 경우
설거지 -> 요리 -> 설거지로 실행 흐름이 빠르게 변경된 것으로 볼 수 있다.

순간에 어떤 행동을 할지는 나(OS 스케쥴러)의 판단에 달려있다. 나의 판단으로 우선 순위를 잘 조절해서 행동을 한다. (Batch, interactive, RealTime)

(2) 장단점

  • 장점
    • 빠르다.
  • 단점
    • 고려할 점이 많다.
      • Race Condition
      • Critical Section

(3) Race Condition, Critical Section

개념

아까 저녁식사 비유에서 실제와 조금 다른것이 있다면. 각 행동, 즉 각 스레드는 서로의 상황을 알지 못한다. 요리를 하고 있다면 요리의 진행도만 알 수 있고, 설거지가 어디까지 되었는지는 모른다. 이 점을 유의하자.

  • Race Condition
    • 요리를 진행 하는데 다음은 설탕을 넣을 차례다.
    • 주변정리를 먼저 진행 한다.
    • 조미료를 정리 하면서 설탕과 소금의 위치를 바꿨다.
    • 설탕을 넣을 차례이니 설탕 위치에 있던 무언가(소금)를 넣는다.

이 예시에서 각 스레드(요리, 정리)는 소금, 설탕 이라는 공유 자원을 가지고 있다. 서로 이 공유 자원을 사용하다가 설탕 대신 소금을 넣는 원치 않는 결과가 나타났다.

이처럼 스레드가 공유 자원을 서로 사용하려는 상태를 Race Condition (경쟁 상태)라고 하며 경쟁 상태를 유발하는 코드 상의 구역(소금 넣기, 조미료 정리)을 Critical Section이라 한다.

해결 방안

Critical Section에 관련한 문제를 해결 하려면 다음의 조건들을 만족해야 한다

  • 어떠한 가정(cpu 개수, 속도 등)도 없어야 한다.
  • 상호배제 (Mutual Exclusion) : 누군가 Critical Section에 들어갔다면 아무도 못들어 간다.
  • Progress: 아무도 Critical Section에 없으면 누군가는 들어가야 한다.
  • Bounded Waiting: 한 없이 기다리지 않아야 한다.

이를 기반으로 해결 방안을 살펴보자

  • 소프트웨어적 방법 (사용자의 알고리즘을 통해)

    • Decker's Algorithm
      • Turn, flag를 사용
      • 들어갈때 flag를 true로 바꿈
      • 둘 다 true면 turn을 보고 판단.
      • 실행 후 턴을 상대에게 넘김
      • 2개의 프로세스 가정, busy Waiting
    • Peterson's Algorithm
      • 데커 알고리즘의 개선된 버전.
      • 조건 체크 전에 상대에게 턴을 줌
      • 2개의 프로세스 가정, busy Waiting

    ( 위 2가지 방법은 실제 CPU에서는 잘 따라주지 않는다. 파이프라이닝, 캐싱, 코드 최적화 등)

    • Bakery Algorithm
      • 번호표를 받고 번호 순으로 들어감
      • 번호가 작은거 먼저 들어감
      • 번호가 같으면 pid, index등 다른 우선순위 판단
      • 3개이상 프로세스 ,busy Waiting
  • 하드웨어적 방법

    • Disabling Interrupts

      • 인터럽트 : 실행 중인 작업 중단, 다른 작업 하는 것
      • 인터럽트를 끄면 실행 중인 작업을 중단하지 않겠구나!
      • 단일 코어만 사용가능
    • Test, Set, Lock

      • 기계어 수준에서 lock 변수를 만든다

      • 기계어 수준에서 lock 변수의 값을 바꾼다 (원자적)

      • 일단 잠근다

      • 원래 값을 반환

        • 원래 값이 false -> 성공적인 획득
        • 원래 값이 true -> 누군가 있음
      • bool TSL(bool *lock) 
         {
         	bool old = *lock;  // 옛날 값 읽기
         	*lock = true;      // 락을 걸어버림
         	return old;        // 옛날 값 반환
         }
      • LOCK XCHG reg, mem

    • CAS

      • 별도의 게시글에서 더 깊게 다룬다.
      • Compare And Swap의 약자로 기계어이다.
      • function cas(p: pointer to int, old: int, new: int) is
        if *p ≠ old
            return false
        *p ← new
        return true
      • 현재 *p 값이 이전의 old와 같은지
        • 값을 읽고 CAS 들어오기 전까지 값이 안 바뀌었는지
      • lock free 프로그래밍에 사용
  • 운영체제에 의한 방법

    • Semaphores
      • 생산자 소비자 문제와 함께 등장
        • 원형 공용 큐
        • 생산자는 큐에 넣고 소비자는 뺸다
        • 꽉 차면 생산자는 sleep, 비어 있으면 소비자 sleep.
      • wait (P)
        • 세마포어 값을 감소
        • 0보다 크면 계속 실행
      • signal (V)
        • 세마포어 값 증가
        • 대기 중인 프로세스 있다면 하나 깨움
      // 바이너리, 카운팅(여러명) 세마포어 가능
      			wait(empty);   // 빈 공간이 있는지 확인
      			wait(sem);   // 버퍼 접근 락
      			put_item();    // 버퍼에 데이터 넣기
      			signal(sem); // 락 해제
      			signal(full);  // 데이터 개수 증가
      • OS/커널 에서의 세마포어
        • 카운트 큐를 가진 구조체.
        • 카운트가 음수면 이벤트 큐 삽입 후 블록
        • 카운트가 0이면 이벤트 큐에서 삭제 후 레디 리스트
        • 세마포어가 음수면 몇개가 슬립 중인지
        • 카운트 양수로 해도 됨
        • OS 내부에서 우선순위 역전을 해결하기 위해서 우선순위 상속기법을 적용
    • Mutex
      • 상호 배제를 위해서 사용
      • lock 이라고도 부르는듯
      • 한 번에 하나의 프로세스만
      • lock, unlock, TSL사용
      • condition variable을 사용해서 sleep, awake의 매개 역할
      • sleep에서 깨어나면 mutex lock 시도. 성공하면 완전히 깨어남
    • Monitors
      • 컴파일러 수준의 동기화
        • 따로 lock 걸지 않아도 키워드 같은걸 사용해서 컴파일러가 자동 생성
      • Java (synchronized) 등

(4) Atomic

수행 도중 중단될 수 없는 하나의 명령을 의미한다 좀 더 풀어서 말하자면 한번에 실행되어 중간 상태가 관찰되지 않는다고 할 수 있겠다. 이를 설명하기 위해 유명한 예시를 하나 들자면

 int count = 0;
 
 void Add(int number)
{
	for (int i=0; i < 1000000; i++)
		count++;
}

위의 코드를 여러 스레드로 실행하면 의도한 값이 나오지 않는다. 바로 count++가 원자적이지 않기 때문이다. 어셈블리를 통해 자세히 알아보자

00007FF7C2BD2545  mov         eax,dword ptr [count (07FF7C2C165DCh)]  
00007FF7C2BD254B  inc         eax  
00007FF7C2BD254D  mov         dword ptr [count (07FF7C2C165DCh)],eax  

1. count의 값을 eax에 이동시킴 
2. eax의 값을 증가시킴
3. eax의 값을 count에 이동시킴

코드상에서 한번의 연산이지만 디스어셈블리에서는 3번의 연산을 진행한다. 즉 중간상태가 발견되기 때문에 멀티스레드 환경에서 문제가 발생한다.

간단한 예시로 t1이 3번을 시작하기 직전에 t2가 1번을 시작하면 t2는 증가되지 않은 값에 대해서 inc 명령어를 실행하기 때문에 의도한 값이 나오지 않는다.

CAS,TSL 등이 원자적 연산에 해당한디.

(5) Dead lock

dead lock

  • 교착 상태
  • 서로 락을 건 상태로 대기하는 것
  • 아래 4가지 조건을 만족하면 Dead lock 발생
    • Mutual exclusion
    • Hold on Wait
    • No Preemption
    • Circular wait

dead lock 회피

  • 그래프를 통한 사이클 감지
  • Banker's Algorithm
    • 다익스트라가 개발한 방법
    • Remaining Need값이 Current Available을 초과할 경우자원을 할당해주지 않는다.
    • safe, unsafe state
      • 위의 조건을 따라서 모든 프로세스를 dead lock없이 졸료 시킬 수 있는 순서가 있으면 dead lock 유지할 수 있음
      • 그렇다고 unsafe가 매번 dead lock 발생 시키지는 않음

3. c++ 에서의 사용

(1) thread 기본

#include <iostream>
#include <thread>
using namespace std;

thread t(__FN&& fn, __Args&&...  _Ax)
//or
t = thread (__FN&& fn, __Args&&...  _Ax)

//가용 가능한 코어 개수 (정확하지 않음)
t.hardware_concurrency()

//thread의 id 
t.get_id()

//실제로 실행된 스레드와 스래드 객체 t의 연결을 끊는다
//리눅스의 데몬프로세스도 알아보자
t.detach()

//t객체에 연결된 callable 객체가 있는지 확인
t.joinable()

//해당 스레드가 끝날 때 까지 기다림
t.join()

(2) Atomic

atomic<int> count = 0;
 
void Add(int number)
{
	for (int i=0; i < 1000000; i++)
		count++;
}

c++11부터 추가된 atomic을 통해서 원자적 연산을 할 수 있다.

00007FF636572705  xor         edx,edx  
00007FF636572707  lea         rcx,[count (07FF6365B65DCh)]  
00007FF63657270E  call        std::_Atomic_integral<int,4>::operator++ (07FF63656BAE0h)  

1. edx의 값을 0으로 만든다
2. rcx에 count의 '주소'를 가져온다
3. 내부 함수를 호출한다.

2번에서 값이 아니라 주소를 가져오는 것이 인상적이다.
3번에 대해서 좀 찾아보니 CPU에 따라서 지원하는 명령어를 호출하여 한번에 처리한다.
(x86 아키텍쳐에서는 LOCK prefix를 통해서 이를 지원하는 듯)

따라서 여기서의 count++는 '원자적으로' 동작한다.

(3) lock

기존 stl

기존 stl은 멀티스레드 환경에서는 동작하지 않는다고 봐야한다.

#include <vector>
#include <iostream>
#include <thread>
unsing namespace std;

vector<int32> v;

void Push()
{
	for (int i = 0; i < 10'000; i++)
		v.push_back(i);
}

int main()
{
	std::thread t1(Push);
	std::thread t2(Push);

	// 바로 크래시
	// 기존거 날리기 하면서 문제 발생
	// 그럼 reserve하면요
	// 그래도 당신이 원하는 숫자는 안나옴
	// 추가 할때 동일한 back을 참조하면 덮어쓰기 발생
	v.reserve(20'000);

	t1.join();
	t2.join();

	cout << v.size() << endl;
}

mutex

그러니 c++ 기본 라이브러리인 mutex를 사용하자.

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

vector<int32> v;
// 자물쇠처럼 사용
// 탈의실 사용중 같이 사용하고 있다는걸 알려줌
mutex m;

void Push()
{
	for (int i = 0; i < 10'000; i++)
	{
		// 잠그기
		m.lock();
		v.push_back(i);
		// 풀기
		m.unlock();
	}
}

int main()
{
	std::thread t1(Push);
	std::thread t2(Push);

	t1.join();
	t2.join();

	cout << v.size() << endl;
}
  • 상호배제를 구현한 라이브러리
  • lock과 unlock의 짝을 잘 맞추자
    • 이걸 수동 으로 관리하기 힘들다.

RAII(Resource Acquitision is Initialization)

자원의 획득은 초기화이다.
간단히 설명하면 객체가 생성될 때 자원을 획득하고 소멸할 때 자원을 해제
이걸 lock에 적용시켜보자

template<typename T>
class LockGuard
{
public:
	LockGuard(T& m)
	{
		_mutex = &m;
		_mutex->lock();
	}

	~LockGuard()
	{
		_mutex->unlock();
	}

private:
	T* _mutex;
};

for (int i = 0; i < 10'000; i++)
{
	// 잠그기
    // 단순 뮤텍스만 사용
	//m.lock();
    // RAII패턴 적용
	//LockGuard<std::mutex> lockGuard(m);
    //C++ 표준
	//std::lock_guard<std::mutex> lockGuard(m);
	std::unique_lock<std::mutex> uniqueLock(m, std::defer_lock);
	uniqueLock.lock(); // 유니크 lock을 사용하면 원하는 타이밍에 lock 가능

	v.push_back(i);

	// 풀기
	//m.unlock();
	// LockGuard사용시 자동으로 풀림
}
  • 래퍼 클래스를 만들기
    • 생성자에서 잠금
    • 소멸자에서 해제

Dead lock

1차적 으로 순서를 부여해서 피해보자
나중에는 그래프를 통한 판별을 해보자

	mutex m1;
	mutex m2;
    // 내부의 일관적인 순서로 잠금
 	std::lock(m1,m2);

	//이미 lock 된 상태이니 나중에 소멸될 떄 풀어주기만 해라
	lock_guard<mutex> g1(m1, std::adapt_lock()
	lock_guard<mutex> g2(m2, std::adapt_lock()

Spin lock

(CAS 참고)

class SpinLock
{
public:
	void lock()
	{
		//CAS (Compare And Swap)
		bool expected=false;
		bool desired=true;
		// _locked랑 expected랑 비교
		// 일치하면 _locked = desired
		//		expected = _locked;
		//		return true
		// 다르면
		// expected = _locked;
		while (_locked.compare_exchange_strong(expected, desired) == false)
		{
			expected = false;
		}
	}

	void unlock()
	{
		_locked.store(false);
	}

private:
	atomic<bool> _locked = false;
};

Back off lock

bool expected=false;
bool desired=true;

while (_locked.compare_exchange_strong(expected, desired) == false)
{
	expected = false;

	this_thread::sleep_for(std::chrono::milliseconds(100));
	//this_thread::sleep_for(100ms);
	//this_thread::yield();
}

Kernel based lock

// Handle은 일종의 번호표, 이벤트는 커널 오브젝트
// Usage Count
// Signal / UnSignal (bool)
// Auto / Manual (bool)

HANDLE handle = ::CreateEvent(NULL/*보안 속성*/, FALSE/*bmanual reset*/, FALSE/*b initial statte*/, NULL);

::CloseHandle(handle); <- 세트로 관리해줘야함

mutex m;
queue<int32> q;
HANDLE handle;

void Producer()
{
	while (true)
	{
		{
			unique_lock<mutex> lock(m);
			q.push(100);
		}
		// 해당 핸들의 시그널을 true로 바꿔주세요
		::SetEvent(handle);
		this_thread::sleep_for(10000ms);
	}
}

void Consumer()
{
	while (true)
	{

		// 커널 오브젝트의 시그널을 확인하고
		// nonsignal이면 바뀔때까지 기다림
		::WaitForSingleObject(handle, INFINITE);
		// auto 에서는 여기부터 non-Signal
		//::ResetEvent(handle)

		unique_lock<mutex> lock(m);
		if (q.empty() == false)
		{
			int32 data = q.front();
			q.pop();
			cout << data << endl;
		}
	}
}


int main()
{
	// 이번트는 커널 오브젝트
	// Usage Count
	// Signal / Non-Signal
	// Auto / Manual
	// Handle은 일종의 번호표
	handle = ::CreateEvent(NULL/*보안 속성*/, FALSE/*bmanual reset*/, FALSE/*b initial statte*/, NULL);

	thread t1(Producer);
	thread t2(Consumer);

	t1.join();
	t2.join();

	::CloseHandle(handle);
}

Condition Variable

condition_variable cv;

void Producer()
{
	while (true)
	{
		// 1. Lock을 잡고
		// 2. 공유 변수를 수정하고
		// 3. Lock을 풀고
		// 4. 조건변수를 통해 다른 쓰레드에게 통지
		{
			unique_lock<mutex> lock(m);
			q.push(100);
		}
		
		//wait중인 쓰레드가 있으면 딱 1개를 깨운다
		cv.notify_one();

	}
}

void Consumer()
{
	while (true)
	{
		unique_lock<mutex> lock(m);
		cv.wait(lock, []() { return q.empty() == false; });
		// 1. lock을 잡으려고 시도
		// 2. 조건 확인
		// - 만족하면 이어서 실행
		// - 만족x 면 lock을 풀고 wait상태

		{
			int32 data = q.front();
			q.pop();
			cout << q.size() << endl;
		}

		// 그런데 notify_one을 했으면 항상 조건식을 만족하는거 아닐까?
		// Superious WakeUp
		// notify_one 할때는 lock을 잡을 상태가 아니니까
		// notify_one 할때 lock을 잡으면? 동작은 하긴 하는데 별로 해결되지는 않음
 	}
}


int main()
{
	thread t1(Producer);
	thread t2(Consumer);

	t1.join();
	t2.join();
}
  • 커널 오브젝트가 아닌 유저 레벨 오브젝트
  • lock 과 짝을 지어서 사용해야함

(4) Futuer

#include <future>

int64 Calculate()
{
	int64 sum = 0;

	for (int32 i = 0; i < 100'000; i++)
	{
		sum += i;
	}

	return sum;
}

void PromiseWorker(std::promise<string>&& promise)
{
	promise.set_value("Secret Message");
}

void TaskWorker(std::packaged_task<int64(void)>&& task)
{
	task();
}


int main()
{
	// 동기 방식 호출 (Synchronous) 실행
	//int64 sum = Calculate();
	//cout << sum << endl;

	//이 방식은 값을 받아오려면 전역변수를 사용해야함
	//thread t(Calculate);
	//t.join();

	//std::future
	{
		// 1) deferred			-> lazy evaluation (커맨트 패턴에서과 유사)
		// 2) async				-> 별도의 쓰레드를 만들어서 실행하세요
		// 3) defferd | async	-> 둘중에 알아서 하쇼
		std::future<int64> future = std::async(std::launch::async, Calculate);

		//TODO

		//결과가 완료 되었는지 궁금할 때
		//파라미터를 넣지 않으면 get()이랑 똑같음
		//get()은 한번만 호출해야한다.
		std::future_status status = future.wait_for(1ms);
		if (status == future_status::ready)
		{

		}

		//실제로 결과물이 필요한 시점에 사용
		int64 sum = future.get();

		class Knight
		{
		public:
			int64 GetHp() { return 100; }
		};

		Knight  k;
		std::future<int64> future2 = std::async(std::launch::async, &Knight::GetHp,k);
	}

	//std::promise
	{
		//미래의 결과물을 반환해줄 거라 약속해줘
		std::promise<string> promise;
		std::future<string> future = promise.get_future();

		//promise의 소유권을 t스레드에게 넘겨줌
		thread t(PromiseWorker, std::move(promise));

		string message = future.get();
		cout << message << endl;

		t.join();
	}

	//std::packaged_task
	//원하는 함수의 실행 결과를 package_task를 통해서 future로 받아줌
	{
		std::packaged_task<int64(void)> task(Calculate);
		std::future<int64> future = task.get_future();

		thread t(TaskWorker, std::move(task));

		int64 sum = future.get();
		cout << sum << endl;

		t.join();
	}
}
  • mutex, 조건변수까지 갈 필요없이 간단하게 사용할 때
profile
일단 창고에 넣어놓으면 언젠가는 쓰겠지

0개의 댓글