[CS 스터디] 1.2 프로세스 (2)

Gamchan Kang·2024년 4월 10일
0

CS 스터디

목록 보기
3/5

1.2.6 콘텍스트 스위칭 (중요도 3)

  • 인터럽트: 입출력 관련 이벤트가 발생하거나 예외 상황이 발생할 때 이에 대응할 수 있게 CPU에 처리를 요청하는 것
  • CPU는 하나의 프로세스만 처리할 수 있음 → 멀티 프로세스를 처리하기 위해 CPU 스케줄러에 의해 인터럽트 발생: 콘텍스트 스위칭 이뤄짐
  • 콘텍스트: CPU가 처리하는 프로세스의 정보
    콘텍스트 스위칭: 멀티 프로세스/스레드 환경에서 CPU가 처리 중인 프로세스/스레드의 정보를 바꾸는 것

  • P1 PCB를 업데이트하고 P2 PCB에서 데이터를 레지스터에 로드하는 동안 CPU는 아무 일도 못한다. → 오버헤드 발생
  • 스레드는 힙, 데이터, 코드 영역을 공유하므로 멀티 스레드에서 콘텍스트 스위치은 오버헤드가 비교적 작다.
  • PCB에는 프로그램 카운터와 스택 포인터 값이 저장되므로 콘텍스트 스위칭이 일어나고 난 뒤 작업을 이어서 진행할 수 있다.

1.2.7 프로세스 동기화 (중요도 3)

경쟁 상태

  • 여러 프로세스/스레드가 공유 자원에 동시에 접근해 순서에 따라 값이 바뀌는 상태
  • Too much milk problem

    1. 엄마가 냉장고에 우유가 없는 걸 확인
    2. 엄마가 우유를 사러 슈퍼마켓에 감
    3. 엄마가 우유를 사고 집에 오는 길에 아빠가 냉장고에 우유가 없는 걸 확인
    4. 아빠가 우유를 사러 슈퍼마켓에 감
    5. 아빠가 우유를 사고 집에 돌아옴

스레드에서는 GIL로 인해 오직 하나의 스레드만 동작하면서 이 문제가 발생하지 않는다.

import threading


counter = 0

def increment():
    global counter
    for _ in range(100_000):
        counter += 1
        
        
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Expected counter value: 200000, Actual counter value: {counter}")
Expected counter value: 200000, Actual counter value: 200000

반면 멀티 프로세싱 환경에서는 경쟁 상태를 볼 수 있다.

from multiprocessing import Process, Value
import ctypes


def increment(shared_counter):
    for _ in range(100000):
        shared_counter.value += 1

if __name__ == '__main__':
    # 공유 메모리 값 생성
    counter = Value('i', 0)

    # 프로세스 생성
    process1 = Process(target=increment, args=(counter,))
    process2 = Process(target=increment, args=(counter,))

    # 프로세스 시작
    process1.start()
    process2.start()

    # 프로세스가 끝나길 기다림
    process1.join()
    process2.join()

    print(f"Expected counter value: 200000, Actual counter value: {counter.value}")
Expected counter value: 200000, Actual counter value: 100470

특히 counter.value 값은 매 실행마다 값이 달라지는 걸 볼 수 있다.

임계 영역

  • 공유 자원에 접근할 수 있고 접근 순서에 따라 결과가 달라지는 코드 영역을 임계 영역이라고 한다. 위 코드에서는 다음 부분이 임계 영역이다.
# 스레드 예제의 increment 함수 내부
counter += 1

# 프로세스 예제의 increment 함수 내부
shared_counter.value += 1

멀티 프로세스/스레드 환경에서 데이터의 일관성은 중요하므로 프로세스 동기화는 필수이다. 임계 영역에서 여러 접근을 막기 위해서는 다음 3가지 방법이 있다.

  • 상호배제 기법: 임계 영역을 실행하는 프로세스 외에는 다른 프로세스는 임계 영역에 접근할 수 없다. 뮤텍스와 세마포어가 있다.
  • 진행: 임계 영역을 실행 중인 프로세스가 없으면 다른 프로세스가 임계 영역을 실행한다.
  • 한정된 대기: 임계 영역 접근을 요청했을 때 대기 시간을 제한한다.

