728x90
반응형

Overview

데이터의 일관성과 정합성을 유지하기 위하여 동시성 제어를 위해 사용되는 동기화 기법을 알아보고 각각의 차이점을 알아본다.

 

용어 정리

동기화(Synchronization)란?

여러 개의 스레드 또는 프로세스가 공유된 자원에 접근할 때 발생하는 문제를 해결하기 위해 사용되는 개념이다.

동기화를 통해 스레드 또는 프로세스들 간의 상호 작용을 조절하고, 데이터의 일관성과 정확성을 보장할 수 있다.


임계 영역(Critical Section)이란?

임계 영역은 여러 스레드나 프로세스에서 동시에 접근 가능한 영역을 말한다. 이 임계 영역에 동시에 접근하면 데이터의 일관성을 유지할 수 없거나, 예상치 못한 결과가 발생할 수 있어 임계 영역에 대한 동시 접근을 효율적으로 제어하기 위해 동기화 기법이 사용된다고 보면 된다.


경쟁 조건(Race Condition)이란?

여러 스레드나 프로세스가 공유된 자원에 동시에 접근할 때, 실행 순서나 타이밍 등에 의해 발생하는 예측할 수 없는 결과를 말한다. 경쟁 조건은 임계 영역에서 발생할 수 있으며, 일관성 없는 결과, 잘못된 데이터 처리, 비정상적인 동작 등의 문제를 일으킬 수 있다. 경쟁 조건을 해결하기 위해 동기화 기법을 사용하여 공유 자원에 대한 접근을 조절하고, 데이터의 일관성을 유지할 수 있습니다.

 

 

스핀락(Spinlock)

  • Busy Waiting(바쁜 대기) 방식을 사용한다. 
  • 자원이 해제될 때까지 반복적으로 검사한다.

장점

  • 사용 중인 자원에 대한 대기 시간이 짧을 때 유리하다.
  • 공유 자원에 대한 경합이 잠깐 동안 발생하는 경우 효과적이다.

단점

하지만, 자원을 얻을 수 없는 상태에서 반복적으로 검사하면서 CPU 시간을 낭비할 수 있다.

 

 

뮤텍스(Mutex)

  • 락(lock)과 언락(unlock)을 사용하여 임계 영역(Critical Section)에 접근하는 방식
  • 하나의 스레드만이 임계 영역에 접근할 수 있다.
  • 뮤텍스를 소유한 스레드만이 해당 뮤텍스를 해제할 수 있습니다.
  • 뮤텍스는 특정 스레드가 자원을 소유할 수 있으며, 다른 스레드들은 대기해야한다.

장점

위의 스핀락 방식과 다른 점을 해당 자원을 얻을 수 없을때는 휴식을 취하는 방식이다.

 

 

출처 : https://medium.com/geekculture/processes-threads-deadlock-semaphores-and-more-f70be5395ef6


세마포어(Semaphore)

  • 정수 변수를 사용하여 임계 영역에 접근한다.
  • 세마포어 값이 0 이상이면 접근을 허용하고, 0 미만이면 대기한다.
  • 세마포어 값은 P(임계 영역에 진입)와 V(임계 영역에서 나감) 연산으로 조절된다.
  • 세마포어는 뮤텍스와 달리 여러 개의 스레드가 동시에 접근할 수 있다.
  • 세마포어는 자원의 개수를 나타내기도 하며, 이를 활용하여 상호 배제, 동기화 등 다양한 동작을 구현할 수 있다.

세마포어 종류

바이너리 세마포어는 상호 배타적이며 0과 1의 두 값만 가질 수 있으며 1로 초기화된다. 여러 프로세스를 처리할 때 신호를 보내고 솔루션을 제공하는 데 사용된다.

카운팅 세마포어에는 상호 배제가 없으며 여러 인스턴스가 있는 리소스에 대한 액세스를 제어하는 ​​데 도움이 된다.

 

출처 : https://medium.com/geekculture/processes-threads-deadlock-semaphores-and-more-f70be5395ef6

 

 

Python 코드로 검증을 해보자

4개의 스레드를 생성하고, 각 스레드에서 10만번씩 counter 변수를 증가시키는 작업을 수행하도록 한다.

 

SpinLock

%%time

from time import sleep
import threading
from threading import Thread, Lock


