스레드(Thread)

[Thread] 코루틴(Coroutine)의 동시성 제어

Stark97 2024. 9. 22. 22:06
반응형
 
 
 

코루틴의 동시성 제어 방식을 알아보자.

📌 서론

코루틴은 비동기 프로그래밍을 간편하게 만들어주는 강력한 도구다. 하지만 여러 코루틴이 동시에 실행되면서 공유 자원에 접근할 때는 동시성 문제(concurrency issues)가 발생할 수 있다.

이번 포스팅에서는 코루틴에서 동시성을 안전하게 관리하기 위한 주요 방법들을 실전에 유용한 예제와 함께 쉽게 이해할 수 있도록 설명할 예정이다.

 

1. 뮤텍스(Mutex)와 동기화

뮤텍스의 개념과 필요성

  • 멀티스레드 환경에서 여러 스레드가 동시에 공유 자원에 접근하면 데이터 일관성 문제가 발생할 수 있다. 코루틴도 마찬가지로 여러 코루틴이 동시에 동일한 자원에 접근할 때 동기화가 필요하다. 이를 위해 뮤텍스(Mutex)를 사용하여 임계 구역을 보호할 수 있다.

Mutex의 기본 사용법

  • 뮤텍스는 상호 배제를 제공하여, 동시에 여러 코루틴이 공유 자원에 접근하지 못하도록 한다. Kotlin에서는 kotlinx.coroutines.sync.Mutex 클래스를 제공한다.

withLock 함수 활용

  • withLock 함수는 뮤텍스를 안전하게 잠그고 해제하는 데 사용된다. 이 함수는 블록 내의 코드를 임계 구역으로 보호하여 동시 접근을 방지한다.

예제 1. 뮤텍스를 사용한 안전한 카운터 증가

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    val dispatcher = Dispatchers.Default

    val jobs = List(1000) {
        launch(dispatcher) {
            repeat(1000) {
                mutex.withLock {
                    counter++
                }
            }
        }
    }
    jobs.forEach { it.join() }
    println("최종 카운터 값: $counter")
}

출력 결과

최종 카운터 값: 1000000

실행 예제 설명

  • 뮤텍스 초기화: val mutex = Mutex()로 뮤텍스를 생성한다.
  • 임계 구역 보호: mutex.withLock { counter++ }를 사용하여 counter++ 연산을 임계 구역으로 보호한다.
  • 멀티스레드 환경에서도 안전: Dispatchers.Default를 사용하여 멀티스레드 환경에서 동작시켜도 정확한 카운터 값을 유지한다.

 

만약 뮤텍스를 안 쓰면 결과가 어떻게 될까?


 

예제 2. 뮤텍스 없이 실행 (문제 발생)

import kotlinx.coroutines.*

var counter = 0

fun main() = runBlocking {
    // 멀티스레드 디스패처 사용
    val dispatcher = Dispatchers.Default

    val jobs = List(1000) {
        launch(dispatcher) {
            repeat(1000) {
                counter++
            }
        }
    }
    jobs.forEach { it.join() }
    println("최종 카운터 값: $counter")
}

출력 결과 (불일치 가능)

  • 뮤텍스를 안 쓰니 원하는 결과가 안 나온다.
최종 카운터 값: 817877

실행 예제 설명

  1. 디스패처 변경: Dispatchers.Default를 사용하여 코루틴이 여러 스레드에서 병렬로 실행되도록 한다.
  2. 코루틴 실행: 1000개의 코루틴이 각각 1000번씩 counter++를 수행한다.
  3. 결과 확인: 동시성 문제가 발생하면 최종 카운터 값이 1000000이 아닌 다른 값으로 나타날 수 있다.

 

주요 포인트

  • 뮤텍스(Mutex)는 공유 자원에 대한 동시 접근을 제어하여 데이터 일관성을 유지한다.
  • withLock 블록 내의 코드는 뮤텍스를 소유한 상태로 실행되며, 다른 코루틴이 접근하지 못하도록 보호된다.
  • 뮤텍스를 사용하지 않으면 데이터 경쟁 상태(race condition)가 발생할 수 있다.

2. 액터(Actors)를 이용한 메시지 기반 동시성 제어

액터 모델의 개념과 장점

  • 액터(Actor) 모델은 메시지 전달을 통해 코루틴 간의 통신을 관리하는 방식이다. 액터는 내부 상태를 비공개로 유지하며, 외부에서 직접 접근하지 않고 메시지를 통해서만 상태를 변경할 수 있다. 이를 통해 동시성 문제를 효과적으로 방지할 수 있다.

actor 빌더를 이용한 액터 생성 및 메시지 처리

  • Kotlin에서는 kotlinx.coroutines.channels.actor 빌더를 사용하여 액터를 생성할 수 있다. 액터는 특정 메시지 타입을 처리하도록 설계된다.

액터를 이용한 안전한 상태 변경

  • 액터를 사용하면 상태 변경을 메시지 기반으로 안전하게 수행할 수 있다. 이는 멀티스레드 환경에서 상태 일관성을 유지하는 데 매우 유용하다.

