시작하며
안녕하세요. 개발자 stark입니다.
Apache Kafka를 사용하다 보면 메시지 전송 보장 방식(message delivery guarantee)이라는 개념을 접하게 됩니다. 이는 어떤 메시지가 몇 번 전달되는지를 보장하는 방식으로, 애플리케이션이 데이터를 처리하거나 주고받을 때 '중복 처리'나 '데이터 유실'을 어떻게 다룰 것인지를 결정하는 중요한 요소입니다. 이번 포스팅에서는 Kafka의 대표적인 메시지 전달 보장 방식인 At Most Once(최대 한 번), At Least Once(최소 한 번), Exactly Once(정확히 한 번)이 무엇을 의미하는지 기본 개념부터 이해해 봅시다.
메시지 전달 보장 방식이란?
'메시지 전달 보장 방식'은 시스템이 메시지를 처리하거나 전달할 때 발생할 수 있는 중복 또는 손실 상황에 대해 어떤 수준까지 보장해 주는지를 뜻합니다. 분산 시스템이나 메시징 시스템(예: Kafka)에서 네트워크 장애나 애플리케이션 크래시(crash) 등이 발생할 경우 메시지가 한 번 이상 전달되거나, 아예 전달되지 못하는 상황이 발생할 수 있습니다. 이러한 상황에서 각각의 보장 방식은 다음과 같은 의미를 가집니다. 이 세 가지 방식을 이해할 때 핵심은 중복(duplication)과 유실(loss)입니다.
- At Most Once (최대 한 번): 메시지가 기껏해야 한 번만 전달됩니다. 즉, 중복은 발생하지 않지만, 메시지가 유실될 수 있습니다.
- At Least Once (최소 한 번): 메시지가 적어도 한 번 이상 전달됩니다. 즉, 메시지 유실은 없지만, 중복 전달(두 번 이상 전달)이 발생할 수 있습니다.
- Exactly Once (정확히 한 번): 메시지가 정확히 한 번만 전달되도록 보장됩니다. 중복도 없고 유실도 없는 이상적인 시나리오입니다.
각각 자세히 살펴보기 전에 전체적으로 어떤 차이점이 있는지 살펴봅시다.
각 전달 방식의 처리 흐름은 다음과 같습니다.
At Most Once (최대 한 번 전달 보장)
'At Most Once'는 메시지를 한 번 이하로 전달하는 방식입니다. 다시 말해, '각 메시지는 많아야 한 번 처리'되며 어떠한 상황에서도 두 번 이상 처리되지 않습니다. 대신, 문제가 발생하면 아예 처리되지 않고 넘어갈 위험이 있습니다.
Kafka에서 'At Most Once' 처리를 구현하려면 메시지를 받자마자 “처리되었다”라고 표시(오프셋 커밋) 해야 합니다. Consumer(소비자)가 Broker로부터 메시지를 가져오면, 아직 실제 로직을 수행하기 전에 해당 메시지의 오프셋(offset)을 커밋(commit)해버리는 것입니다. 이렇게 하면 중복 처리는 일어나지 않지만, 만약 커밋 후에 애플리케이션이 장애가 나서 실제 처리를 못 했더라도 Kafka 입장에서는 이미 처리가 완료된 것으로 간주합니다. 그 결과 해당 메시지는 다시 전달되지 않기 때문에 유실되게 됩니다.
특징을 요약하자면 다음과 같습니다.
- 중복 처리 없음 (최대 한 번 처리).
- 처리 직전 장애 발생 시 데이터 유실 가능.
- 구현이 간단하고 처리 지연이 적음 (확인 절차 최소화).
At Most Once의 처리 흐름을 살펴봅시다.
- 먼저 정상적으로 처리가 되었을 경우의 흐름은 다음과 같습니다.
- 장애가 발생한 경우의 흐름은 다음과 같습니다.
간단한 예시 코드 (Spring, Java 기반)
이제부터 살펴볼 예시는 Spring Boot 환경에서 Kafka Consumer가 At Most Once 방식으로 동작하도록 구성한 예시 코드입니다. 이 방식은 자동 커밋(auto commit) 옵션을 활성화하여, 메시지를 수신하자마자 오프셋이 커밋되므로(실제로는 컨테이너가 주기적으로 커밋), 만약 처리 중 예외가 발생해도 해당 메시지는 다시 처리되지 않습니다.
컨슈머 설정부터 진행합니다.
- 컨슈머 설정에서는 ENABLE_AUTO_COMMIT_CONFIG를 true로 설정해, KafkaListener가 메시지를 받자마자(또는 주기적으로) 오프셋을 커밋하도록 합니다.
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-most-once-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 자동 커밋을 활성화하여, 메시지 수신 시점에 오프셋이 커밋됨
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 자동 커밋 주기 (예: 1초)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
다음으로 "log-topic"을 구독하는 Kafka Listener를 선언합니다. 오토 커밋이 활성화된 상태이므로, 메시지가 수신되면 바로 오프셋이 커밋되기 때문에, 메시지 처리 도중 예외가 발생하더라도 해당 메시지는 다시 처리되지 않습니다. (비즈니스 오류가 발생해도 재처리 X)
@Service
public class AtMostOnceConsumerService {
@KafkaListener(topics = "log-topic",
groupId = "at-most-once-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(String message) {
// 이 시점에서 Kafka 컨테이너에 의해 오프셋이 자동 커밋되었을 가능성이 있음
process(message);
}
private void process(String message) {
// 메시지 처리 로직
System.out.println("Processing message: " + message);
// 예시: 처리 중 예외 발생 (이 경우, 이미 오프셋이 커밋되어 있으므로 재처리되지 않음)
if(message.contains("error")) {
throw new RuntimeException("오류 발생!");
}
}
}
실무 예를 들어 생각해 보면, 로그 수집 시스템이 At Most Once로 동작한다면 네트워크 순간 장애 시 일부 로그를 아예 잃어버릴 수 있지만 중복된 로그는 없을 것입니다. 중복으로 인한 오류를 매우 민감하게 받아들이지만, 약간의 데이터 손실은 치명적이지 않은 상황이라면 At Most Once도 고려될 수 있습니다. 하지만 일반적으로 중요한 데이터의 경우 데이터 유실이 바람직하지 않기 때문에 실제 활용은 드뭅니다.
어떤 상황에 적합한가?
- 시스템 로그, 사용자 활동 로그, 성능 메트릭 등의 수집 작업에서는 일부 데이터가 유실되어도 전체 추세 분석에 큰 영향이 없습니다.
- 고유성이 중요한 작업(예: 사용자 계정 생성, 고유 ID 할당)에서는 중복 처리보다 데이터 손실이 더 수용 가능할 수 있습니다.
- 실시간 데이터 스트리밍(예: IoT 센서 데이터, 라이브 분석)에서는 일부 데이터 손실보다 낮은 지연 시간과 높은 처리량이 더 중요할 수 있습니다.
주의사항: At-Most-Once 패턴은 데이터 유실 가능성이 있으므로, 모든 메시지가 반드시 처리되어야 하는 중요한 비즈니스 트랜잭션(예: 결제 처리, 주문 관리)에는 적합하지 않습니다. 이러한 경우에는 데이터 무결성을 보장하는 At-Least-Once 또는 Exactly-Once 패턴을 고려해야 합니다.
At Least Once (최소 한 번 전달)
'At Least Once' 전달 보장은 메시지가 적어도 한 번은 소비자에게 전달됨을 의미합니다. 즉, 어떠한 경우에도 메시지가 유실되지 않도록 보장하지만, 네트워크 오류나 재시도 로직 때문에 동일한 메시지가 둘 이상 중복 전달될 수 있습니다. 예를 들어 프로듀서(Producer)가 브로커로 메시지를 보냈지만 확인 응답(ACK)을 못 받았을 때, 프로듀서는 해당 메시지를 다시 보내게 되고 이로 인해 같은 메시지가 중복 저장되거나 소비자에게 두 번 처리될 수 있습니다.
실패 시나리오 1: Ack 응답 손실
실패 시나리오 2: Consumer 오프셋 커밋 실패
카프카에서는 기본적으로 이 'At Least Once(최소 한 번 전달)' 전략을 따르며(데이터 손실보다는 중복 허용), 대부분의 메시징 시나리오에서 사용됩니다. 중복 전달이 가능하다는 단점이 있지만, 구현이 비교적 간단하고 데이터 손실을 피할 수 있다는 장점 때문에 널리 쓰이는 전송 보장 방식입니다.
실무 예시 (자주 발생하는 상황)
실무에서는 컨슈머(Consumer) 측 처리 중에 장애가 발생하여 메시지가 중복 처리되는 상황이 종종 발생합니다. 예를 들어 주문 처리 애플리케이션을 생각해 봅시다. 컨슈머가 주문 메시지를 읽어서 결제 처리를 완료한 후 오프셋(commit)을 저장하기 전에 서버가 다운되면, 해당 주문 메시지는 처리됐음에도 불구하고 커밋되지 않았기 때문에 재시작된 컨슈머나 다른 인스턴스에 의해 다시 처리됩니다. 이로 인해 동일한 주문에 대해 두 번 결제가 시도되거나, 중복된 확인 이메일이 발송되는 문제가 생길 수 있습니다.
'최소 한 번' 보장 환경에서는 이런 중복 처리를 애플리케이션 레벨에서 감지하여 멱등성(idempotence) 있게 처리하거나, 중복으로 인한 영향을 복구해야 합니다.
애플리케이션 레벨에서 멱등성 있게 구현한 예시는 다음과 같습니다. (결제 처리)
@Transactional
public void processPayment(String orderId, BigDecimal amount) {
// 1. 이미 처리된 주문인지 확인
if (paymentRepository.existsByOrderId(orderId)) {
log.info("주문 {} 이미 처리됨. 중복 건너뜀", orderId);
return; // 이미 처리된 주문은 무시
}
// 2. 결제 처리 로직
PaymentResult result = paymentGateway.processPayment(
orderId, amount);
// 3. 결제 결과 저장 (멱등성 보장용)
Payment payment = new Payment();
payment.setOrderId(orderId);
payment.setAmount(amount);
payment.setTransactionId(result.getTransactionId());
payment.setProcessedAt(LocalDateTime.now());
paymentRepository.save(payment);
// 4. 이메일 발송 (멱등성 보장)
if (!emailRepository.existsByOrderIdAndType(
orderId, EmailType.ORDER_CONFIRMATION)) {
emailService.sendOrderConfirmation(orderId);
}
}
간단한 예시 코드 (Spring, Java 기반)
다음은 카프카 컨슈머를 최소 한 번 전달로 동작시키는 간단한 자바 예시 코드입니다. 핵심은 메시지를 처리한 후에 오프셋을 커밋함으로써, 처리 도중 장애가 발생하면 커밋이 이루어지지 않아 해당 메시지를 다시 가져와 재처리할 수 있게 하는 것입니다 (메시지 중복 가능).
먼저, 컨슈머 설정 클래스를 작성합시다.
- 자동 커밋을 비활성화하고 수동 커밋을 사용하도록 ConcurrentKafkaListenerContainerFactory를 설정합니다. ContainerProperties.AckMode.MANUAL 설정으로 수동 커밋 모드를 활성화하여 명시적인 ack.acknowledge() 호출에 의해서만 오프셋이 커밋되도록 합니다.
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka 브로커 주소 설정
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 컨슈머 그룹 ID 설정
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
// 키와 값의 디시리얼라이저 설정
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 자동 커밋 비활성화
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 수동 커밋(Manual Ack) 모드 설정
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
다음은 리스너 코드를 작성합시다.
- 이 카프카 리스너는 메시지 처리 로직이 성공하면 Acknowledgment.acknowledge()를 호출하여 오프셋을 커밋하며, 처리 도중 문제가 발생하면 커밋을 하지 않아 메시지가 재처리되도록 합니다.
@Component
public class OrderListener {
@KafkaListener(topics = "orders",
groupId = "example-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
Acknowledgment ack) {
try {
// 1. 메시지 수신
String message = record.value();
log.info("Received message: {}", message);
// 2. 메시지 처리 로직 (예: 주문 데이터 DB 저장, 결제 요청 등)
processBusinessLogic(message);
// 3. 처리 완료 후 오프셋 커밋 -> 최소 한 번 보장 (장애 시 재처리로 중복 발생 가능)
ack.acknowledge();
} catch (Exception e) {
// 4. 예외 발생 시 오프셋 커밋하지 않음
log.error("Error processing message: {}", e.getMessage(), e);
// ack.acknowledge()를 호출하지 않음
// -> 메시지가 재처리됨
}
}
private void processBusinessLogic(String orderData) {
// 실제 주문 처리 로직 구현 (예: 주문 데이터 파싱, DB 저장 등)
System.out.println("Order processed: " + orderData);
}
}
이와 같이 스프링 카프카 리스너를 활용하면, Kafka의 최소 한 번 처리 패턴을 손쉽게 구현할 수 있습니다. 이를 통해 메시지 처리를 확실하게 진행하며, 장애 발생 시에도 메시지 유실 없이 재처리할 수 있습니다. 이렇게 하면 만약 처리 과정에서 예외 발생 또는 컨슈머 장애로 ack.acknowledge()까지 도달하지 못하면, 해당 오프셋이 커밋되지 않으므로 다음에 컨슈머가 재시작될 때 같은 메시지를 다시 받게 됩니다. 이 방식이 바로 최소 한 번(At-Least-Once) 처리이며, 메시지를 확실히 잃어버리지 않고자 할 때 사용하는 패턴입니다. (반대로, 오프셋을 미리 커밋하고 처리하는 방식은 최대 한 번(At-Most-Once)에 해당하지만, 그 경우 장애 시 메시지가 손실될 수 있습니다.)
마지막으로 At Least Once에서의 재처리 흐름을 살펴봅시다.
어떤 상황에 적합한가?
- 데이터 손실을 피하는 것이 최우선일 때 적합합니다. 시스템이 불안정하거나 네트워크 이슈가 있어도 메시지를 잃어버리면 안 되는 경우 선택합니다.
- 구현 난이도와 성능 측면에서 유리합니다. 특별한 추가 구성 없이 기본 설정으로도 동작하며, Exactly-Once에 비해 오버헤드가 적고 처리량이 높습니다.
- 중복 허용 또는 처리 가능한 시나리오에 잘 맞습니다. 예를 들어 로그 수집, 분석 이벤트, 메트릭 처리 등에서는 중복된 데이터가 들어와도 크게 문제가 없거나 후에 중복 제거를 통해 정합성을 맞출 수 있는 경우에 유용합니다.
- 주의사항: 중복으로 인한 영향이 큰 경우(예: 금전적 거래)에는 추가적인 대비책이 필요합니다. At-Least-Once만으로는 중복 방지가 되지 않으므로, 컨슈머 측에서 멱등 처리 로직을 넣거나 Exactly-Once 방식을 고려해야 합니다.
참고사항: Outbox, Inbox 패턴
Transactional Outbox Pattern이나 Inbox Pattern들은 기본적으로 애플리케이션 레벨에서 데이터베이스 트랜잭션과 메시지 전송의 일관성을 맞추기 위한 방법입니다. 대부분의 경우, 아웃박스 패턴을 사용할 때 메시지 발행은 별도의 프로세스로 이루어지기 때문에, 그 자체는 보통 At-Least-Once 전송 보장(즉, 메시지가 최소 한 번은 전송되고, 중복 전송될 가능성이 있음) 환경에서 동작합니다. 즉, 메시지가 재전송될 수 있으므로 애플리케이션에서는 멱등성(idempotency)이나 인박스 패턴을 통해 중복 처리를 방지할 필요가 있습니다. 궁금하시다면 아래 2가지 글을 통해 어떤 식으로 아키텍처가 구현되는지 확인하실 수 있습니다.
2024.10.13 - [아키텍처] - [MSA] Transactional Outbox Pattern
[MSA] Transactional Outbox Pattern
안녕하세요! 글쓰는 개발자 stark입니다. 저는 최근 1년간 주로 MSA 프로젝트를 진행해 왔는데요 프로젝트를 설계하고 개발하면서 항상 같은 고민을 해왔습니다. 바로 MSA 서버 간 데이터 동기화 방
curiousjinan.tistory.com
2025.04.07 - [아키텍처] - [MSA] Kafka 컨슈머 Inbox 패턴
[MSA] Kafka 컨슈머 Inbox 패턴
시작하며안녕하세요. 개발자 stark입니다. 최근 Transactional Outbox Pattern에 대한 대화를 하게 되었는데 제가 아직 이 패턴에 대해 모르는 점이 많다는 것을 깨달았습니다. 그래서 이 궁금증을 해결
curiousjinan.tistory.com
Exactly Once (정확히 한 번 전달)
Exactly Once 전달 보장은 각 메시지가 정확히 한 번만 소비되도록 하는 것을 목표로 합니다. 즉 메시지 손실도 없고 중복도 없는 이상적인 처리입니다. 시스템이 어떤 장애를 겪더라도 각 이벤트의 효과가 한 번만 적용되는 것을 보장하는 것이죠. 이것을 달성하기 위해 카프카는 멱등성(idempotence) 프로듀서와 트랜잭션 등의 메커니즘을 도입했습니다. 다만 분산 시스템에서 진정한 의미의 "정확히 한 번" 전달은 이론적으로 매우 어려운 문제이며 , 실제로는 중복 기록을 방지하거나 중복된 메시지를 무시하는 방법으로 "한 번만 처리된 것처럼" 만드는 방식입니다.
카프카는 0.11 버전부터 이러한 Exactly-Once 처리(일명 EOS: Exactly-Once Semantics)를 지원해 왔으며, 프로듀서-브로커-컨슈머가 협력하여 중복을 식별/배제하고 원자적으로(offset과 메시지를 함께) 커밋함으로써 결과적으로 한 번만 처리된 것과 동일한 효과를 얻습니다. 이 방식에서는 프로듀서가 보내는 메시지마다 고유 식별자와 순번을 붙여 브로커에 전송하고, 컨슈머 측에서도 트랜잭션 내에서 처리와 오프셋 커밋을 관리하여 중복이나 손실 없이 데이터를 제공하게 됩니다. 그 결과, Exactly-Once 모드에서는 각 메시지가 시스템 내 딱 한 번만 나타나게 되는 것입니다.
실무 예시 (자주 발생하는 상황)
Exactly-Once 보장이 필요한 대표적인 사례로는 금융 거래나 결제 시스템을 들 수 있습니다. 예를 들어 은행 계좌 이체 이벤트를 처리할 때 동일 메시지가 두 번 처리되어 한 거래가 중복으로 적용된다면, 한 번의 이체 요청으로 돈이 두 번 빠져나가는 치명적인 오류가 발생할 수 있습니다.
이러한 비즈니스적으로 중대한 데이터의 경우 반드시 중복 없이 한 번만 처리되어야 하며, Kafka의 Exactly-Once 기능을 통해 이러한 시나리오를 지원할 수 있습니다.
Exactly Once의 현실적 의미
간단한 예시 코드 (Spring, Java 기반)
카프카에서 정확히 한 번 전송을 구현하려면 프로듀서와 컨슈머 설정에 몇 가지 옵션을 추가하고, 트랜잭션 API를 사용해야 합니다. 아래 Java 코드 예시는 멱등 프로듀서와 트랜잭션을 활용하여 Exactly-Once 전송을 달성하는 방법을 보여줍니다.
먼저 Producer 측의 코드를 설정해 줍니다.
- acks=all과 enable.idempotence=true를 통해 프로듀서가 중복 없이 안정적인 전송을 할 수 있게 합니다.
- transactional.id를 지정하고, KafkaTemplate을 통해 트랜잭션 모드로 동작하도록 구성합니다
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// Kafka 브로커 주소
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 키, 값 시리얼라이저 설정
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 모든 복제본이 ACK 응답할 때까지 대기하여 내구성을 보장
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
// 멱등성 활성화: 중복 전송 방지를 위한 설정
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 트랜잭션 식별자 설정 (고유해야 함)
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-" + UUID.randomUUID()); // 트랜잭션 ID
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
// 트랜잭션 지원 활성화
template.setTransactionIdPrefix("tx-");
return template;
}
}
컨슈머 측의 코드도 설정해 줍니다.
- isolation.level을 read_committed로 설정하여, 커밋된(트랜잭션이 완료된) 메시지만 읽도록 합니다.
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 커밋된 트랜잭션의 메시지만 읽도록 설정
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 컨슈머 측도 수동 커밋 모드 사용 (정확히 한 번 처리 시 오프셋 커밋을 트랜잭션에 포함시킬 수 있음)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
아래 코드는 @KafkaListener를 사용하여 "source-topic"에서 메시지를 수신한 후, KafkaTemplate의 트랜잭션 기능을 통해 다른 토픽에 여러 메시지를 전송하는 예제입니다.
- acks=all과 enable.idempotence=true로 중복 없는 안정적인 전송을 보장합니다.
- transactional.id를 설정한 후, KafkaTemplate의 executeInTransaction 메서드를 호출하여 트랜잭션을 시작합니다.
- 트랜잭션 범위 내에서 여러 메시지 전송 및 오프셋 커밋을 하나의 원자적 작업으로 처리합니다. 모든 메시지가 성공하면 한꺼번에 커밋되고 중간에 에러가 발생하면 해당 트랜잭션 내 모든 전송이 롤백되어, 부분 실패나 중복 결과가 남지 않습니다.
- 컨슈머 측에서는 read_committed 설정 덕분에 트랜잭션이 완료된 메시지만 읽도록 설정되어 불완전한 트랜잭션의 메시지는 필터링됩니다. 즉, 오직 커밋된 메시지만 읽게 됩니다.
@RequiredArgsConstructor
@Service
public class ExactlyOnceListenerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Logger logger = LoggerFactory.getLogger(ExactlyOnceService.class);
@KafkaListener(
topics = "source-topic",
groupId = "exactly-once-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void processMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
String key = record.key();
String value = record.value();
try {
// 트랜잭션 내에서 메시지 처리 및 전송
kafkaTemplate.executeInTransaction(operations -> {
logger.info("Processing message: {}", value);
// 비즈니스 로직 처리
String processedValue = processBusinessLogic(value);
// 처리된 메시지를 다른 토픽으로 전송
operations.send("destination-topic", key, processedValue);
return null;
});
// 메시지 처리 성공 시 수동으로 오프셋 커밋
ack.acknowledge();
logger.info("Message processed successfully and offset committed");
} catch (Exception e) {
// 오류 발생 시 롤백 (ack를 호출하지 않음)
logger.error("Error processing message: {}", e.getMessage(), e);
// 재시도 정책에 따라 후속 조치 가능
}
}
private String processBusinessLogic(String value) {
// 실제 비즈니스 로직 구현
return "Processed: " + value;
}
}
이 예시에서의 메시지 처리 흐름은 다음과 같습니다.
예외처리 흐름은 다음과 같습니다.
Producer 측
- acks=all과 enable.idempotence=true를 통해 프로듀서가 중복 없이 안정적으로 메시지를 전송합니다.
- transactional.id 설정 후 KafkaTemplate 내에서 executeInTransaction()을 호출하면, 프로듀서는 트랜잭션 모드로 동작하여 여러 메시지 전송과 컨슈머 오프셋 커밋을 하나의 원자적 작업으로 묶습니다.
- 트랜잭션 내 모든 메시지가 성공적으로 전송되면 commitTransaction()이 호출되어 한꺼번에 커밋되며, 중간에 에러 발생 시 abortTransaction()으로 롤백되어 부분 실패나 중복 결과가 남지 않습니다.
Consumer 측
- isolation.level=read_committed 설정으로, 커밋 완료된 트랜잭션의 메시지만 읽게 되어 트랜잭션이 불완전한 동안의 중간 결과는 필터링됩니다.
- @KafkaListener에서 메시지 처리 후, 수동으로 오프셋을 커밋(ack.acknowledge())하여 Exactly-Once 처리를 보완합니다.
이와 같이 스프링 카프카에서 프로듀서와 컨슈머의 설정 및 트랜잭션 API를 활용하면, 프로듀서가 메시지를 재시도하더라도 브로커가 멱등성을 통해 중복 메시지를 걸러내고, 컨슈머는 커밋된 메시지만 처리함으로써 Exactly Once 전송 보장을 구현할 수 있습니다.
구현의 핵심 포인트는 다음과 같습니다.
어떤 상황에 적합한가
- 데이터 정확성이 최우선인 업무에 적합합니다. 금융 거래, 결제 시스템, 재고 관리, 은행 계좌 이체 등 중복이나 손실이 치명적인 도메인에서는 Exactly-Once를 통해 한 번만 처리되도록 해야 합니다.
- 엔드투엔드 정합성이 요구되는 파이프라인에 사용됩니다. 예를 들어 카프카 스트림즈(Kafka Streams) 애플리케이션이나 ETL 파이프라인 등에서는 입력부터 출력까지 한 번씩만 처리하도록 Exactly-Once (처리) 모드를 활용할 수 있습니다.
- 구현 및 운영 오버헤드가 있으므로 신중히 선택해야 합니다. Exactly-Once 보장은 가장 높은 구현 복잡도와 비용을 수반하며, 성능 측면에서도 추가 지연이 발생할 수 있습니다. 따라서 모든 시스템에 적용하기보다는, 정말로 중복 발생이 용납되지 않는 핵심 부분에만 사용하는 것이 일반적입니다. 예를 들어 실시간 분석 시스템에서도 약간의 중복은 감당할 수 있다면 At-Least-Once로 구현하고, 꼭 필요한 부분만 Exactly-Once로 설계합니다.
출처
Demystifying Apache Kafka Message Delivery Semantics
Apache Kafka® supports 3 message delivery semantics: at-most-once, at-least-once, and exactly-once. Learn the trade-offs between each configuration.
keen.io
Message Delivery Guarantees for Apache Kafka | Confluent Documentation
At most once - In this case, a consumer reads a set of messages, saves its position in the log, and then processes the messages. If the consumer process crashes after saving its position, but before saving the output of its message processing, the consumer
docs.confluent.io
Kafka Consume & Produce: At-Least-Once Delivery
Understanding the meaning and ramifications of the at-least-once delivery guarantees provided by Kafka.
www.lydtechconsulting.com
Exactly-once semantics with Kafka transactions
Apache Kafka is a distributed streaming platform that allows efficient and reliable processing of messages. It is widely used in various industries for data streaming applications, such as processing real-time data, event sourcing, and microservices integr
strimzi.io
'Apache Kafka' 카테고리의 다른 글
Kafka에서 Locale.ROOT 사용의 중요성 (3) | 2024.12.06 |
---|---|
[kafka] Docker로 카프카 실행하기 (KRaft 모드) (7) | 2024.08.25 |
[kafka] Spring실행 시 consumer 연결문제 해결 (8) | 2024.02.18 |
[kafka] 스프링부트와 kafka를 이용한 slack 예외알림 구현 (1) | 2024.02.18 |
[Kafka] 슬랙 webhook 설정하기 (kafka에서 호출) (0) | 2024.02.18 |