뮤텍스

  • 락을 가진 프로세스/스레드 만이 공유 자원에 접근할 수 있게 하는 방법
  • 화장실과 열쇠가 하나 뿐인 상황
  • 락킹 매커니즘이라고도 함
  • 바쁜 대기(프로세스가 공유 자원에 접근할 수 있는 권한을 얻을 때까지 확인하는 과정)의 한 종류인 스핀락(프로세스가 반복문을 돌면서 기다림)을 통해서 락이 풀렸는지 여부를 기다림

세마포어

  • 공유 자원에 접근할 수 있는 프로세스의 수를 정해 접근을 제어함
  • n개 열쇠와 화장실
  • 공유 자원에 접근한 프로세스가 접근을 해제하면 다른 프로세스가 접근하도록 신호를 보냄 → 시그널링 매커니즘이라고도 함
from multiprocessing import Process, Value, Lock, Semaphore

def increment_without_sync(shared_counter):
    for _ in range(100000):
        shared_counter.value += 1

def increment_with_lock(shared_counter, lock):
    for _ in range(100000):
        with lock:
            shared_counter.value += 1

def increment_with_semaphore(shared_counter, semaphore):
    for _ in range(100000):
        with semaphore:
            shared_counter.value += 1

if __name__ == '__main__':
    # 동기화 없이
    counter = Value('i', 0)
    p1 = Process(target=increment_without_sync, args=(counter,))
    p2 = Process(target=increment_without_sync, args=(counter,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Final counter value without synchronization: {counter.value}")

    # 뮤텍스(Lock)를 사용하여
    counter.value = 0  # 카운터 초기화
    lock = Lock()
    p1 = Process(target=increment_with_lock, args=(counter, lock))
    p2 = Process(target=increment_with_lock, args=(counter, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Final counter value with mutex (Lock): {counter.value}")

    # 세마포어를 사용하여
    counter.value = 0  # 카운터 초기화
    semaphore = Semaphore(2)  # 동시에 최대 2개의 프로세스만 접근 허용
    p1 = Process(target=increment_with_semaphore, args=(counter, semaphore))
    p2 = Process(target=increment_with_semaphore, args=(counter, semaphore))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Final counter value with semaphore: {counter.value}")
Final counter value without synchronization: 100555
Final counter value with mutex (Lock): 200000
Final counter value with semaphore: 136282

* 세마포어도 완전히 경쟁 상태를 막지는 못한다!

동기: 작업 순서 보장
비동기: 작업 순서 보장 X
블로킹: 대기 가능
논블로킹: 대기 없이 작업

1.2.8 교착 상태 (중요도 3)

  • 2개 이상의 프로세스가 각각 자원을 가지고 있으며 서로의 자원을 요구하며 기다리는 상태. 다음 4가지 필요 충분 조건으로 발생한다.
  • 상호배제: 하나의 공유 자원에는 하나의 프로세스만 사용해야 한다.
  • 점유와 대기: 프로세스가 최소 하나의 자원을 점유하고 있고 추가로 다른 프로세스의 자원을 점유하기 위해 대기한다.
  • 비선점: 다른 프로세스의 자원을 뺏을 수 없다.
  • 환형 대기: 자신의 자원과 앞/뒤 프로세스의 자원을 요구한다.

import threading
import time

# 두 개의 자원을 위한 락 생성
lock1 = threading.Lock()
lock2 = threading.Lock()

# 스레드 1의 작업
def thread1_work():
    while True:
        with lock1:
            print("Thread 1 has acquired lock1")
            time.sleep(1)
            with lock2:
                print("Thread 1 has acquired lock2")
        print("Thread 1 is done")

# 스레드 2의 작업
def thread2_work():
    while True:
        with lock2:
            print("Thread 2 has acquired lock2")
            time.sleep(1)
            with lock1:
                print("Thread 2 has acquired lock1")
        print("Thread 2 is done")

# 스레드 생성 및 시작
t1 = threading.Thread(target=thread1_work)
t2 = threading.Thread(target=thread2_work)

t1.start()
t2.start()

t1.join()
t2.join()
Thread 1 has acquired lock1
Thread 2 has acquired lock2
...이후 교착 상태

1.2.9 스레드 안전 (중요도 2)

  • 멀티 스레드 환경에서 하나의 변수, 함수, 객체에 여러 스레드가 동시에 접근해도 문제가 없음. 다음 조건이 있다.

  • 상호배제: 공유 자원에 접근할 때 뮤텍스/세마포어 기법으로 접근
  • 원자 연산: '연산 했음/안했음' 두가지만 존재하는 연산인 원자 연산 혹은 원자적으로 정의된 연산을 이용
  • 재진입성: 함수가 스레드에서 실행 중일때, 다른 스레드에서 실행해도 각 스레드에 올바른 결과가 나와야 함
  • 스레드 지역 저장소: 각 스레드에서만 접근할 수 있는 저장소를 사용하여 공유 자원 축소
import threading
import time

# 상호 배제를 위한 락 객체
lock = threading.Lock()

# 원자 연산 예제로, 간단한 플래그 변수 사용
operation_done = False

# 스레드 지역 저장소 생성
thread_local = threading.local()

# 재진입 가능한 함수
def reentrant_function():
    # 스레드 지역 저장소에 값 설정. 각 스레드는 자신만의 unique_value를 가짐
    if not hasattr(thread_local, "unique_value"):
        thread_local.unique_value = 0

    # 상호 배제를 보장하기 위해 락을 사용
    with lock:
        # 원자 연산 사용 (여기서는 간단한 플래그 체크)
        global operation_done
        if not operation_done:
            print(f"Performing an operation in {threading.current_thread().name}")
            thread_local.unique_value += 1  # 스레드 지역 저장소 사용
            operation_done = True  # 원자 연산으로 상태 변경
        else:
            print(f"Operation was already performed by another thread")

    # 재진입성을 보여주기 위한 출력. 동일한 함수가 다른 스레드에서 실행되어도, 각 스레드는 올바른 결과를 가짐
    print(f"{threading.current_thread().name}'s unique value: {thread_local.unique_value}")

# 스레드를 생성하고 시작하는 함수
def start_thread():
    for i in range(2):
        t = threading.Thread(target=reentrant_function, name=f"Thread-{i+1}")
        t.start()
        time.sleep(1)  # 스레드 실행 간격

start_thread()
Performing an operation in Thread-1
Thread-1's unique value: 1
Operation was already performed by another thread
Thread-2's unique value: 0

1.2.10 IPC (중요도 2)

1. 공유 메모리

from multiprocessing import Process, Value

def increment(shared_value):
    for _ in range(10000):
        shared_value.value += 1

if __name__ == "__main__":
    shared_value = Value('i', 0)  # 공유 메모리 생성
    p1 = Process(target=increment, args=(shared_value,))
    p2 = Process(target=increment, args=(shared_value,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Final value: {shared_value.value}")
Final value: 10037

2. 소켓

# server.py
import socket

def server():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('localhost', 12345))
        s.listen()
        print("Server is listening")
        conn, addr = s.accept()
        with conn:
            print('Connected by', addr)
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                conn.sendall(data)

if __name__ == "__main__":
    server()
import socket

def client():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('localhost', 12345))
        s.sendall(b'Hello, world')
        data = s.recv(1024)
        print('Received', repr(data))

if __name__ == "__main__":
    client()

그리고 터미널 두개로 각각 파이썬 파일을 실행하면

# server.py
Connected by ('127.0.0.1', 60587)

# client.py
Received b'Hello, world'

3. 세마포어

4. 파이프

from multiprocessing import Process, Pipe

def f(conn):
    conn.send(['hello', 42, None])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # ['hello', 42, None]
    p.join()
['hello', 42, None]

5. 메시지 큐

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
[42, None, 'hello']

1.2.11 좀비 프로세스와 고아 프로세스 (중요도 2)

  • 좀비 프로세스: 부모 프로세스가 자식 프로세스의 종료 상태를 회수하지 않았을 경우 남겨진 자식 프로세스 → 자원이 낭비될 수 있다.
  • 고아 프로세스: 부모 프로세스가 자식 프로세스보다 먼저 종료되는 경우 → 자식 프로세스의 부모 PID를 부팅 시 가장 먼저 실행되는 init 프로세스인 1로 바꿔줌
profile
Someday, the dream will come true

0개의 댓글