예제 코드: 액터를 이용한 카운터 관리

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor

// 메시지 타입 정의
sealed class CounterMsg
data object IncCounter : CounterMsg() // 카운터 증가
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 카운터 값 요청

// 액터 정의
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main(): Unit = runBlocking {
    val counter = counterActor()

    // 카운터 증가 메시지 전송
    repeat(1000) { counter.send(IncCounter) }

    // 카운터 값 요청
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("최종 카운터 값: ${response.await()}") // 출력: 최종 카운터 값: 1000

    counter.close() // 액터 종료
}

출력 결과

최종 카운터 값: 1000

코드 해석

1. 메시지 타입 정의

// 메시지 타입 정의
sealed class CounterMsg
data object IncCounter : CounterMsg() // 카운터 증가
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 카운터 값 요청
  • sealed class CounterMsg
    • 액터가 처리할 수 있는 메시지 타입을 정의하는 추상 클래스다. sealed 키워드를 사용하여 이 클래스의 하위 클래스는 동일한 파일 내에서만 정의될 수 있도록 제한한다.

  • data object IncCounter : CounterMsg()
    • 카운터를 증가시키는 메시지를 나타낸다. object 키워드를 사용하여 싱글톤 객체로 정의된다. 또한 CounterMsg를 상속받는다. data 키워드는 Kotlin 1.9부터 지원되며, 이전 버전에서는 object만 사용해야 한다.

  • class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
    • GetCounter라는 이름의 클래스를 정의하며, response라는 속성을 가진다. 이 클래스도 CounterMsg를 상속받는다. 이때 CounterMsg가 sealed class이므로, GetCounter는 CounterMsg의 구체적인 구현 중 하나다.

2. 액터 정의

// 액터 정의
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
  • @OptIn(ObsoleteCoroutinesApi::class)
    • 현재 사용 중인 API가 폐기 예정임을 명시적으로 수용하기 위해 사용된다. 최신 코루틴 라이브러리에서는 액터 관련 API가 변경되었을 수 있으므로 주의가 필요하다.

  • fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) { ... }
    • CounterMsg 타입의 메시지를 처리하는 액터를 생성하는 함수다. 이 함수는 CoroutineScope의 확장 함수로 정의되어 있어, 액터가 특정 코루틴 스코프 내에서 실행되도록 한다. 또한 Dispatchers를 통해 멀티스레드 환경을 구성한다.

  • var counter = 0
    • 액터 내부에서 관리하는 상태 변수로, 카운터의 현재 값을 저장한다.

  • for (msg in channel)
    • 액터는 메시지 채널을 통해 메시지를 받는다. 이 루프는 채널로부터 들어오는 메시지를 지속적으로 수신하여 처리한다.

  • when (msg): 수신한 메시지의 타입에 따라 다른 동작을 수행한다.
    • is IncCounter -> counter++: IncCounter 메시지를 받으면 카운터를 1 증가시킨다.
    • is GetCounter -> msg.response.complete(counter): GetCounter 메시지를 받으면 현재 카운터 값을 response를 통해 반환한다.

메인 함수

fun main(): Unit = runBlocking {
    val counter = counterActor()

    // 카운터 증가 메시지 전송
    repeat(1000) { counter.send(IncCounter) }

    // 카운터 값 요청
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("최종 카운터 값: ${response.await()}") // 출력: 최종 카운터 값: 1000

    counter.close() // 액터 종료
}
  • fun main(): Unit = runBlocking { ... }
    • 메인 함수로, runBlocking을 사용하여 코루틴을 실행한다. runBlocking은 코루틴이 완료될 때까지 현재 스레드를 차단한다.

  • val counter = counterActor()
    • 정의한 counterActor 함수를 호출하여 액터를 생성한다. 이 액터는 메시지를 수신하고 처리할 준비가 된 상태다.

  • repeat(1000) { counter.send(IncCounter) }
    • 1000번 반복하면서 IncCounter 메시지를 액터에 전송한다. 이로 인해 액터 내부의 카운터가 1000번 증가하게 된다.

  • val response = CompletableDeferred<Int>()
    • 카운터 값을 비동기적으로 받을 수 있는 CompletableDeferred 객체를 생성한다.

  • counter.send(GetCounter(response))
    • GetCounter 메시지를 액터에 전송하면서, response 객체를 전달한다. 액터는 이 메시지를 수신하면 현재 카운터 값을 response에 완료시킨다.

  • println("최종 카운터 값: ${response.await()}")
    • response.await()를 호출하여 액터로부터 반환된 카운터 값을 기다린다. 이후 최종 카운터 값을 출력한다.

  • counter.close(): 액터를 종료하여 더 이상 메시지를 수신하지 않도록 한다.

주요 포인트

  • 액터 모델은 상태를 캡슐화하고, 메시지 전달을 통해 상태를 변경하여 동시성 문제를 방지한다.
  • actor 빌더를 사용하여 액터를 생성하고, 메시지를 처리할 수 있다.
  • 액터 내부의 상태는 외부에서 직접 접근할 수 없으며, 메시지를 통해서만 변경된다.
반응형