안녕하세요. 이벤트에 빠진 개발자 stark입니다!
최근 저는 DDD에 푹 빠지게 되었습니다. 그래서 회사 선배님께 책을 빌려서 읽기도 하고 세미나도 보면서 정말 많은 공부를 하고 있습니다. DDD관련 책을 읽던 도중 '이벤트 소싱'이라는 목차가 등장했고 읽다 보니 엄청난 호기심이 생겼습니다. 왜냐하면 저는 주로 헥사고날 아키텍처와 EDA를 사용하여 개발을 해왔지만 제가 이벤트를 사용하게 되는 상황은 단순히 비관심사를 분리하는 상황뿐이었습니다.
특히 이벤트와 DDD와 조금만 검색해 봐도 Command와 Query를 분리하는 CQRS가 나옵니다. 그러다 보니 어느 순간부터 저도 코드에 "적용해 볼까?"라는 생각이 들면서 계속해서 고민하게 되었습니다. 가장 걸림돌이 된 것은 제가 이벤트 소싱과 CQRS의 개념을 잘 모른다는 것이었습니다. 정보도 찾아보면 많은 것 같지만 막상 사용하려면 마땅한 예시가 없었습니다.
그래도 저는 두려워하지 않고 해 보면서 배우고 개선하면 된다는 생각을 가지고 있기에 더 체계적으로 정보를 찾아봤고 일단 그냥 시도하면서 문제점은 찾아서 빠르게 개선해 나가기로 결정했습니다. (이게 야생형인 걸까요?) 근데.. 이렇게 멋지게 도전하는 건 좋았는데 CQRS에 대한 정보를 찾아도 뭔지 잘 이해가 가지 않았습니다.
저의 AI 스승님이신 GPT나 Claude에게 물어봐도 계속 말이 달라져서 혼동이 왔습니다. 심지어 계속 물어보니 제대로 대답해주지 못해서 미안하다고 하더군요.. 그래서 너무 답답한 마음이 들어 그냥 직접 구현해 보면서 배우기로 했습니다. 물론 이것도 AI 선생님들의 도움을 받았지만 제 사고력에 AI의 검색력 + 생산 능력이 합쳐지니 상당히 만족스러운 결과물이 나왔습니다.
이제 직접 구현해 본 이벤트 소싱과 CQRS에 대해 소개드리겠습니다!
이벤트 소싱과 CQRS 역사부터 알고 넘어갑시다.
먼저 이벤트 소싱의 역사를 알아봅시다.
이벤트 소싱(Event Sourcing)의 기원은 회계 분야의 전통적인 복식부기 시스템으로 거슬러 올라갑니다. 복식부기에서는 거래 자체를 수정하지 않고, 새로운 거래를 추가하여 잔액을 조정하는 방식을 사용했는데, 이는 현대의 이벤트 소싱과 매우 유사한 개념입니다.
소프트웨어 개발 영역에서 이벤트 소싱이라는 용어를 처음 체계적으로 정립하고 소개한 사람은 Martin Fowler입니다. 그는 2005년 자신의 블로그에서 이벤트 소싱 패턴을 처음 소개했습니다. 이벤트 소싱의 핵심 아이디어는 애플리케이션의 상태 변화를 일련의 이벤트로 저장하고, 이러한 이벤트들을 재생하여 특정 시점의 상태를 재구성할 수 있다는 것입니다.
이후 Greg Young이 CQRS와 함께 이벤트 소싱을 더욱 발전시켰으며, 이 두 패턴의 시너지 효과로 인해 현재는 대부분 함께 사용되고 있습니다. 특히 금융, 게임, 의료 등 감사(audit)와 추적성(traceability)이 중요한 도메인에서 널리 채택되어 사용되고 있으며, 마이크로서비스 아키텍처의 등장과 함께 그 활용도가 더욱 높아지고 있습니다.
다음으로 CQRS의 역사를 알아봅시다.
CQRS(Command Query Responsibility Segregation)의 탄생은 1994년 Bertrand Meyer가 제안한 CQS(Command Query Separation) 원칙으로 거슬러 올라갑니다. CQS는 메서드를 Command(상태를 변경하는 메서드)와 Query(상태를 반환하는 메서드)로 분리하는 단순한 원칙이었습니다. 예를 들어, 계좌에서 출금하는 메서드는 상태를 변경하는 Command이고, 잔액을 조회하는 메서드는 상태를 반환하는 Query입니다.
이후 2010년경 Greg Young이 이 CQS 원칙을 시스템 아키텍처 수준으로 확장하여 CQRS 패턴을 제안했습니다. CQRS는 단순히 메서드 수준의 분리가 아닌, 시스템의 읽기 모델과 쓰기 모델을 완전히 분리하는 것을 제안합니다. 이는 각각의 모델이 자신의 책임에 더 적합한 형태로 최적화될 수 있다는 장점을 가져왔습니다.
CQRS는 보통 이벤트 소싱(Event Sourcing)과 함께 사용되는데, 이벤트 소싱은 2005년 Martin Fowler가 자신의 블로그에서 처음 소개한 패턴입니다. 이 두 패턴의 조합은 특히 복잡한 도메인과 높은 확장성이 필요한 시스템에서 큰 인기를 얻었으며, 현재까지도 많은 기업들이 이 패턴을 도입하여 사용하고 있습니다.
이벤트 소싱이 뭘까요?
이벤트 소싱(Event Sourcing)은 상태 변경을 이벤트의 형태로 기록하고 관리하는 방식입니다. 기존의 데이터베이스에서 현재 상태를 직접 저장하는 대신, 이벤트 소싱에서는 모든 상태 변화 기록(이벤트)을 순차적으로 저장하여 이후 필요시 이벤트들을 재생(Replay)하여 현재 상태를 유추합니다.
조금 더 쉽게 설명하자면 일반적인 상황에는 특정 데이터가 변경되면 DB에 변경된 상태를 그대로 업데이트합니다. 그러나 이벤트 소싱은 조금 다릅니다. 이벤트(행위)가 발생했다는 것 자체를 DB에 저장해 둡니다. 그리고 필요할 때마다 DB에서 저장된 이벤트(row)들을 조회해서 이벤트와 함께 저장된 정보(payload)를 사용해서 데이터의 상태를 업데이트시킵니다. (그렇기에 언제든 이벤트를 조회해서 Replay 하여 재실행도 가능합니다.)
예를 들어, 은행 계좌에서 '입금 1000원', '출금 500원'이라는 이벤트가 발생하면 DB의 이벤트 저장소 테이블에는 '입금, 출금'이 2가지의 이벤트가 순차적으로 기록될 것이며 저장할 payload에는 어떤 행위가 일어났는지를 저장하게 됩니다. 최종 계좌 잔액은 이 두 이벤트의 순차적인 적용으로 구할 수 있습니다.
이렇게 이벤트 소싱은 시스템의 상태를 정확하게 추적하고, 변경 이력에 대한 완전한 기록을 남길 수 있어 복구 작업, 디버깅, 감사 트레일 등에 매우 유용하며, CQRS와 자주 함께 사용됩니다. 이 두 가지 패턴을 조합하면 명령을 이벤트로 기록하고, 이벤트의 흐름에 따라 읽기 모델을 갱신해 보다 일관성 있고 확장성 높은 시스템을 설계할 수 있습니다.
CQRS는 뭘까요?
CQRS(Command Query Responsibility Segregation)는 애플리케이션의 명령(Command)과 조회(Query)를 분리하여 서로 독립적인 모델로 관리하는 아키텍처 패턴입니다. 이 접근 방식은 데이터 수정(Write) 작업과 읽기(Read) 작업의 책임을 명확히 구분하여 시스템의 확장성과 성능을 높이기 위한 목적으로 도입됩니다.
예를 들어 이커머스에서 상품의 재고를 관리한다고 할 때, 재고 수량을 업데이트하는 작업(명령)과 현재 재고 수량을 조회하는 작업(조회)은 서로 다른 요구사항을 갖기 때문에, CQRS를 통해 별도의 모델과 데이터 접근 방식을 설계함으로써 명령의 복잡성은 줄이고 조회 성능은 최적화할 수 있습니다. 이를 통해 조회는 대량의 캐시 시스템을 통해 빠르게 처리하고, 명령은 트랜잭션을 신중히 관리해 데이터의 일관성을 보장하는 등 각자의 특성에 맞는 최적화를 도모할 수 있습니다.
이제 CQRS의 '모델'에 대해서 이해해 봅시다. CQRS에서의 명령 모델과 조회 모델은 각각 다른 역할을 담당하는 데이터 접근 방식을 의미합니다. 명령 모델은 데이터의 수정 작업(예: 생성, 수정, 삭제)을 처리하는 역할을 하며, 조회 모델은 데이터의 조회 작업을 처리하는 역할을 합니다.
데이터베이스 관점에서 보면, 명령 모델과 조회 모델은 같은 테이블을 사용할 수도 있지만, 완전히 다른 접근 방식을 가지기 때문에 데이터베이스를 분리해서 사용할 때 더욱 큰 장점을 발휘할 수 있습니다. 즉, 같은 데이터베이스를 공유하면서도 서로 다른 테이블 구조나 인덱싱 전략을 사용하는 경우도 있을 수 있고, 완전히 독립된 데이터베이스를 사용해서 시스템의 확장성, 성능, 그리고 분리된 요구사항을 맞출 수도 있습니다.
예를 들어, 이커머스에서 주문을 생성하거나 업데이트하는 명령 모델은 트랜잭션의 정확성에 집중하고, 읽기 성능은 크게 신경 쓰지 않습니다. 반면, 조회 모델은 사용자에게 주문 내역을 빠르게 보여주기 위해 최적화되어 있으며, 복잡한 데이터 결합이나 캐싱 구조를 사용할 수 있습니다. 이렇게 명령과 조회의 작업을 분리함으로써 각 작업의 요구사항에 맞게 데이터베이스 설계를 최적화할 수 있습니다.
CQRS + 이벤트 소싱은 이렇게 동작합니다.
1개의 DB (커맨드, 쿼리)를 사용했을 때는 다음과 같이 동작합니다.
명령 모델과 조회 모델이 하나의 데이터베이스를 공유하면서 작업을 수행하는 경우, 명령 모델에서 발생한 변경 사항은 이벤트로 발행되어 이벤트 스토어에 저장됩니다. 이후 조회 모델이 이 이벤트를 처리하여 읽기 모델을 갱신하게 됩니다.
이러한 방식으로 데이터 변경 이력을 효과적으로 관리하고, 이벤트 소싱을 통해 상태를 재구성할 수 있습니다. 이 구조에서는 데이터 일관성을 유지하는 작업이 단순해지지만, 읽기와 쓰기 작업 간의 부하를 균형 있게 관리하는 것이 중요합니다.
# CQRS 데이터 흐름을 간단히 표현한 예시 다이어그램 (명령과 조회 모델 분리)
# 명령 모델 (Command Model): 데이터 생성, 수정, 삭제
# 조회 모델 (Query Model): 데이터 조회
+-----------------+ +---------------------+
| Command Request | | Query Request |
| (Create/Update) | | (Read Operation) |
+--------+--------+ +----------+----------+
| |
| |
v v
+--------+--------+ +----------+----------+
| Command Model | | Query Model |
| (Handle writes) | | (Handle reads) |
+--------+--------+ +----------+----------+
| |
| |
v |
+-------------------------+ |
| Event Store | |
| (같은 DB에 이벤트를 저장한다.)| |
+-------------+-----------+ |
| |
v v
+-----------------------------------------------------+
| Single Database (커맨드와 쿼리가 1개의 DB를 공유한다.) |
| (Data store used for both read and write operations)|
+-----------------------------------------------------+
2개의 DB (커맨드, 쿼리)를 사용했을 때는 다음과 같이 동작합니다.
명령 모델은 주문 정보를 Command 전용 DB에 저장합니다. 예를 들어, order 테이블과 order_items 테이블에 각각 필요한 데이터가 들어갑니다. 주문이 저장되면서, "주문 생성" 이벤트가 발행되고 Command 전용 DB내부에 있는 이벤트 스토어 테이블에 기록(저장)됩니다. 이후 비즈니스 로직에서 저장된 이벤트를 발행합니다.
조회 모델은 이 이벤트를 받아들여(Consume or Listen) Query Database에 주문 정보를 업데이트합니다. 이때 조회 데이터베이스는 order와 order_items에 저장된 컬럼 정보를 저장하기 위해 Query DB에도 동일한 테이블을 만들어서 저장하지 않고 두 컬럼 값이 합쳐진 하나의 테이블을 생성해서 JSON 형식으로 저장하거나, 미리 결합된 형태로 저장해서 사용자가 빠르게 조회할 수 있도록 최적화합니다.
# CQRS 데이터 흐름을 간단히 표현한 예시 다이어그램 (명령과 조회 모델 분리)
+-----------------+ +---------------------+
| Command Request | | Query Request |
| (Create/Update) | | (Read Operation) |
+--------+--------+ +----------+----------+
| |
| |
v v
+--------+--------+ +----------+----------+
| Command Model | | Query Model |
| (Handle writes) | | (Handle reads) |
+--------+--------+ +----------+----------+
| |
| |
v |
+--------+------------+ |
| Command Database | |
| (커맨드 전용 DB 사용) | |
+--------+------------+ |
| |
| 이벤트 발행 |
v v
+--------+----------------------------+---------+
| Query Database (쿼리 전용 DB: View/Cache) |
| (커맨드 DB와는 데이터 동기화) |
| 조회 성능을 최적화하기 위해 Command Database와는 다른 테이블 구조나 데이터 형식을 사용할 수 있습니다. |
| 조회에 필요한 데이터를 하나의 테이블에 모두 담거나, JSON 형식으로 구성하여 데이터를 쉽게 접근할 수 있도록 설계하는 경우가 많습니다. |
+-----------------------------------------------+
CQRS에 이벤트 소싱을 적용하는 이유를 알아봅시다.
1. 데이터 일관성 유지 및 이벤트 기반 모델링
이벤트 소싱은 모든 상태 변경을 이벤트로 DB 테이블에 기록하기 때문에, 명령 처리 과정에서 발생하는 변화를 정확하게 추적할 수 있습니다. 이는 CQRS의 "명령" 모델에서 발생하는 변경 사항을 이벤트로 저장함으로써, "조회" 모델은 이 이벤트들을 기반으로 업데이트될 수 있게 해 주며, 결과적으로 두 모델 간의 데이터 일관성을 유지할 수 있게 합니다. 즉, 명령으로 발생한 모든 변화가 기록되고, 이를 통해 조회 모델이 실시간으로 동기화될 수 있게 됩니다.
2. 시간을 되돌린 복구 기능(Time travel and Snapshots)
이벤트 소싱을 활용하면 특정 시점으로 시스템을 되돌릴 수 있습니다. 즉, 이벤트의 재생을 통해 원하는 시점까지 상태를 복구하거나 재구성할 수 있습니다. 이는 시스템 장애 시 복구하는 데 유리하며, 잘못된 상태로 인해 발생한 이슈를 수정하기 위해 특정 시점으로 롤백하는 등의 기능을 구현할 수 있게 해 줍니다. CQRS에서 이 기능을 활용하면, 조회 모델의 데이터를 빠르게 다시 생성하거나 다른 조회 모델을 새롭게 만들 때도 유용합니다.
3. 구독 기반의 실시간 처리
CQRS에 이벤트 소싱을 적용하면 조회 모델을 다양한 요구에 따라 실시간으로 변경하거나, 명령의 결과를 다른 서비스와 비동기적으로 연계하는 것이 쉬워집니다. 조회 모델은 이벤트 스트림을 구독하여 실시간으로 업데이트할 수 있기 때문에, 데이터를 최신 상태로 유지하는 데 최적화됩니다. 예를 들어, 사용자가 웹 페이지에서 실시간으로 업데이트된 데이터를 확인해야 하는 상황에서 이러한 이벤트 기반 설계는 매우 효율적입니다.
4. 조회 모델의 자유로운 재구성
이벤트 소싱을 사용하면 새로운 조회 모델을 쉽게 추가하거나 기존 모델을 개선할 수 있습니다. 과거의 모든 상태 변경 이벤트가 저장되어 있기 때문에, 새로운 조회 요구사항이 생길 때 과거 이벤트들을 모두 재생하여 필요한 새로운 조회 모델을 구성할 수 있습니다. 예를 들어, 기존 조회 모델은 단순히 사용자의 활동 로그만을 표시했지만, 이후 새로운 요구에 따라 사용자의 구매 이력 통계를 생성해야 한다면, 기존 이벤트 데이터를 재사용하여 새로운 조회 모델을 생성할 수 있습니다.
CQRS 프로젝트를 구성해 봅시다.
자 이제 어느 정도(간단하게?) CQRS와 이벤트 소싱에 대해서 이해가 되었을 것이라 생각합니다. 지금부터는 제가 작성한 코드를 통해 설명드리고자 합니다. 저는 은행을 예시로 하는 게 이벤트 소싱과 CQRS를 설명하기 제일 좋을 것이라고 판단했습니다. 그래서 Stark Bank 프로젝트를 만들었습니다.
작성한 코드는 아래의 깃허브에서 "basic-cqrs-implementation" 브랜치를 선택해서 확인하시면 됩니다. (필요하시다면 편하게 가져다 사용하셔도 됩니다.)
https://github.com/wlsdks/stark-bank
제가 구성한 CQRS 패키지입니다.
이번에는 제가 좋아하는 헥사고날 아키텍처를 사용하지 않고 간단하게 5개의 패키지로 CQRS 프로젝트를 구성했습니다. 이 프로젝트의 가장 핵심인 command, query 패키지를 먼저 만들고 프로젝트에서 공통적으로 사용할 common, config, domain 패키지를 구성하였습니다.
# CQRS 패키지 구조를 간단히 표현한 트리 (루트 패키지 기준)
└── com.example.cqrs
├── command
├── query
├── domain
├── config
└── common
메인이 되는 5개의 패키지 내부 구성은 다음과 같습니다.
제일 중요한 command, query 패키지 하위에는 굉장히 익숙한 mvc 패턴을 적용해서 controller, service, repository, entity를 구성하였습니다. 도메인 패키지에는 단순히 POJO 도메인 객체만 작성했고 나머지 common, config 패키지도 항상 봐왔던 설정이나 예외처리 등 공통적으로 사용되는 것들로 구성하였습니다.
여기서 중요하게 봐야 할 부분은 query 패키지 내부에 존재하는 "event > listener"입니다. 여기 있는 이벤트 리스너를 통해 command 비즈니스를 마치고 발행된 이벤트를 구독해서 읽기 DB를 업데이트하게 됩니다.
# CQRS 패키지 구조를 간단히 표현한 트리 (패키지 기준)
└── src
├── main
│ ├── java
│ │ └── com
│ │ └── example
│ │ └── cqrs
│ │ ├── command
│ │ │ ├── controller
│ │ │ ├── dto
│ │ │ ├── entity
│ │ │ │ ├── event
│ │ │ │ ├── enumerate
│ │ │ │ └── metadata
│ │ │ ├── mapper
│ │ │ ├── repository
│ │ │ ├── service
│ │ │ └── usecase
│ │ ├── common
│ │ │ ├── schedule
│ │ │ ├── exception
│ │ │ └── service
│ │ │ └── impl
│ │ ├── config
│ │ ├── domain
│ │ └── query
│ │ ├── controller
│ │ ├── dto
│ │ ├── entity
│ │ ├── event
│ │ │ └── listener
│ │ ├── mapper
│ │ ├── repository
│ │ ├── service
│ │ └── usecase
│ └── resources
│ └── application.yaml
CQRS와 이벤트 소싱을 적용한 커맨드의 비즈니스를 살펴봅시다.
저는 입출금이 가능한 인터넷 은행 프로젝트를 만들었습니다. 이에 맞게 Command 패키지에 다음과 같은 API를 생성하였습니다.
- 계좌를 생성합니다. (createAccount)
- 입금합니다. (deposit)
- 출금합니다. (withdraw)
- 계좌 간 이체합니다. (transfer)
@RequestMapping("/accounts")
@RequiredArgsConstructor
@RestController
public class AccountCommandController {
private final AccountCommandUseCase accountCommandUseCase;
/**
* @param accountId 계좌 ID
* @param userId 사용자 ID
* @return 계좌 생성 결과 메시지
* @apiNote 계좌를 생성합니다.
*/
@PostMapping("/{accountId}")
public ResponseEntity<String> createAccount(
@PathVariable String accountId,
@RequestHeader("X-User-Id") String userId
) {
CreateAccountRequest request = CreateAccountRequest.of(accountId, userId);
accountCommandUseCase.createAccount(request);
return ResponseEntity.ok("계좌가 생성되었습니다.");
}
/**
* @param accountId 계좌 ID
* @param amount 입금 금액
* @param userId 사용자 ID
* @return 입금 결과 메시지
* @apiNote 계좌에 입금을 수행합니다.
*/
@PostMapping("/{accountId}/deposit")
public ResponseEntity<String> deposit(
@PathVariable String accountId,
@RequestParam double amount,
@RequestHeader("X-User-Id") String userId
) {
DepositRequest request = DepositRequest.of(accountId, amount, userId);
accountCommandUseCase.depositMoney(request);
return ResponseEntity.ok("입금이 완료되었습니다.");
}
/**
* @param accountId 계좌 ID
* @param amount 출금 금액
* @param userId 사용자 ID
* @return 출금 결과 메시지
* @apiNote 계좌에서 출금을 수행합니다.
*/
@PostMapping("/{accountId}/withdraw")
public ResponseEntity<String> withdraw(
@PathVariable String accountId,
@RequestParam double amount,
@RequestHeader("X-User-Id") String userId
) {
WithdrawRequest request = WithdrawRequest.of(accountId, amount, userId);
accountCommandUseCase.withdrawMoney(request);
return ResponseEntity.ok("출금이 완료되었습니다.");
}
/**
* @param request 이체 요청 DTO
* @param userId 요청한 사용자 ID
* @return 이체 결과 메시지
* @apiNote 계좌 간 이체를 수행합니다.
*/
@PostMapping("/transfer")
public ResponseEntity<String> transfer(
@RequestBody TransferRequest request,
@RequestHeader("X-User-Id") String userId
) {
request = TransferRequest.of(
request.getFromAccountId(),
request.getToAccountId(),
request.getAmount(),
userId
);
// 이체 요청을 처리합니다.
accountCommandUseCase.transfer(request);
return ResponseEntity.ok("이체가 완료되었습니다.");
}
}
제일 중요한 비즈니스 로직을 살펴봅시다. (너무 많아서 계좌 생성 1개만 예시로 살펴보겠습니다.)
만약 제가 계좌를 생성하면 어떻게 될까요? 다음과 같은 순서로 동작하게 됩니다.
- 계좌 중복 검사 (예외 처리)
- 계좌 생성 이벤트 객체를 생성
- 생성된 이벤트 객체 저장 및 발행 (event store(DB)에 이벤트 저장)
- 스냅샷 저장이 필요한지 확인하고 필요하다면 저장
/**
* 새로운 계좌를 생성합니다.
* 계좌 생성 이벤트를 저장하고 발행합니다.
*
* @param request 계좌 생성 요청 DTO (계좌ID, 사용자ID 포함)
* @throws IllegalArgumentException 계좌가 이미 존재하는 경우
*/
@Transactional
@Override
public void createAccount(CreateAccountRequest request) {
// 계좌 중복 검사
if (accountViewRepository.existsById(request.getAccountId())) {
throw new IllegalArgumentException("이미 존재하는 계좌입니다.");
}
// 계좌 생성 이벤트 객체 생성
String correlationId = UUID.randomUUID().toString();
EventMetadata eventMetadata = EventMetadata.of(correlationId, null, request.getUserId());
AccountCreatedEvent event = AccountCreatedEvent.of(
request.getAccountId(),
LocalDateTime.now(),
0.0,
eventMetadata
);
// 이벤트 저장 및 발행
accountEventStoreUseCase.save(event);
eventPublisher.publishEvent(event);
// 스냅샷 저장이 필요한지 확인하고 필요한 경우 저장합니다.
checkAndSaveSnapshot(request.getAccountId());
}
근데 이벤트가 어떤 것인지를 몰라서 이해하기가 어렵습니다. 그러니 이벤트를 이해해 봅시다.
이벤트들은 모두 abstract 클래스를 상속받는 구조입니다. 계좌 생성, 입금, 출금 등 모든 행위는 이벤트로 발행되어야 하므로 아래에 있는 AbstractAccountEvent 클래스를 상속받습니다. 하단에 추가 예시로 '계좌 생성' 이벤트를 적어두었습니다.
이벤트 엔티티들은 이 클래스를 상속받기 때문에 기본적으로 "pk, 계좌 ID, 이벤트 발행 일시, 이벤트 발생 금액, 이벤트 처리 상태, 이벤트 메타데이터"를 가집니다. 이렇게 이벤트 테이블에서는 그 시점에 행위에 대한 정보들을 저장하고 있기 때문에 언제든지 데이터를 조회해서 원하는 시점으로 변경할 수 있습니다.
@Inheritance(strategy = InheritanceType.SINGLE_TABLE)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@DiscriminatorColumn(name = "event_type")
@Getter
@Entity
@Table(name = "account_event")
public abstract class AbstractAccountEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String accountId; // 계좌 ID
@Column(nullable = false)
private LocalDateTime eventDate; // 이벤트 발생 일시
@Column(nullable = true)
private Double amount; // 이벤트 발생 금액
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private EventStatus status = EventStatus.PENDING; // 이벤트 처리 상태
@Embedded
private EventMetadata metadata; // 이벤트 메타데이터
@Version
private Long version; // JPA 낙관적 잠금용 버전
}
// 계좌생성 이벤트 클래스
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@DiscriminatorValue("AccountCreatedEvent")
@Entity
public class AccountCreatedEvent extends AbstractAccountEvent {
public AccountCreatedEvent(String accountId, LocalDateTime eventDate,
double amount, EventMetadata metadata) {
super(accountId, eventDate, amount, metadata);
}
// factory method
public static AccountCreatedEvent of(String accountId, LocalDateTime eventDate, double amount, EventMetadata metadata) {
return new AccountCreatedEvent(accountId, eventDate, amount, metadata);
}
}
// 이벤트 처리상태
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public enum EventStatus {
PENDING("PENDING", "이벤트 처리 대기 중"),
PROCESSED("PROCESSED", "이벤트 처리 완료"),
FAILED("FAILED", "이벤트 처리 실패");
private final String code;
private final String description;
}
// 이벤트 메타데이터
@Embeddable
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Builder
@Getter
public class EventMetadata {
@Column(nullable = false)
private String correlationId; // 연관 이벤트 그룹 ID
private String causationId; // 원인이 되는 이벤트 ID
@Column(nullable = false)
private String userId; // 처리한 사용자 ID
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private EventSchemaVersion schemaVersion; // 이벤트 스키마 버전
}
// 스키마 버전 enum
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public enum EventSchemaVersion {
V1_0("1.0", "버전 1.0"),
V1_1("1.1", "버전 1.1"),;
private final String version;
private final String description;
}
이벤트가 테이블에 어떻게 저장되었는지 살펴봅시다.
제가 표에 적어둔 것 이외에도 추가적인 메타데이터가 있지만 지금은 이해하는데 필요한 컬럼만 적어두겠습니다!
event_type | id | accountid | amount | eventdate | status | userid | version |
AccountCreatedEvent | 1 | 1003 | 0 | 2024-11-23 12:45:00 | PROCESSED | stark | 1 |
위에 있는 이벤트가 DB저장된 후 발행되었으니 스프링의 EventListener가 동작해야겠죠?
계좌 생성 이벤트를 처리하는 이벤트 리스너가 동작합니다. 이벤트 리스너를 AFTER_COMMIT으로 사용했기에 트랜잭션이 포함되어 있지 않아서 @Transactional을 선언하고 propagation설정을 REQUIRES_NEW로 해줘야만 새로운 트랜잭션이 적용됩니다.
이벤트 리스너에서는 DB의 AccountView(읽기) 테이블의 계좌정보를 생성하고 저장합니다. 그리고 사용한 이벤트의 상태(EventStatus) 값을 PROCESSED로 변경합니다. 이렇게 리스너에서 '계좌 생성 이벤트'에 대한 최종적인 처리를 하게 됩니다.
/**
* 계좌 생성 이벤트를 처리합니다.
* 새로운 읽기 모델 계정을 생성하며, 실패 시 최대 3번까지 재시도합니다.
*
* @param event 계좌 생성 이벤트
* @throws EventHandlingException 모든 재시도 후에도 처리 실패 시
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleAccountCreate(AccountCreatedEvent event) {
log.info("계좌 생성 이벤트 시작: {}", event.getAccountId());
try {
retryTemplate.execute(context -> {
AccountView account = AccountView.of(
event.getAccountId(),
event.getAmount()
);
accountViewRepository.save(account);
event.markAsProcessed(); // 이벤트 처리 완료 표시
accountEventStoreUseCase.saveEventStatus(event); // 상태 업데이트
log.info("계좌 생성 이벤트 처리 완료: {}", event.getAccountId());
return null;
});
} catch (Exception e) {
log.error("계좌 생성 이벤트 처리 실패: {}", event.getAccountId(), e);
event.markAsFailed(); // 이벤트 처리 실패 표시
accountEventStoreUseCase.saveEventStatus(event); // 실패 상태 업데이트
throw new EventHandlingException("계좌 생성 이벤트 처리 실패", e);
}
}
이벤트 리스너가 종료되면 account_read 테이블에는 다음과 같은 데이터가 저장됩니다.
command에서 이벤트(행위)를 DB에 저장한 뒤 발행하면 listener가 받아서 read 테이블의 데이터를 업데이트시키는 것입니다.
accountid | balance |
1003 | 0 |
근데 이것만 봐서는 CQRS를 사용하는게 어떤 장점이 있는지 이해가 가지 않습니다. 오히려 DB 통신만 늘어나고 손해 아니야? 이런 생각이 들기도 합니다. 그래서 비교를 위해 일반 프로젝트를 준비했습니다. 만약 CQRS가 아닌 일반적인 프로젝트였다면 이렇게 동작했을 것입니다.
- 계좌 중복 검사
- 계좌 생성
- DB에 저장
/**
* 새로운 계좌를 생성합니다.
*
* @param accountId 계좌 ID
* @param userId 사용자 ID
* @throws IllegalArgumentException 계좌가 이미 존재하는 경우
*/
@Transactional
public void createAccount(String accountId, String userId) {
// 계좌 중복 검사
if (accountRepository.existsById(accountId)) {
throw new IllegalArgumentException("이미 존재하는 계좌입니다.");
}
// 계좌 생성
Account newAccount = Account.create(accountId, userId);
// DB에 저장
accountRepository.save(newAccount);
}
일반적인 비즈니스 코드와 CQRS 이 두 가지 방식의 차이점이 보이시나요?
가장 눈에 띄는 차이점은 CQRS와 이벤트 소싱을 사용하면 모든 상태 변경을 이벤트로 저장한다는 점입니다. 예를 들어, 계좌를 생성할 때 단순히 계좌 정보만 저장하는 것이 아니라, "계좌가 생성되었다"는 행위 자체를 이벤트로 기록합니다.
반면 일반 비즈니스 코드에서는 계좌를 생성하고 DB에 저장하고 마무리합니다. 그래서 추후 계좌 생성 시점의 데이터가 필요할 때 그 상황을 저장된 데이터가 없기 때문에 당시의 상황을 알 수가 없습니다. 어떤 정보도 기록해두지 않고 계속해서 계좌 정보를 업데이트할 뿐입니다.
또 한가지 차이점으로 CQRS는 이벤트를 저장해야 하기에 추가적인 데이터베이스 호출이 필요하다는 단점을 가지고 있습니다. 계좌 생성 시 "계좌 정보 저장, 이벤트 저장" 이렇게 두 번의 데이터베이스 작업이 발생합니다. 하지만 이런 비용을 감수하더라도, 다음과 같은 강력한 이점들을 얻을 수 있습니다.
1. 정확한 시점과 컨텍스트 추적
이벤트 소싱을 통해 우리는 모든 행위의 정확한 발생 시점을 알 수 있습니다. 각 이벤트는 발생 시점의 메타데이터를 함께 저장하므로, 당시의 사용자 정보와 시스템 상태를 정확히 파악할 수 있습니다. 이는 마치 시스템의 블랙박스와 같은 역할을 하며, 모든 변경사항에 대한 풍부한 컨텍스트 정보를 제공합니다.
2. 안전한 시스템 복구
모든 상태 변경이 순차적인 이벤트로 저장되기 때문에, 시스템 장애가 발생하더라도 안전하게 복구할 수 있습니다. 저장된 이벤트들을 시간 순서대로 다시 실행하면 장애 발생 직전의 상태로 정확하게 복구할 수 있습니다. 이러한 방식은 데이터의 정합성을 보장하는 데 매우 효과적입니다.
3. 효과적인 고객 응대
고객으로부터 문의나 컴플레인이 접수되었을 때, 저장된 이벤트를 통해 정확한 상황 파악이 가능합니다. 모든 행위가 이벤트로 기록되어 있기 때문에, 고객의 문제 제기가 타당한지 객관적으로 검증할 수 있으며, 이를 바탕으로 데이터 기반의 정확하고 신뢰할 수 있는 응대가 가능합니다.
4. 데이터 기반 인사이트
사용자의 모든 행위가 시계열 데이터로 저장되므로, 이를 통해 의미 있는 패턴을 발견할 수 있습니다. 예를 들어 특정 시간대의 거래 빈도나 사용자들의 행동 패턴을 분석할 수 있으며, 이러한 데이터는 비즈니스 의사결정에 중요한 기초 자료로 활용될 수 있습니다.
5. 완벽한 버그 추적
시스템에서 버그나 장애가 발생했을 때, 저장된 이벤트들을 통해 문제 상황을 완벽하게 재현할 수 있습니다. 모든 상태 변화가 시간 순으로 기록되어 있기 때문에, 버그 발생 당시의 정확한 상황을 파악할 수 있으며, 이는 문제 해결을 위한 명확한 컨텍스트를 제공합니다.
마지막으로 계좌 생성뿐만 아니라 다른 command(입금, 출금)가 실행되었을 경우 이벤트 저장소를 살펴봅시다.
이벤트를 순서대로 따라가보면 먼저 계좌가 생성되고 입금이 3번 진행됩니다. 이 순간 잔액 balance는 15000원이 되었을 것입니다. 이후 1번의 출금이 진행되고 잔액은 5000원이 됩니다. 이렇게 DB의 이벤트 저장소 테이블을 통해 사용자의 모든 행위를 기록할 수 있기 때문에 언제든지 확인이 가능하고 복구도 가능하게 됩니다.
event_type | id | accountid | amount | eventdate | status | userid | version |
AccountCreatedEvent | 1 | 1003 | 0 | 2024-11-23 12:45:00 | PROCESSED | stark | 1 |
MoneyDepositedEvent | 2 | 1003 | 3000 | 2024-11-23 12:50:00 | PROCESSED | stark | 1 |
MoneyDepositedEvent | 3 | 1003 | 5000 | 2024-11-23 13:10:00 | PROCESSED | stark | 1 |
MoneyDepositedEvent | 4 | 1003 | 7000 | 2024-11-23 14:45:00 | PROCESSED | stark | 1 |
MoneyWithdrawnEvent | 5 | 1003 | 10000 | 2024-11-23 18:00:00 | PROCESSED | stark | 1 |
자 그럼 Query 비즈니스는 어떻게 동작할까요?
쿼리 패키지의 API는 다음과 같이 설계하였습니다.
이벤트 리스너를 통해 계좌의 Read 테이블을 "저장/업데이트"하고 사용자에게 제공하는 API는 순수하게 Read의 역할만을 수행합니다.
- 특정 계좌의 상세 정보를 조회합니다. (getAccount)
- 특정 계좌의 거래 이력을 조회합니다. (getAccountHistory)
- 특정 사용자의 모든 거래 이력을 조회합니다. (getUserTransactions)
- 연관된 거래들을 correlationId를 기준으로 조회합니다. (getRelatedTransactions)
아래 코드는 계좌 조회 관련 API를 제공하는 컨트롤러이며, CQRS 패턴의 Query(조회) 부분을 담당합니다.
/**
* 계좌 조회 관련 API를 제공하는 컨트롤러입니다.
* CQRS 패턴의 Query(조회) 부분을 담당합니다.
*/
@RequestMapping("/accounts")
@RequiredArgsConstructor
@RestController
public class AccountQueryController {
private final AccountQueryUseCase accountQueryUseCase;
/**
* 특정 계좌의 상세 정보를 조회합니다.
*
* @param accountId 조회할 계좌 ID
* @param userId 요청한 사용자 ID
* @return 계좌 상세 정보
*/
@GetMapping("/{accountId}")
public ResponseEntity<AccountDetailResponse> getAccount(
@PathVariable String accountId,
@RequestHeader("X-User-Id") String userId
) {
AccountDetailResponse response = AccountDetailResponse.from(accountQueryUseCase.getAccount(accountId));
return ResponseEntity.ok(response);
}
/**
* 특정 계좌의 거래 이력을 조회합니다.
*
* @param accountId 조회할 계좌 ID
* @param userId 요청한 사용자 ID
* @return 계좌의 모든 거래 이력
*/
@GetMapping("/{accountId}/history")
public ResponseEntity<List<AccountTransactionResponse>> getAccountHistory(
@PathVariable String accountId,
@RequestHeader("X-User-Id") String userId
) {
List<AccountTransactionResponse> response = accountQueryUseCase.getAccountHistory(accountId).stream()
.map(AccountTransactionResponse::from)
.collect(Collectors.toList());
return ResponseEntity.ok(response);
}
/**
* 특정 사용자의 모든 거래 이력을 조회합니다.
*
* @param userId 조회할 사용자 ID
* @return 사용자의 모든 거래 이력
*/
@GetMapping("/user/{userId}/transactions")
public ResponseEntity<List<AccountTransactionResponse>> getUserTransactions(
@PathVariable String userId
) {
List<AccountTransactionResponse> response = accountQueryUseCase.getUserTransactions(userId).stream()
.map(AccountTransactionResponse::from)
.collect(Collectors.toList());
return ResponseEntity.ok(response);
}
/**
* 연관된 거래들을 correlationId를 기준으로 조회합니다.
*
* @param correlationId 조회할 연관 ID
* @param userId 요청한 사용자 ID
* @return 연관된 모든 거래 이력
*/
@GetMapping("/transactions/{correlationId}")
public ResponseEntity<List<AccountTransactionResponse>> getRelatedTransactions(
@PathVariable String correlationId,
@RequestHeader("X-User-Id") String userId
) {
List<AccountTransactionResponse> response = accountQueryUseCase.getRelatedTransactions(correlationId).stream()
.map(AccountTransactionResponse::from)
.collect(Collectors.toList());
return ResponseEntity.ok(response);
}
}
계좌 정보를 조회하는 비즈니스 로직을 살펴보겠습니다.
비즈니스 로직은 정말 단순합니다. 단순히 accountView 테이블을 repository를 통해 조회하여 반환하는 형태입니다. 이 메서드뿐만 아니라 모든 비즈니스 메서드는 저장/업데이트는 하지 않고 순수하게 조회(Read)만을 수행합니다.
/**
* 특정 계좌의 정보를 조회합니다.
*
* @param accountId 조회할 계좌의 ID
* @return 계좌 정보 엔티티
* @throws IllegalArgumentException 계좌를 찾을 수 없는 경우
*/
@Override
public AccountView getAccount(String accountId) {
return accountViewRepository.findById(accountId)
.orElseThrow(() -> new IllegalArgumentException("계좌를 찾을 수 없습니다."));
}
계좌 생성 시나리오의 최종 정리를 해봤습니다.
- Client가 계좌 생성 요청을 보냅니다.
- Command Controller가 요청을 받아 Command Model에 전달합니다.
- Command Model은 계좌 생성 이벤트를 생성하여 Event Store에 저장합니다.
- Event Publisher는 계좌 생성 이벤트를 발행합니다.
- Query Model의 Event Subscriber는 이벤트를 수신하고, Query Database를 업데이트합니다.
- Client는 계좌 생성 완료 응답을 받습니다.
CQRS와 이벤트 소싱 생각보다 간단한데?
맞습니다. 막상 구성해 보면 그렇게 어렵지는 않습니다. 지금까지의 예시로 알 수 있듯이, CQRS 패턴은 Command와 Query의 역할을 명확히 분리한 구조일 뿐입니다. 어렵게 생각하실 필요 없이, 단순히 비즈니스 명령(저장/업데이트/삭제)을 Command에서 수행하고, 그 행위를 이벤트로 만들어서 DB에 저장하고 발행하면, 이벤트 리스너가 그 이벤트를 받아서 DB의 조회 테이블을 업데이트하는 것입니다.
이렇게 하면 조회(Read) 전용 테이블과 명령(Write) 전용 테이블을 별도로 구성하는 방법을 사용할 수도 있습니다. 이렇게 구성하면 Command는 자신의 데이터를 Write DB에 저장하고 이벤트를 발행하게 됩니다. 현재는 Read용 Account 테이블만 만들어 사용하고 있지만, Write용 Account 테이블을 생성해서 Command 명령을 수행할 때 Write 테이블의 상태를 변경하고, 이벤트를 저장한 뒤 발행해도 무방합니다. 이러면 Write용 Account 테이블은 백업용으로 사용할 수 있습니다.
결과적으로 Write 테이블에 백업 데이터가 남기 때문에 Read용 Account 테이블은 Write와는 다른 구조를 가질 수 있습니다. 예를 들면 Read 테이블을 JSON 형태로 설계합니다. 그리고 이벤트로 받은 데이터를 조회 시 필요한 데이터 뭉치로 만들어서 저장합니다. 이렇게 하면 조회할 때 Join 없이 한 번의 DB 호출만으로 필요한 데이터를 전부 가져오는 것이 가능하여 성능상으로 더 이득일 수 있습니다. 이를 통해 CQRS 패턴이 갖는 장점인 조회 성능 최적화와 책임 분리를 최대한 활용할 수 있습니다. (배민 세미나에서도 이렇게 사용하는 것을 봤습니다.)
근데 아직 개선점이 남아있습니다.
1. Transactional Outbox Pattern을 적용해서 비즈니스 메서드의 주관심사와 비관심사를 분리할 수 있습니다.
제가 이와 관련해서 써둔 글이 있는데 MSA에서 아웃박스 패턴을 통해 다른 서버와의 분산 트랜잭션을 관리하는 로직입니다. 물론 Kafka가 빠진 상황이라 조금 다르지만 BEFORE_COMMIT, AFTER_COMMIT을 활용한다는 점은 같습니다.
2024.10.13 - [Spring MSA] - [MSA] Transactional Outbox Pattern
비즈니스 코드에서 수행 중인 이벤트 저장 및 발행 로직을 BEFORE_COMMIT 이벤트 리스너로 추출해서 Outbox Pattern을 적용시킬 수 있습니다. 그러나 지금 코드에서도 형태만 다를 뿐 동일하게 동작하고 있으므로 이게 큰 개선점은 아니라고 할 수 있습니다. (이 패턴은 내부 이벤트에도 효과가 있지만 외부 이벤트 Kafka를 발행할 때 훨씬 큰 역할을 합니다.)
2. DB를 2개로 분리하는 작업
쓰기는 RDB를 사용하고 읽기는 NoSQL을 사용해서 읽기/쓰기를 완벽하게 최적화하는 방법입니다.
이렇게 2개의 DB를 사용하면 Command에서는 쓰기 RDB만을 사용하고 Query에서는 NoSQL을 사용해서 확실하게 명령과 조회가 분리됩니다. 또한 NoSQL에 JSON형태로 데이터를 저장해서 정말 필요한 데이터만 받도록 확실한 최적화가 가능하며 DB가 따로 존재해서 부하도 분산시킬 수 있습니다. 다만 이 경우에는 DB 간 동기화가 필요한 경우도 있을 수 있으니 그 부분은 항상 주의해야 합니다.
3. 이벤트 기록의 동시성 제어 (낙관적 락을 적용시켰지만 조금 더 상세히 다뤄볼 필요가 있어서 추가했습니다.)
현재 시스템에서는 이벤트 저장소에 이벤트를 기록할 때 동시성 제어가 충분하지 않을 수 있습니다. 여러 사용자가 동시에 동일한 계좌에 대해 거래를 요청하면, 이벤트의 순서가 뒤바뀌거나 데이터의 일관성이 깨질 위험이 있습니다. 이를 방지하기 위해 이벤트 저장 시 낙관적 락(Optimistic Locking)이나 비관적 락(Pessimistic Locking)을 적용하여 동시성 이슈를 처리해야 합니다.
특히, 이벤트 엔티티에 버전 관리 필드를 추가하고 JPA의 @Version 애노테이션을 활용하면 낙관적 락을 구현할 수 있습니다. (제가 엔티티에 적용해 놓은 방식입니다.) 이 방식은 데이터베이스 수준에서 버전을 체크하여 충돌을 감지하며, 충돌이 발생하면 예외를 발생시켜 상위 로직에서 재시도나 오류 처리를 할 수 있게 합니다.
마무리하며
CQRS를 공부하고 직접 구현해 보며 느낀 점은 "끝이 아닌 시작이다."라는 것입니다.
직접 프로젝트를 구성해 보며 CQRS를 제대로 이해하려면 아직도 많이 멀었다는 게 느껴졌습니다. 그래서 지금까지 한 내용을 branch로 백업해 두고 계속해서 코드를 개선하는 작업을 진행하려 합니다. 이러다 stark bank가 진짜 유명한 CQRS 예시 프로젝트가 될 수도 있지 않을까? 하는 꿈을 꿉니다.
저는 여기서 끝내지 않고 DB를 2개로 분리하여 CQRS를 작업해 보는 것으로 다음 포스팅을 준비할 생각입니다. 그리고 테스트를 진행해서 왜 CQRS를 사용하는지 정말 성능 최적화가 되는 것인지 제 두 눈으로 확인해 봐야겠습니다. 시간이 조금 걸릴 수도 있지만 기다려주신다면 좋은 글로 돌아오겠습니다.
지금까지 긴 글 읽어주셔서 감사합니다 :)
개선할 점에 대한 피드백은 언제든 환영합니다. 댓글로 내용을 적어주세요~
'MSA' 카테고리의 다른 글
[MSA] Transactional Outbox Pattern (1) | 2024.10.13 |
---|---|
[Spring] 헥사고날 아키텍처 (2) | 2024.08.10 |
MSA 서버 간 통신: SNS의 MessageAttributes로 완벽한 Zeropayload 전략 구현하기 (1) | 2023.12.01 |
MSA 환경에서 SNS 메시지 재발행을 위한 스프링 배치 및 스케쥴러 구현 (1) | 2023.11.28 |
Spring MSA 프로젝트에서 단일 책임 원칙을 지키기 위한 리팩토링 (3) | 2023.11.24 |