반응형
카프카의 Offset과 Commit의 작동 원리를 알아보자
1. Kafka 2.x 버전에서의 Offset과 Commit 이해하기
1-1. 구조 설명
- Broker: Kafka 클러스터 내의 브로커다(Kafka 인스턴스). 파티션을 관리한다.
- Partition: 메시지가 저장되는 파티션이다. Consumer에게 메시지를 전달한다.
- __consumer_offsets: Consumer가 처리한 마지막 메시지의 위치(offset)를 저장하는 특수한 토픽이다.
- Consumer Group: 하나 이상의 Consumer로 구성된 그룹이다. 각 Consumer는 파티션에서 메시지를 가져와 처리한다.
1-2. 동작 과정
- Broker가 Partition을 관리: Broker는 여러 파티션을 관리하며, 각 파티션에 메시지를 저장한다.
- Partition에서 Consumer로 메시지 전달: 파티션은 Consumer에게 메시지를 전달한다.
- Consumer가 Offset Commit: Consumer는 메시지를 처리한 후, 그 위치(offset)를 __consumer_offsets 토픽에 commit한다. 이렇게 하면 다음에 Consumer가 다시 시작할 때 마지막으로 처리한 메시지 위치부터 시작할 수 있다.
2. Offset과 Commit
2-1. Offset
- 정의: 각 파티션 내에서 메시지의 위치를 나타내는 고유한 식별자이다.
- 동작 방식: Consumer가 메시지를 읽을 때마다 해당 메시지의 Offset을 저장하고, 다음에 읽을 때 이 Offset을 기반으로 메시지를 가져온다.
2-2. Commit
- 정의: Consumer가 읽은 마지막 Offset을 저장하는 과정이다.
- 동작 방식: Commit을 통해 Consumer가 중단된 후에도 이전에 읽은 위치에서 메시지를 다시 가져올 수 있다.
2-3. 예시 코드: Java에서 Offset Commit하기
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.util.Arrays;
public class OffsetCommitExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
}
consumer.commitSync(); // Offset commit을 수행하는 부분
}
}
}
2-4. Zookeeper의 역할
- 정의: Kafka 2.2 이전 버전에서는 Zookeeper가 Offset을 저장하는 역할을 했다. 그러나 최신 버전에서는 Kafka 자체에서도 Offset을 저장할 수 있다.
- 동작 방식: Consumer가 Commit을 하면, Zookeeper에 Offset 정보가 저장된다.
2-5. Offset과 Commit의 중요성
- 데이터 무결성 보장
- 정의: Consumer가 중간에 실패하거나 재시작되더라도, Commit된 Offset을 통해 데이터 무결성을 보장할 수 있다.
- 동작 방식: Consumer가 다시 시작될 때, 마지막으로 Commit된 Offset부터 메시지를 다시 처리하기 시작한다.
- 분산 처리의 효율성
- 정의: 여러 Consumer가 하나의 Topic을 구독할 때, Offset을 통해 각 Consumer가 어디까지 메시지를 처리했는지 알 수 있다.
- 동작 방식: 각 Consumer는 자신이 처리한 마지막 Offset을 Commit하여, 다른 Consumer와 중복되지 않게 메시지를 처리한다.
3. 주의사항 및 Best Practices
3-1. Auto Commit 설정
- 정의: Kafka Consumer 설정에서 자동으로 Offset을 Commit할지 여부를 설정할 수 있다.
- 동작 방식: enable.auto.commit을 true로 설정하면, Kafka가 자동으로 Offset을 Commit한다. 하지만 이 경우, 메시지 처리가 실패했을 때 재처리가 어렵다.
- 예시 코드:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
3-2. Commit Interval
- 정의: 자동 Commit을 사용할 경우, 얼마나 자주 Commit을 할지 설정할 수 있다.
- 동작 방식: auto.commit.interval.ms를 통해 Commit 주기를 설정할 수 있다.
- 예시 코드:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
3-3. Commit 방법
- 정의: Offset을 Commit하는 방법에는 동기식(commitSync)과 비동기식(commitAsync)이 있다.
- 동작 방식: 동기식은 Commit이 완료될 때까지 대기하고, 비동기식은 바로 다음 작업을 수행한다.
- 예시 코드:
consumer.commitAsync(); // 비동기식 Offset Commit
4. Offset과 Commit의 고급 설정
4-1. Offset Reset Policy
- 정의: Consumer가 처음 시작할 때 또는 Offset이 더 이상 존재하지 않을 때 어떤 Offset부터 메시지를 읽을지 설정할 수 있다.
- 동작 방식: auto.offset.reset 설정을 통해 earliest, latest, none 중 하나를 선택할 수 있다.
- 예시 코드:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
4-2. Commit Callback
- 정의: 비동기식 Commit을 할 때, Commit이 성공적으로 이루어졌는지 실패했는지 확인할 수 있는 Callback을 설정할 수 있다.
- 동작 방식: commitAsync 메서드에 Callback을 전달하여 Commit 결과에 따른 추가 작업을 수행할 수 있다.
- 예시 코드:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// Commit 실패 시 처리 로직
}
});
2023.10.19 - [Apache Kafka] - Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편)
2023.10.19 - [Apache Kafka] - Apache Kafka 메시지 포맷: 구조와 특징 (7편)
반응형
'유용한 개발지식 > Apache Kafka' 카테고리의 다른 글
Kafka(카프카) 클러스터와 브로커의 동작 이해하기 (1) | 2023.12.27 |
---|---|
Kafka(카프카)의 기본 구조 이해하기 (클러스터, 브로커, 토픽, 파티션, 세그먼트) (1) | 2023.12.27 |
Kafka(카프카)란 무엇인가 (2) | 2023.12.27 |
SpringBoot와 Kafka(1) - 기본설명 (0) | 2023.10.19 |
Apache Kafka 복제(Replication)와 장애 허용(Fault Tolerance) 메커니즘 (9편) (1) | 2023.10.19 |