스레드에서는 GIL로 인해 오직 하나의 스레드만 동작하면서 이 문제가 발생하지 않는다.
import threading
counter = 0
def increment():
global counter
for _ in range(100_000):
counter += 1
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Expected counter value: 200000, Actual counter value: {counter}")
Expected counter value: 200000, Actual counter value: 200000
반면 멀티 프로세싱 환경에서는 경쟁 상태를 볼 수 있다.
from multiprocessing import Process, Value
import ctypes
def increment(shared_counter):
for _ in range(100000):
shared_counter.value += 1
if __name__ == '__main__':
# 공유 메모리 값 생성
counter = Value('i', 0)
# 프로세스 생성
process1 = Process(target=increment, args=(counter,))
process2 = Process(target=increment, args=(counter,))
# 프로세스 시작
process1.start()
process2.start()
# 프로세스가 끝나길 기다림
process1.join()
process2.join()
print(f"Expected counter value: 200000, Actual counter value: {counter.value}")
Expected counter value: 200000, Actual counter value: 100470
특히 counter.value
값은 매 실행마다 값이 달라지는 걸 볼 수 있다.
# 스레드 예제의 increment 함수 내부
counter += 1
# 프로세스 예제의 increment 함수 내부
shared_counter.value += 1
멀티 프로세스/스레드 환경에서 데이터의 일관성은 중요하므로 프로세스 동기화는 필수이다. 임계 영역에서 여러 접근을 막기 위해서는 다음 3가지 방법이 있다.
from multiprocessing import Process, Value, Lock, Semaphore
def increment_without_sync(shared_counter):
for _ in range(100000):
shared_counter.value += 1
def increment_with_lock(shared_counter, lock):
for _ in range(100000):
with lock:
shared_counter.value += 1
def increment_with_semaphore(shared_counter, semaphore):
for _ in range(100000):
with semaphore:
shared_counter.value += 1
if __name__ == '__main__':
# 동기화 없이
counter = Value('i', 0)
p1 = Process(target=increment_without_sync, args=(counter,))
p2 = Process(target=increment_without_sync, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Final counter value without synchronization: {counter.value}")
# 뮤텍스(Lock)를 사용하여
counter.value = 0 # 카운터 초기화
lock = Lock()
p1 = Process(target=increment_with_lock, args=(counter, lock))
p2 = Process(target=increment_with_lock, args=(counter, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Final counter value with mutex (Lock): {counter.value}")
# 세마포어를 사용하여
counter.value = 0 # 카운터 초기화
semaphore = Semaphore(2) # 동시에 최대 2개의 프로세스만 접근 허용
p1 = Process(target=increment_with_semaphore, args=(counter, semaphore))
p2 = Process(target=increment_with_semaphore, args=(counter, semaphore))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Final counter value with semaphore: {counter.value}")
Final counter value without synchronization: 100555
Final counter value with mutex (Lock): 200000
Final counter value with semaphore: 136282
* 세마포어도 완전히 경쟁 상태를 막지는 못한다!
동기: 작업 순서 보장
비동기: 작업 순서 보장 X
블로킹: 대기 가능
논블로킹: 대기 없이 작업
import threading
import time
# 두 개의 자원을 위한 락 생성
lock1 = threading.Lock()
lock2 = threading.Lock()
# 스레드 1의 작업
def thread1_work():
while True:
with lock1:
print("Thread 1 has acquired lock1")
time.sleep(1)
with lock2:
print("Thread 1 has acquired lock2")
print("Thread 1 is done")
# 스레드 2의 작업
def thread2_work():
while True:
with lock2:
print("Thread 2 has acquired lock2")
time.sleep(1)
with lock1:
print("Thread 2 has acquired lock1")
print("Thread 2 is done")
# 스레드 생성 및 시작
t1 = threading.Thread(target=thread1_work)
t2 = threading.Thread(target=thread2_work)
t1.start()
t2.start()
t1.join()
t2.join()
Thread 1 has acquired lock1
Thread 2 has acquired lock2
...이후 교착 상태
import threading
import time
# 상호 배제를 위한 락 객체
lock = threading.Lock()
# 원자 연산 예제로, 간단한 플래그 변수 사용
operation_done = False
# 스레드 지역 저장소 생성
thread_local = threading.local()
# 재진입 가능한 함수
def reentrant_function():
# 스레드 지역 저장소에 값 설정. 각 스레드는 자신만의 unique_value를 가짐
if not hasattr(thread_local, "unique_value"):
thread_local.unique_value = 0
# 상호 배제를 보장하기 위해 락을 사용
with lock:
# 원자 연산 사용 (여기서는 간단한 플래그 체크)
global operation_done
if not operation_done:
print(f"Performing an operation in {threading.current_thread().name}")
thread_local.unique_value += 1 # 스레드 지역 저장소 사용
operation_done = True # 원자 연산으로 상태 변경
else:
print(f"Operation was already performed by another thread")
# 재진입성을 보여주기 위한 출력. 동일한 함수가 다른 스레드에서 실행되어도, 각 스레드는 올바른 결과를 가짐
print(f"{threading.current_thread().name}'s unique value: {thread_local.unique_value}")
# 스레드를 생성하고 시작하는 함수
def start_thread():
for i in range(2):
t = threading.Thread(target=reentrant_function, name=f"Thread-{i+1}")
t.start()
time.sleep(1) # 스레드 실행 간격
start_thread()
Performing an operation in Thread-1
Thread-1's unique value: 1
Operation was already performed by another thread
Thread-2's unique value: 0
from multiprocessing import Process, Value
def increment(shared_value):
for _ in range(10000):
shared_value.value += 1
if __name__ == "__main__":
shared_value = Value('i', 0) # 공유 메모리 생성
p1 = Process(target=increment, args=(shared_value,))
p2 = Process(target=increment, args=(shared_value,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Final value: {shared_value.value}")
Final value: 10037
# server.py
import socket
def server():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('localhost', 12345))
s.listen()
print("Server is listening")
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
if __name__ == "__main__":
server()
import socket
def client():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('localhost', 12345))
s.sendall(b'Hello, world')
data = s.recv(1024)
print('Received', repr(data))
if __name__ == "__main__":
client()
그리고 터미널 두개로 각각 파이썬 파일을 실행하면
# server.py
Connected by ('127.0.0.1', 60587)
# client.py
Received b'Hello, world'
from multiprocessing import Process, Pipe
def f(conn):
conn.send(['hello', 42, None])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # ['hello', 42, None]
p.join()
['hello', 42, None]
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
[42, None, 'hello']