1. SpringBoot + Kafka
Apache Kafka는 여러 다양한 상황에서 사용될 수 있다. 실시간 이벤트 스트리밍, 실시간 분석, 데이터 파이프라이닝, 로그 수집 등의 다양한 애플리케이션에서 Kafka를 활용하곤 한다.
Spring Boot 개발자로서 Apache Kafka는 다음과 같은 상황에서 사용될 수 있다.
1. 마이크로서비스 아키텍처:
- Kafka는 마이크로서비스 간에 데이터를 비동기적으로 전달하는 메시지 브로커 역할을 수행할 수 있다.
- 서비스 간의 커뮤니케이션에 대한 복잡성을 줄이고 시스템의 전체적인 성능을 향상시키는데 도움이 된다.
2. 이벤트 소싱과 CQRS:
- 이벤트 소싱(Event Sourcing)과 CQRS(Command Query Responsibility Segregation) 패턴은 복잡한 비즈니스 로직을 처리하거나, 시스템의 상태를 이력 기반으로 관리하고 싶을 때 유용하다.
- Kafka는 이벤트 소싱과 CQRS에 필요한 이벤트 저장소의 역할을 수행할 수 있다.
3. 실시간 분석:
- Kafka는 대량의 실시간 데이터를 처리하는데 사용될 수 있으므로, 실시간 대시보드, 모니터링 시스템, 실시간 비즈니스 지표 분석 등에 사용할 수 있다.
4. 로그 수집:
- 로그 데이터를 중앙화된 위치로 수집하고 처리하는 데 Kafka를 사용할 수 있다.
- 이렇게 수집된 로그 데이터는 분석, 모니터링, 알림, 디버깅 등 다양한 목적으로 활용될 수 있다.
Spring Boot와 Kafka를 연결하는 방법으로는 Spring Kafka 라이브러리를 사용하는 방법이 있다. 이 라이브러리를 사용하면 Kafka를 사용하여 메시지를 송수신하는 애플리케이션을 쉽게 구축할 수 있다.
2. 이벤트 소싱과 CQRS 란?
2-1. 이벤트 소싱 (Event Sourcing):
- 이벤트 소싱은 시스템의 상태를 이벤트 로그로 저장하는 아키텍처 스타일이다.
- 시스템의 상태 변경을 이벤트라는 단위로 캡쳐하고, 이러한 이벤트들을 순차적으로 저장한다.
- 이 방법으로 시스템의 상태는 이벤트 스트림을 재생함으로써 언제든지 재구성될 수 있다.
- 주요 장점 중 하나는 모든 상태 변경에 대한 감사 추적이 가능하다는 것이다. 또한 이벤트 로그를 사용하여 시스템의 과거 상태를 재구성하거나, 향후 상태를 예측하는 것도 가능하다.
2-2. CQRS (Command Query Responsibility Segregation):
- CQRS는 시스템을 두 가지 다른 모델로 분리하는 아키텍처 패턴이다.
- 하나는 상태를 수정하는 명령을 처리하고, 다른 하나는 상태를 조회하는 쿼리를 처리한다. 이 패턴을 사용하면 각 모델을 서로 다른 방식으로 최적화할 수 있다.
- 예를 들어, 읽기 모델은 확장성과 성능을 위해 복제될 수 있고, 쓰기 모델은 데이터 일관성을 유지하기 위해 단일화 될 수 있다.
2-3. 이벤트 소싱과 CQRS는 함께 사용되는 경우가 많다.
- 이벤트 소싱이 시스템의 상태를 이벤트 로그로 저장하면, 이 로그를 사용하여 읽기 모델을 만들고 업데이트할 수 있다. 이 방식은 이벤트 소싱이 제공하는 감사 추적 기능을 유지하면서, 읽기 모델을 확장하고 최적화하는 데 CQRS가 제공하는 유연성을 활용할 수 있게 해준다. 이때 Kafka는 이벤트 소싱과 CQRS 아키텍처에 필요한 이벤트 저장소의 역할을 수행한다.
- Kafka의 토픽은 이벤트 스트림으로 사용될 수 있으며, 각 이벤트는 토픽에 순서대로 저장된다. 이렇게 저장된 이벤트들은 시스템의 상태를 재구성하거나, 읽기 모델을 업데이트하는 데 사용될 수 있다.
- Kafka는 또한 이벤트의 순서를 보장하고, 필요한 만큼 오래된 이벤트를 저장하는 기능을 제공하므로, 이벤트 소싱에 아주 적합하다.
3. 이벤트 소싱과 CQRS는 어떤 시스템에서 사용되는가?
이벤트 소싱과 CQRS 패턴은 주로 복잡한 비즈니스 로직이나 이력 기반으로 상태를 관리하고자 하는 시스템에서 사용된다.
주로 대규모 분산 시스템, 마이크로서비스 아키텍처(MSA), 또는 실시간 분석을 요구하는 시스템에서 활용된다.
예시를 들어보자면, 온라인 은행 애플리케이션을 생각해볼 수 있다.
1. 이벤트 소싱:
- 은행에서는 모든 거래(예: 입금, 출금, 이체 등)가 이벤트로 간주될 수 있다.
- 이러한 거래 이벤트를 순차적으로 Kafka에 저장하면, 이벤트 스트림은 은행 계좌의 전체 거래 이력을 캡쳐하는 것이 가능하다. 나중에 이 이벤트 스트림을 사용하여 특정 시점에서의 계좌 잔액을 재현하거나, 모든 거래를 검토하여 부정 행위를 감지하는 것도 가능하다.
2. CQRS:
- 은행 애플리케이션의 사용자들은 주로 두 가지 종류의 작업을 수행한다. 거래를 실행하거나(명령), 계좌 잔액이나 거래 내역을 조회하는 것(쿼리).
- CQRS를 적용하면, 명령을 처리하는 모델과 쿼리를 처리하는 모델을 분리하여 각각의 성능과 확장성을 최적화할 수 있다.
- 예를 들어, 명령 모델은 ACID 트랜잭션을 지원하는 관계형 데이터베이스에, 쿼리 모델은 빠른 읽기 성능을 제공하는 NoSQL 데이터베이스에 저장될 수 있다.
- 이렇게 이벤트 소싱과 CQRS는 은행 애플리케이션의 거래 처리와 이력 관리, 성능 최적화 등의 요구 사항을 충족하는 데 도움이 될 수 있다. 이 외에도, 주문 관리, 재고 관리, 사회 네트워킹 앱 등의 다양한 도메인에서도 이 패턴들을 활용할 수 있다.
4. 그럼 이걸 Spring Boot에서 어떻게 사용하는가?
4-1. Spring Boot와 Kafka를 함께 사용하려면 Spring Kafka라는 라이브러리를 사용할 수 있다.
- Spring Kafka는 Kafka에 메시지를 보내고 받는 작업을 추상화하므로, 개발자는 메시지 송수신과 관련된 저수준의 세부 사항을 걱정하지 않고 비즈니스 로직에 집중할 수 있다.
- Spring Kafka를 사용하면 Kafka를 이용한 이벤트 소싱과 CQRS 아키텍처를 구현하는데 필요한 기능들을 쉽게 사용할 수 있다.
- 예를 들어, KafkaTemplate 클래스를 사용하여 메시지를 보내거나, @KafkaListener 어노테이션을 사용하여 메시지를 받을 수 있다.
4-2. 이벤트 소싱과 CQRS를 Spring Boot와 Kafka를 이용해 구현하려면 다음과 같은 단계를 따르면 된다.
1. 애플리케이션의 이벤트를 정의하고, 이 이벤트를 Kafka 토픽에 보낼 수 있도록 애플리케이션을 설정한다.
- 이벤트를 정의한다는게 무슨 의미일까?
- 애플리케이션의 이벤트를 정의한다"는 말은, 애플리케이션에서 발생하는 중요한 동작이나 상태 변화를 나타내는 이벤트 객체나 메시지를 설계한다는 뜻이다. 이는 메서드를 정의하는 것과는 다른 개념이다.
- 예를 들어, 온라인 쇼핑몰 애플리케이션에서는 "주문 생성", "상품 배송 시작", "반품 요청" 등의 이벤트가 있을 수 있다. 이러한 이벤트는 애플리케이션의 핵심 비즈니스 로직을 구성하며, 이벤트가 발생할 때마다 시스템의 상태가 변경된다.
- 이벤트를 정의할 때는 이벤트의 유형, 발생 시간, 이벤트가 발생한 원인, 이벤트가 포함해야 할 데이터 등을 고려해야 한다. 예를 들어, "주문 생성" 이벤트는 주문 ID, 주문한 상품, 주문 금액, 주문 시간 등의 정보를 포함할 수 있다.
- 이런 이벤트를 정의한 후에는, 이벤트를 Kafka 토픽에 보내기 위해 애플리케이션을 설정한다. 이는 Spring Kafka의 KafkaTemplate 클래스를 사용해 수행할 수 있다. 이후, 이벤트가 발생할 때마다 애플리케이션은 해당 이벤트를 Kafka에 보내서 저장한다.
- Kafka에 저장된 이벤트는 필요에 따라 다양한 방식으로 사용될 수 있다. 예를 들어, 이벤트를 사용해서 애플리케이션의 상태를 재구성하거나, 읽기 모델을 업데이트하거나, 다른 서비스로 이벤트를 전달할 수 있다. 이런 작업은 Kafka Consumer를 사용해서 수행할 수 있으며, 이는 Spring Kafka의 @KafkaListener 어노테이션을 사용해 구현할 수 있다.
- 애플리케이션의 이벤트를 정의한다"는 말은, 애플리케이션에서 발생하는 중요한 동작이나 상태 변화를 나타내는 이벤트 객체나 메시지를 설계한다는 뜻이다. 이는 메서드를 정의하는 것과는 다른 개념이다.
2. 이벤트가 발생할 때마다, 애플리케이션은 이벤트를 Kafka에 보낸다.
3. Kafka에 저장된 이벤트는 필요에 따라 사용될 수 있다. 예를 들어, 이벤트를 사용하여 애플리케이션의 상태를 재구성하거나, 읽기 모델을 업데이트하거나, 다른 서비스로 이벤트를 전달할 수 있다.
4. 읽기 모델을 업데이트하기 위해 Kafka Consumer를 사용할 수 있다. Consumer는 Kafka 토픽에서 이벤트를 읽어와서 읽기 모델을 업데이트한다. 이 때, 각 읽기 모델은 자신의 요구에 맞게 데이터를 가공하거나 저장할 수 있다.
5. SpringBoot + Kafka의 어노테이션 모음
@EnableKafka와 @KafkaListener는 Kafka를 사용하기 위해 필요한 주요 어노테이션이다.
이 두 가지 어노테이션을 사용하면 KafkaTemplate을 사용하여 메시지를 전송하고, @KafkaListener 어노테이션을 사용하여 메시지를 구독하여 처리할 수 있다. 하지만 Kafka의 기능은 다양하며, 다른 상황에 따라 추가적인 어노테이션이나 설정을 사용할 수 있다.
1. @KafkaHandler:
- 다른 메서드와 함께 사용되는 @KafkaListener 어노테이션과 함께, 동일한 클래스 내에서 다양한 메시지 유형에 대한 처리를 구분할 수 있도록 도와준다.
2. @KafkaListenerErrorHandler:
- Kafka에서 예외가 발생할 때 처리를 담당하는 핸들러 메서드를 정의할 수 있다. 예외 처리 로직을 따로 구현하고자 할 때 사용된다.
3. @KafkaListenerContainerFactory:
- 커스텀 KafkaListenerContainer를 생성하고 구성하기 위해 사용된다. 필요에 따라 다양한 속성을 설정하여 KafkaListenerContainer의 동작을 세부적으로 제어할 수 있다.
4. @KafkaTemplate (Producer):
- KafkaTemplate 빈을 커스터마이징하고자 할 때 사용된다. 커스텀 KafkaTemplate을 정의하고 해당 빈을 주입받아 사용할 수 있다.
5. @KafkaListenerConfigurer:
- @KafkaListener에 대한 전역적인 설정을 제공하기 위해 사용된다. 필요한 경우KafkaListenerConfigurer 인터페이스를 구현하여 Kafka 리스너의 공통 구성 요소를 정의할 수 있다.
6. SpringBoot + Kafka 예시 코드
6-1. 일반적으로 MVC 패턴을 사용하는 Spring Boot 애플리케이션에서는 다음과 같은 클래스들을 가질 수 있다:
- Controller 클래스:
- 사용자의 HTTP 요청을 받아 이에 대한 처리를 담당한다.
- 요청에 따라 적절한 서비스 메서드를 호출하고, 그 결과를 사용자에게 응답으로 반환한다.
- Service 클래스:
- 비즈니스 로직을 구현하는 클래스이다.
- Controller로부터 호출을 받아 필요한 처리를 수행하고, 그 결과를 Controller에 반환한다.
- 이 과정에서 Repository를 통해 데이터베이스와의 상호작용을 수행하거나, Kafka로 이벤트를 전송하는 등의 작업을 수행할 수 있다.
- Repository 클래스:
- 데이터베이스와의 상호작용을 담당하는 클래스이다.
- 데이터를 조회, 생성, 수정, 삭제하는 등의 CRUD 연산을 수행한다.
- Event 클래스:
- 이벤트를 나타내는 클래스이다.
- 이벤트가 발생한 상황의 상태 정보를 포함하고 있으며, 이를 Kafka로 전송하여 다른 시스템이나 컴포넌트에 알린다.
6-2. 이제 패키지를 어떻게 나누는지 설명하겠다.
- controller: 웹 애플리케이션의 컨트롤러 클래스를 포함한다.
- service: 비즈니스 로직을 구현하는 서비스 클래스를 포함한다.
- repository: 데이터 액세스를 위한 리포지토리 클래스를 포함한다.
- dto: 데이터 전달을 위한 DTO (데이터 전송 객체) 클래스를 포함한다.
- config: 애플리케이션의 설정과 관련된 클래스를 포함한다. 이 패키지에는 Kafka와 관련된 설정을 위한 KafkaConfig 클래스를 추가할 수 있다.
- kafka: Kafka와 관련된 클래스를 포함한다. 이 패키지에는 Kafka 이벤트를 수신하고 처리하는 Consumer 클래스, KafkaTemplate을 사용하여 이벤트를 발행하는 Producer 클래스 등이 포함될 수 있다.
6-3. 아래는 이 클래스들이 어떻게 상호작용하는지 나타내는 예시다.
- 이 예시에서, OrderController는 사용자의 주문 요청을 받아서 OrderService의 createOrder 메서드를 호출한다. OrderService는 주문 정보를 데이터베이스에 저장하고, 주문 생성 이벤트를 생성하여 Kafka로 전송한다.
- 이 이벤트는 OrderCreatedEvent 클래스를 이용해 표현된다. 이렇게 Kafka로 전송된 이벤트는 다른 시스템이나 컴포넌트에서 받아 필요한 처리를 수행할 수 있다.
컨트롤러
@Controller
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/orders")
public ResponseEntity<Void> createOrder(@RequestBody Order order) {
orderService.createOrder(order);
return ResponseEntity.status(HttpStatus.CREATED).build();
}
}
서비스
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
private final OrderEventProducer orderEventProducer;
public OrderService(OrderEventProducer orderEventProducer) {
this.orderEventProducer = orderEventProducer;
}
public void createOrder(Order order) {
// 주문을 데이터베이스에 저장
orderRepository.save(order);
// 주문 생성 이벤트를 생성하고 Kafka로 전송
OrderCreatedEvent event = new OrderCreatedEvent(order.getId(), order.getProductId(), order.getAmount(), LocalDateTime.now());
orderEventProducer.publishOrderEvent(event);
}
}
Producer 클래스
@Component
public class OrderEventProducer {
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
public OrderEventProducer(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publishOrderEvent(OrderCreatedEvent event) {
kafkaTemplate.send("order-events", event);
}
}
configuration
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate(ProducerFactory<String, OrderCreatedEvent> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
consumer
@Component
public class OrderEventConsumer {
@KafkaListener(topics = "order-events", groupId = "my-group")
public void listenOrderEvents(OrderCreatedEvent event) {
// 이벤트 처리 로직을 구현합니다.
System.out.println("Received OrderCreatedEvent: " + event);
// ...
}
}
Repository
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
}
kafka 이벤트
public class OrderCreatedEvent {
private String orderId;
private String productId;
private int amount;
private LocalDateTime timestamp;
// 생성자, getters, setters...
@Override
public String toString() {
return "OrderCreatedEvent{" +
"orderId='" + orderId + '\\'' +
", productId='" + productId + '\\'' +
", amount=" + amount +
", timestamp=" + timestamp +
'}';
}
}
2023.10.16 - [Apache Kafka] - Apache Kafka(1) - 카프카란 무엇인가?
2023.10.19 - [Apache Kafka] - Apache Kafka(9) - Replication과 Fault Tolerance
'유용한 개발지식 > Apache Kafka' 카테고리의 다른 글
Kafka(카프카) 클러스터와 브로커의 동작 이해하기 (1) | 2023.12.27 |
---|---|
Kafka(카프카)의 기본 구조 이해하기 (클러스터, 브로커, 토픽, 파티션, 세그먼트) (1) | 2023.12.27 |
Kafka(카프카)란 무엇인가 (2) | 2023.12.27 |
Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편) (1) | 2023.10.19 |
Apache Kafka 오프셋(Offset) 관리와 커밋(Commit) 작동 원리 (8편) (0) | 2023.10.19 |