class Spinlock:
    def __init__(self):
        self.lock = Lock()
        self.is_locked = False

    def acquire(self):
        while True:
            acquired = self.lock.acquire(blocking=False)
            if acquired:
                self.is_locked = True
                return

    def release(self):
        self.is_locked = False
        self.lock.release()
        
spinlock = Spinlock()
counter = 0        

def increment():
    for _ in range(100000):
        spinlock.acquire()
        global counter
        counter += 1
        spinlock.release()

threads = []
for _ in range(4):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Counter:", counter)

# Counter: 400000
# CPU times: total: 2.89 s
# Wall time: 1.51 s

 

Mutex

%%time
import threading

class Mutex:
    def __init__(self):
        self.lock = threading.Lock()

    def acquire(self):
        self.lock.acquire()

    def release(self):
        self.lock.release()

# 뮤텍스 사용 예시
mutex = Mutex()
counter = 0

def increment():
    for _ in range(100000):
        mutex.acquire()
        global counter      
        counter += 1
        mutex.release()

threads = []
for _ in range(4):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Counter:", counter)

# Counter: 400000
# CPU times: total: 2.89 s
# Wall time: 1.51 s

 

Semaphore

%%time
import threading

class Semaphore:
    def __init__(self, initial_count):
        self.lock = threading.Lock()
        self.count = initial_count
        self.condition = threading.Condition(lock=self.lock)

    def acquire(self):
        with self.condition:
            while self.count <= 0:
                self.condition.wait()
            self.count -= 1

    def release(self):
        with self.condition:
            self.count += 1
            self.condition.notify()

semaphore = Semaphore(1) 
counter = 0

def increment():
    for _ in range(100000):
        semaphore.acquire()
        global counter
        counter += 1
        semaphore.release()

threads = []
for _ in range(4):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Counter:", counter)

# Counter: 400000
# CPU times: total: 1.11 s
# Wall time: 1.09 s

 

실행시간은 Mutex < SemaPhore  < SpinLock  순으로 빨랐다. 그러면 Mutex말고 SpinLock을 속도면으로 빠르니 이걸 사용하면 되겠지? 라고 하는 것은 매우 단순하고 멍청한 생각일 것이다.

SpinLock과 Mutex의 용도를 알고 사용해야한다. 가장 큰 차이점은  스레드가 휴식을 취하고있는지(Mutex) CPU를 계속 사용하여 바로 앞에서 대기(SpinLock)하고 있는지의 차이이다.

 

SpinLock은 CPU를 계속 사용하여 대기하고 있으므로 스레드에 대한 블로킹이 없지만 기다리는 동안 CPU 자원을 낭비하게 된다. 주로 진입이 짧은 작업에 유리하다. 반면에 Mutex는 대기 시간이 길거나 대기하는 스레드 수가 많은 경우에 유리하다. "보통 먼말인지 모르겠다"할때는 보편적으로 Mutex를 사용하면 된다.

 

바이너리 세마포어 vs 카운팅 세마포어

아래 예시만 봐도 충분히 알 수 있다.

바이너리 세마포어는 상호 배타적이며 여러 프로세스를 처리할 때 신호를 보내고 솔루션을 제공하는데 사용하며
카운팅 세마포어에는 상호 배제가 없으며 여러 인스턴스가 있는 리소스에 대한 액세스를 제어하는 ​​데 사용된다.

 

바이너리 세마포어

import threading

# 바이너리 세마포어 생성 (초기 값 1)
binary_semaphore = threading.Semaphore(1)

def worker():
    binary_semaphore.acquire()
    print("Worker is executing")
    binary_semaphore.release()

# 스레드 생성
thread1 = threading.Thread(target=worker)
thread2 = threading.Thread(target=worker)

# 스레드 시작
thread1.start()
thread2.start()

# 스레드 종료 대기
thread1.join()
thread2.join()

카운팅 세마포어

import threading

# 카운팅 세마포어 생성 (초기 값 3)
counting_semaphore = threading.Semaphore(3)

def worker():
    counting_semaphore.acquire()
    print("Worker is executing")
    counting_semaphore.release()

# 스레드 생성
thread1 = threading.Thread(target=worker)
thread2 = threading.Thread(target=worker)
thread3 = threading.Thread(target=worker)

# 스레드 시작
thread1.start()
thread2.start()
thread3.start()

# 스레드 종료 대기
thread1.join()
thread2.join()
thread3.join()
728x90
반응형