코루틴의 동시성 제어 방식을 알아보자.
📌 서론
코루틴은 비동기 프로그래밍을 간편하게 만들어주는 강력한 도구다. 하지만 여러 코루틴이 동시에 실행되면서 공유 자원에 접근할 때는 동시성 문제(concurrency issues)가 발생할 수 있다.
이번 포스팅에서는 코루틴에서 동시성을 안전하게 관리하기 위한 주요 방법들을 실전에 유용한 예제와 함께 쉽게 이해할 수 있도록 설명할 예정이다.
1. 뮤텍스(Mutex)와 동기화
뮤텍스의 개념과 필요성
- 멀티스레드 환경에서 여러 스레드가 동시에 공유 자원에 접근하면 데이터 일관성 문제가 발생할 수 있다. 코루틴도 마찬가지로 여러 코루틴이 동시에 동일한 자원에 접근할 때 동기화가 필요하다. 이를 위해 뮤텍스(Mutex)를 사용하여 임계 구역을 보호할 수 있다.
Mutex의 기본 사용법
- 뮤텍스는 상호 배제를 제공하여, 동시에 여러 코루틴이 공유 자원에 접근하지 못하도록 한다. Kotlin에서는 kotlinx.coroutines.sync.Mutex 클래스를 제공한다.
withLock 함수 활용
- withLock 함수는 뮤텍스를 안전하게 잠그고 해제하는 데 사용된다. 이 함수는 블록 내의 코드를 임계 구역으로 보호하여 동시 접근을 방지한다.
예제 1. 뮤텍스를 사용한 안전한 카운터 증가
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
val dispatcher = Dispatchers.Default
val jobs = List(1000) {
launch(dispatcher) {
repeat(1000) {
mutex.withLock {
counter++
}
}
}
}
jobs.forEach { it.join() }
println("최종 카운터 값: $counter")
}
출력 결과
최종 카운터 값: 1000000
실행 예제 설명
- 뮤텍스 초기화: val mutex = Mutex()로 뮤텍스를 생성한다.
- 임계 구역 보호: mutex.withLock { counter++ }를 사용하여 counter++ 연산을 임계 구역으로 보호한다.
- 멀티스레드 환경에서도 안전: Dispatchers.Default를 사용하여 멀티스레드 환경에서 동작시켜도 정확한 카운터 값을 유지한다.
만약 뮤텍스를 안 쓰면 결과가 어떻게 될까?
예제 2. 뮤텍스 없이 실행 (문제 발생)
import kotlinx.coroutines.*
var counter = 0
fun main() = runBlocking {
// 멀티스레드 디스패처 사용
val dispatcher = Dispatchers.Default
val jobs = List(1000) {
launch(dispatcher) {
repeat(1000) {
counter++
}
}
}
jobs.forEach { it.join() }
println("최종 카운터 값: $counter")
}
출력 결과 (불일치 가능)
- 뮤텍스를 안 쓰니 원하는 결과가 안 나온다.
최종 카운터 값: 817877
실행 예제 설명
- 디스패처 변경: Dispatchers.Default를 사용하여 코루틴이 여러 스레드에서 병렬로 실행되도록 한다.
- 코루틴 실행: 1000개의 코루틴이 각각 1000번씩 counter++를 수행한다.
- 결과 확인: 동시성 문제가 발생하면 최종 카운터 값이 1000000이 아닌 다른 값으로 나타날 수 있다.
주요 포인트
- 뮤텍스(Mutex)는 공유 자원에 대한 동시 접근을 제어하여 데이터 일관성을 유지한다.
- withLock 블록 내의 코드는 뮤텍스를 소유한 상태로 실행되며, 다른 코루틴이 접근하지 못하도록 보호된다.
- 뮤텍스를 사용하지 않으면 데이터 경쟁 상태(race condition)가 발생할 수 있다.
2. 액터(Actors)를 이용한 메시지 기반 동시성 제어
액터 모델의 개념과 장점
- 액터(Actor) 모델은 메시지 전달을 통해 코루틴 간의 통신을 관리하는 방식이다. 액터는 내부 상태를 비공개로 유지하며, 외부에서 직접 접근하지 않고 메시지를 통해서만 상태를 변경할 수 있다. 이를 통해 동시성 문제를 효과적으로 방지할 수 있다.
actor 빌더를 이용한 액터 생성 및 메시지 처리
- Kotlin에서는 kotlinx.coroutines.channels.actor 빌더를 사용하여 액터를 생성할 수 있다. 액터는 특정 메시지 타입을 처리하도록 설계된다.
액터를 이용한 안전한 상태 변경
- 액터를 사용하면 상태 변경을 메시지 기반으로 안전하게 수행할 수 있다. 이는 멀티스레드 환경에서 상태 일관성을 유지하는 데 매우 유용하다.
예제 코드: 액터를 이용한 카운터 관리
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
// 메시지 타입 정의
sealed class CounterMsg
data object IncCounter : CounterMsg() // 카운터 증가
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 카운터 값 요청
// 액터 정의
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main(): Unit = runBlocking {
val counter = counterActor()
// 카운터 증가 메시지 전송
repeat(1000) { counter.send(IncCounter) }
// 카운터 값 요청
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("최종 카운터 값: ${response.await()}") // 출력: 최종 카운터 값: 1000
counter.close() // 액터 종료
}
출력 결과
최종 카운터 값: 1000
코드 해석
1. 메시지 타입 정의
// 메시지 타입 정의
sealed class CounterMsg
data object IncCounter : CounterMsg() // 카운터 증가
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 카운터 값 요청
- sealed class CounterMsg
- 액터가 처리할 수 있는 메시지 타입을 정의하는 추상 클래스다. sealed 키워드를 사용하여 이 클래스의 하위 클래스는 동일한 파일 내에서만 정의될 수 있도록 제한한다.
- 액터가 처리할 수 있는 메시지 타입을 정의하는 추상 클래스다. sealed 키워드를 사용하여 이 클래스의 하위 클래스는 동일한 파일 내에서만 정의될 수 있도록 제한한다.
- data object IncCounter : CounterMsg()
- 카운터를 증가시키는 메시지를 나타낸다. object 키워드를 사용하여 싱글톤 객체로 정의된다. 또한 CounterMsg를 상속받는다. data 키워드는 Kotlin 1.9부터 지원되며, 이전 버전에서는 object만 사용해야 한다.
- 카운터를 증가시키는 메시지를 나타낸다. object 키워드를 사용하여 싱글톤 객체로 정의된다. 또한 CounterMsg를 상속받는다. data 키워드는 Kotlin 1.9부터 지원되며, 이전 버전에서는 object만 사용해야 한다.
- class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
- GetCounter라는 이름의 클래스를 정의하며, response라는 속성을 가진다. 이 클래스도 CounterMsg를 상속받는다. 이때 CounterMsg가 sealed class이므로, GetCounter는 CounterMsg의 구체적인 구현 중 하나다.
- GetCounter라는 이름의 클래스를 정의하며, response라는 속성을 가진다. 이 클래스도 CounterMsg를 상속받는다. 이때 CounterMsg가 sealed class이므로, GetCounter는 CounterMsg의 구체적인 구현 중 하나다.
2. 액터 정의
// 액터 정의
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
- @OptIn(ObsoleteCoroutinesApi::class)
- 현재 사용 중인 API가 폐기 예정임을 명시적으로 수용하기 위해 사용된다. 최신 코루틴 라이브러리에서는 액터 관련 API가 변경되었을 수 있으므로 주의가 필요하다.
- 현재 사용 중인 API가 폐기 예정임을 명시적으로 수용하기 위해 사용된다. 최신 코루틴 라이브러리에서는 액터 관련 API가 변경되었을 수 있으므로 주의가 필요하다.
- fun CoroutineScope.counterActor() = actor<CounterMsg>(Dispatchers.Default) { ... }
- CounterMsg 타입의 메시지를 처리하는 액터를 생성하는 함수다. 이 함수는 CoroutineScope의 확장 함수로 정의되어 있어, 액터가 특정 코루틴 스코프 내에서 실행되도록 한다. 또한 Dispatchers를 통해 멀티스레드 환경을 구성한다.
- CounterMsg 타입의 메시지를 처리하는 액터를 생성하는 함수다. 이 함수는 CoroutineScope의 확장 함수로 정의되어 있어, 액터가 특정 코루틴 스코프 내에서 실행되도록 한다. 또한 Dispatchers를 통해 멀티스레드 환경을 구성한다.
- var counter = 0
- 액터 내부에서 관리하는 상태 변수로, 카운터의 현재 값을 저장한다.
- 액터 내부에서 관리하는 상태 변수로, 카운터의 현재 값을 저장한다.
- for (msg in channel)
- 액터는 메시지 채널을 통해 메시지를 받는다. 이 루프는 채널로부터 들어오는 메시지를 지속적으로 수신하여 처리한다.
- 액터는 메시지 채널을 통해 메시지를 받는다. 이 루프는 채널로부터 들어오는 메시지를 지속적으로 수신하여 처리한다.
- when (msg): 수신한 메시지의 타입에 따라 다른 동작을 수행한다.
- is IncCounter -> counter++: IncCounter 메시지를 받으면 카운터를 1 증가시킨다.
- is GetCounter -> msg.response.complete(counter): GetCounter 메시지를 받으면 현재 카운터 값을 response를 통해 반환한다.
메인 함수
fun main(): Unit = runBlocking {
val counter = counterActor()
// 카운터 증가 메시지 전송
repeat(1000) { counter.send(IncCounter) }
// 카운터 값 요청
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("최종 카운터 값: ${response.await()}") // 출력: 최종 카운터 값: 1000
counter.close() // 액터 종료
}
- fun main(): Unit = runBlocking { ... }
- 메인 함수로, runBlocking을 사용하여 코루틴을 실행한다. runBlocking은 코루틴이 완료될 때까지 현재 스레드를 차단한다.
- 메인 함수로, runBlocking을 사용하여 코루틴을 실행한다. runBlocking은 코루틴이 완료될 때까지 현재 스레드를 차단한다.
- val counter = counterActor()
- 정의한 counterActor 함수를 호출하여 액터를 생성한다. 이 액터는 메시지를 수신하고 처리할 준비가 된 상태다.
- 정의한 counterActor 함수를 호출하여 액터를 생성한다. 이 액터는 메시지를 수신하고 처리할 준비가 된 상태다.
- repeat(1000) { counter.send(IncCounter) }
- 1000번 반복하면서 IncCounter 메시지를 액터에 전송한다. 이로 인해 액터 내부의 카운터가 1000번 증가하게 된다.
- 1000번 반복하면서 IncCounter 메시지를 액터에 전송한다. 이로 인해 액터 내부의 카운터가 1000번 증가하게 된다.
- val response = CompletableDeferred<Int>()
- 카운터 값을 비동기적으로 받을 수 있는 CompletableDeferred 객체를 생성한다.
- 카운터 값을 비동기적으로 받을 수 있는 CompletableDeferred 객체를 생성한다.
- counter.send(GetCounter(response))
- GetCounter 메시지를 액터에 전송하면서, response 객체를 전달한다. 액터는 이 메시지를 수신하면 현재 카운터 값을 response에 완료시킨다.
- GetCounter 메시지를 액터에 전송하면서, response 객체를 전달한다. 액터는 이 메시지를 수신하면 현재 카운터 값을 response에 완료시킨다.
- println("최종 카운터 값: ${response.await()}")
- response.await()를 호출하여 액터로부터 반환된 카운터 값을 기다린다. 이후 최종 카운터 값을 출력한다.
- response.await()를 호출하여 액터로부터 반환된 카운터 값을 기다린다. 이후 최종 카운터 값을 출력한다.
- counter.close(): 액터를 종료하여 더 이상 메시지를 수신하지 않도록 한다.
주요 포인트
- 액터 모델은 상태를 캡슐화하고, 메시지 전달을 통해 상태를 변경하여 동시성 문제를 방지한다.
- actor 빌더를 사용하여 액터를 생성하고, 메시지를 처리할 수 있다.
- 액터 내부의 상태는 외부에서 직접 접근할 수 없으며, 메시지를 통해서만 변경된다.
반응형
'유용한 개발지식 > 스레드(Thread)' 카테고리의 다른 글
[Thread] 스프링 톰캣 스레드 덤프 파헤치기: 상태값과 구조 분석 (5) | 2024.12.29 |
---|---|
[Test] HTTP 부하 테스트 도구 'hey'를 사용한 성능 측정 (1) | 2024.09.29 |
[Thread] 코루틴(Coroutine)의 예외처리 (0) | 2024.09.22 |
[Thread] 4. Kotlin의 코루틴(Coroutine)이란? (1) | 2024.09.19 |
[Thread] 3. 일반 스레드 vs 스레드풀 (I/O, CPU 성능 비교) (2) | 2024.09.16 |