카프카의 Offset과 Commit의 작동 원리를 알아보자
1. Kafka 2.x 버전에서의 Offset과 Commit 이해하기
1-1. 구조 설명
- Broker: Kafka 클러스터 내의 브로커다(Kafka 인스턴스). 파티션을 관리한다.
- Partition: 메시지가 저장되는 파티션이다. Consumer에게 메시지를 전달한다.
- __consumer_offsets: Consumer가 처리한 마지막 메시지의 위치(offset)를 저장하는 특수한 토픽이다.
- Consumer Group: 하나 이상의 Consumer로 구성된 그룹이다. 각 Consumer는 파티션에서 메시지를 가져와 처리한다.
1-2. 동작 과정
- Broker가 Partition을 관리: Broker는 여러 파티션을 관리하며, 각 파티션에 메시지를 저장한다.
- Partition에서 Consumer로 메시지 전달: 파티션은 Consumer에게 메시지를 전달한다.
- Consumer가 Offset Commit: Consumer는 메시지를 처리한 후, 그 위치(offset)를 __consumer_offsets 토픽에 commit한다. 이렇게 하면 다음에 Consumer가 다시 시작할 때 마지막으로 처리한 메시지 위치부터 시작할 수 있다.
2. Offset과 Commit
2-1. Offset
- 정의: 각 파티션 내에서 메시지의 위치를 나타내는 고유한 식별자이다.
- 동작 방식: Consumer가 메시지를 읽을 때마다 해당 메시지의 Offset을 저장하고, 다음에 읽을 때 이 Offset을 기반으로 메시지를 가져온다.
2-2. Commit
- 정의: Consumer가 읽은 마지막 Offset을 저장하는 과정이다.
- 동작 방식: Commit을 통해 Consumer가 중단된 후에도 이전에 읽은 위치에서 메시지를 다시 가져올 수 있다.
2-3. 예시 코드: Java에서 Offset Commit하기
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.util.Arrays;
public class OffsetCommitExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
}
consumer.commitSync(); // Offset commit을 수행하는 부분
}
}
}
2-4. Zookeeper의 역할
- 정의: Kafka 2.2 이전 버전에서는 Zookeeper가 Offset을 저장하는 역할을 했다. 그러나 최신 버전에서는 Kafka 자체에서도 Offset을 저장할 수 있다.
- 동작 방식: Consumer가 Commit을 하면, Zookeeper에 Offset 정보가 저장된다.
2-5. Offset과 Commit의 중요성
- 데이터 무결성 보장
- 정의: Consumer가 중간에 실패하거나 재시작되더라도, Commit된 Offset을 통해 데이터 무결성을 보장할 수 있다.
- 동작 방식: Consumer가 다시 시작될 때, 마지막으로 Commit된 Offset부터 메시지를 다시 처리하기 시작한다.
- 분산 처리의 효율성
- 정의: 여러 Consumer가 하나의 Topic을 구독할 때, Offset을 통해 각 Consumer가 어디까지 메시지를 처리했는지 알 수 있다.
- 동작 방식: 각 Consumer는 자신이 처리한 마지막 Offset을 Commit하여, 다른 Consumer와 중복되지 않게 메시지를 처리한다.
3. 주의사항 및 Best Practices
3-1. Auto Commit 설정
- 정의: Kafka Consumer 설정에서 자동으로 Offset을 Commit할지 여부를 설정할 수 있다.
- 동작 방식: enable.auto.commit을 true로 설정하면, Kafka가 자동으로 Offset을 Commit한다. 하지만 이 경우, 메시지 처리가 실패했을 때 재처리가 어렵다.
- 예시 코드:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
3-2. Commit Interval
- 정의: 자동 Commit을 사용할 경우, 얼마나 자주 Commit을 할지 설정할 수 있다.
- 동작 방식: auto.commit.interval.ms를 통해 Commit 주기를 설정할 수 있다.
- 예시 코드:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
3-3. Commit 방법
- 정의: Offset을 Commit하는 방법에는 동기식(commitSync)과 비동기식(commitAsync)이 있다.
- 동작 방식: 동기식은 Commit이 완료될 때까지 대기하고, 비동기식은 바로 다음 작업을 수행한다.
- 예시 코드:
consumer.commitAsync(); // 비동기식 Offset Commit
4. Offset과 Commit의 고급 설정
4-1. Offset Reset Policy
- 정의: Consumer가 처음 시작할 때 또는 Offset이 더 이상 존재하지 않을 때 어떤 Offset부터 메시지를 읽을지 설정할 수 있다.
- 동작 방식: auto.offset.reset 설정을 통해 earliest, latest, none 중 하나를 선택할 수 있다.
- 예시 코드:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
4-2. Commit Callback
- 정의: 비동기식 Commit을 할 때, Commit이 성공적으로 이루어졌는지 실패했는지 확인할 수 있는 Callback을 설정할 수 있다.
- 동작 방식: commitAsync 메서드에 Callback을 전달하여 Commit 결과에 따른 추가 작업을 수행할 수 있다.
- 예시 코드:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// Commit 실패 시 처리 로직
}
});
2023.10.19 - [Apache Kafka] - Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편)
Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편)
Kafka의 Replication과 Fault Tolerance는 데이터의 안정성과 가용성을 보장하는 중요한 요소이다. 이러한 개념들은 Kafka 클러스터가 장애 상황에서도 데이터 손실 없이 정상적으로 작동할 수 있게 해준
curiousjinan.tistory.com
2023.10.19 - [Apache Kafka] - Apache Kafka 메시지 포맷: 구조와 특징 (7편)
Apache Kafka 메시지 포맷: 구조와 특징 (7편)
이번 포스트에서는 카프카의 메시지 포맷에 대해서 알아보자 1. Kafka 메시지의 기본 구조 1-1. Kafka 메시지: Kafka에서 주고받는정보의 '상자'이다. 키(Key): 어떤 '칸'에 정보를 넣을지 결정한다. 값(Va
curiousjinan.tistory.com
'Apache Kafka' 카테고리의 다른 글
Kafka(카프카) 클러스터와 브로커의 동작 이해하기 (1) | 2023.12.27 |
---|---|
Kafka(카프카)의 기본 구조 이해하기 (클러스터, 브로커, 토픽, 파티션, 세그먼트) (1) | 2023.12.27 |
Kafka(카프카)란 무엇인가 (2) | 2023.12.27 |
SpringBoot와 Kafka(1) - 기본설명 (0) | 2023.10.19 |
Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편) (1) | 2023.10.19 |