Consumer의 subscribe, poll , commit 로직
Consumer는 subscribe()를 호출하여 읽어 들이려는 토픽을 등록한다.
Consumer는 poll 메소드를 이용하여 주기적으로 브로커의 토픽 파티션에서 메시지를 가져온다.
메시지를 성공적으로 가져왔으면 commit을 통해서 __consumer_offsets에 다음에 읽을 offset을 기재한다.
group meta data에 어떤 consumer가 어떤 partition을 consume하고 있다가 나와있지
__consumer_offsets에 나와있지 않다.
중복 읽기가 발생하는 상황
consumer1이 poll만하고 아직 commit을 하지 않은 상황에서 고장이나서 종료되면
conusmer2가 읽어야하는 부분은 5부터 읽어야된다.
읽기 누락 상황
poll을 한후에 process를 다 끝내지 않고 그냥 바로 commit을 한 경우
(이런경우는 흔치 않다. 사용자가 잘못 사용하는 경우이다.)
Conusmer의 auto commit
Consumer의 파라미터로 auto.enable.commit=true 인 경우 읽어온 메시지를 브로커에 바로 commit 적용하지 않고, auto.commit.interval.ms 에 정해진 주기(기본 5초) 마다 Consumer가 자동으로 Commit을 수행한다.
Consumer가 읽어온 메시지보다 브로커의 Commit이 오래 되었으므로 Consumer의 장애/재기동 및 Rebalancing 후 브로커에서 이미 읽어온 메시지를 다시 읽어와서 중복처리 될 수 있음
한번 poll 해올 때마다 3초 걸린다고 가저하면 2번째 poll이 끝나고 3번쨰 poll이 일어나는 시점은 6초가 지난시점이다.
이 3번째 poll에서 기본 설정 주기인 5초가 지났음으로 commit이 전달 되게 되고 0~3까지 읽었으니 4가 전달되게 된다.
public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
try {
logger.info("main thread is sleeping {} ms during while loop", 10000);
Thread.sleep(10000);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
별도의 commit하는 로직이 따로 없다. 저절로 되는것이니까
Conusmer manual commit(sync, async)
Sync
- Consumer객체의 commitSync() 메소드를 사용
- 메시지 배치를 poll을 통해서 읽어오고 해당 메시지들의 마지막 offset을 브로커에 commit을 적용한다.
- 브로커에 commit 적용이 성공적으로 될 때까지 블로킹을 적용한다.
- commit 적용 완료후에 다시메시지를 읽어 들인다.
- 브로커에 Commit 적용이 실패할 경우 다시 commit 적용 요청을 한다.
- 다음 poll을 진행하지 않는다. (async와 큰 차이점)
- 비동기 방식 대비 수행시간이 더 느리다.
Async
- Conusmer 객체의 commitAsync() 메소드를 사용
- 메시지 배치를 poll()을 통해서 읽어오고 해당 메시지들의 마지막 offset을 브로커에 commit 적용 요청하지만 브로커에 commit 적용이 성공적으로 되었음을 기다리지 않고 (블로킹 하지 않음) 계속 메시지를 읽어온다.
- 브로커에 Commit 적용이 실패해도 다시 Commit 시도를 하지 않는다. 때문에 Consumer장애 또는 Rebalance 시 한번 읽은 메시지를 다시 중복해서 가져 올 수 있음
- 동기방식대기 수행시간이 더 빠르다.
private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
try {
if(consumerRecords.count() > 0 ) {
kafkaConsumer.commitSync();
logger.info("commit sync has been called");
}
} catch(CommitFailedException e) {
logger.error(e.getMessage());
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}catch(Exception e) {
logger.error(e.getMessage());
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaConsumer.commitSync를 호출해준다. 이때 AutoCommit은 default가 true인데 false로 해줘야 한다.
private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception != null) {
logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
}
}
});
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}catch(Exception e) {
logger.error(e.getMessage());
}finally {
logger.info("##### commit sync before closing");
kafkaConsumer.commitSync();
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
onComplete에 실행을 할 callback을 넘겨주게 된다. 뭔가 실패한 경우에만 exception이 null이 아니게 된다.
Consumer에서 Topic의 특정 파티션만 할당하기
Consumer에게 여러 개의 파티션이 있는 Topic에서 특정 파티션만 할당 가능하다. 배치 처리시 특정 key레벨의 파티션을 특정 Consumer에 할다하여 처리할 경우 적용
KafkaConsumer의 assign 메소드에 TopicPartition 객체로 특정 파티션을 인자로 입력하여 할당
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
// kafkaConsumer.subscribe(List.of(topicName));
kafkaConsumer.assign(Arrays.asList(topicPartition));
subscribe를 사용하지 않고 partition을 직접 assign해준다.
Consumer에서 topic 특정 partition offset 부터 읽어오기
특정 메시지가 누락되었을 경우 해당 메시지를 다시 읽어오게 된다. 보통 유지보수 차원에서 사용하게 된다.
TopicPartition 객체로 할당할 특정 파티션을 설정하고 seek() 메소드로 읽어올 offset 설정
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
//kafkaConsumer.subscribe(List.of(topicName));
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 5L);
어떤 partition 인지 assign하고 , 해당 partition의 몇번째 topic을 읽어올것인지를 결정해준다.
주의할 점은 기존 group id 와 동일한 group id를 사용하면서 commit을 수행하면 __consumer_offsets을 재갱신할 수 있으므로 유의가 필요하다.
엥간하면 읽어오고 commit하지 말거나 아예 다른 group id를 사용하는것이 좋겠다.
또한 지정한 offset을 읽어오라했는데 해당 offset만큼 데이터가 없으면, 그냥 처음부터 읽어오게 된다.
'Cloud > kafka-core' 카테고리의 다른 글
Multi Node Kafka Cluster (0) | 2024.07.02 |
---|---|
Kafka File Producer App + File Consumer App (0) | 2024.06.30 |
Kafka Consumer - 02 (0) | 2024.06.04 |
Kafka Consumer - 01 (0) | 2024.05.16 |
Kafka Producer - 02 (0) | 2024.05.03 |