Consumer 개요
- 브로커의 Topic 메시지를 읽는 역할을 수행한다.
- 모든 Consumer들은 고유한 그룹아이디 group.id를 가지는 Consumer Group에 소속되어야 한다.
- 개별 Consumer Group내에서 여러 개의 Consumer들은 토픽 파티션 별로 분배된다.
- Consumer는 subscribe()를 호출하여 읽어 들이려는 토픽을 등록한다.
- Consumer는 poll 메소드를 이용하여 주기적으로 브로커의 토픽 파티션에서 메시지를 가져온다.
- 메시지를 성공적으로 가져왔으면 commit을 통해서 _consumer_offsets 에 다음에 읽을 offset 위치를 기재한다.
- KafkaConsumer는 Fetcher, ConsumerClientNetwork등의 주요 내부 객체와 별도의 Heart Beat Thread를 생성한다
- Fetch , ConsumerClientNetwork 객체는 Broker의 토픽 파티션에서 메시지를 Fetch및 poll 수행한다
- Heart Beat Thread는 Consumer의 정상적인 활동을 Group Coordinator에 보고하는 역할을 수행(Group Coordinator는 주어진 시간동안 Heart Beat를 받지 못하면 Consumer들의 Rebalance를 수행하라고 명령함
batch 단위로 (위 예시에는 3개씩) 가져오게 되고 가져온 이후에 commit을 날려서 topic parition의 offset을 가져온 만큼 증가시키게 된다.
KafkaConsumer.poll(Duration.ofMillis(1000))
브로커나 Consumer 내부 Queue에 데이터가 잇다면 바로 데이터를 반환하고 없으면 1초 (Duration.ofMillis(1000)인 경우)동안 수행하고 데이터 fetch를 브로커에 수행하면서 대기하다가 결과를 반환한다.
ConsumerNetworkClient는 poll (1000) 이라 써져있으니 broker에게 1초 동안 계속 요청을 하고 있으면 Linked Queue에 데이터를 저장하고 Fetcher가 가져가게 된다.
fetcher 주요 설정
- fetch.min.bytes
- Fetcher가 record들을 읽어들이는 최소 bytes. 브로커는 지정된 fetch.min.bytes 이상의 새로운 메시지가 쌓일때 까지 전송을 하지 않음 , 기본은 1이다.
- fetch.max.wait.ms
- 브로커에 fetch.min.bytes 이상의 메시지가 쌓일 때까지 최대 대기 시간. 기본은 500ms이다.
- fetch.max.bytes
- fetcher가 한번에 가져올 수 있는 최대 데이터 bytes. 기본은50MB이다.
- max.partition.fetch.bytes
- fetcher가 파티션별 한번에 최대로 가져올 수 있는 bytes
- max.poll.records
- fetcher가 한번에 가져올 수 있는 레코드수. 기본은 500
poll (시간) 의 경우에는 broker의 topic partition에 보낼게 하나도 없는 경우에 기다리는 시간이고 위의 경우에는 보낼것이 있는 경우에 기다리는 시간이다.
Wakeup으로 Consumer 종료하기
public class ConsumerWakeupV2 {
public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeupV2.class.getName());
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_02");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(List.of(topicName));
//main thread
Thread mainThread = Thread.currentThread();
//main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함.
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
logger.info(" main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch(InterruptedException e) { e.printStackTrace();}
}
});
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
try {
logger.info("main thread is sleeping {} ms during while loop", loopCnt*10000);
Thread.sleep(loopCnt*10000);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
//kafkaConsumer.close();
}
}
shutdown hook에서 별도의 스레드가 WakeupException을 던지도록 wakeup을 호출하고 main thread에서 정상적으로 kafkaConsumer.close()를 호출하고 종료할 수 있도록 mainThread.join을 통해 기다려주고 있다.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group-01-1, groupId=group-01] Request joining group due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
Runtime 이 종료될때 wake up을 호출해서 consumer 쪽에서 close를 제대로 호출해서 pro-actively하게 종료가 되었다.
__consumer_offsets
./kafka-console-consumer --consumer.config ./consumer_temp.config --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
현재 동작하고 있는 consumer 들의 offset들을 알려준다.
주의할 점은 Consumer Group으로 Consumer가 새로 접속할시에 __consumer_offsets에 있는 offset 정보를 기반으로 메시지를 가져오기 때문에 earliest로 설정했어도 0번 offset부터 읽지 않는다는 점이다.
__consumer_offsets에 값이 없을 경우에는 earliest가 먹힌다. 있으면 먹히지 않음 ,latest도 마찬가지이다.
이 값은 consumer group 별로 존재한다. consumer별로 존재하는게 아니다.
즉 consumer group이 이미 존재하고 consumer가 해당 group을 property에 작성하면 auto.offset.reset이 먹히지 않고 __consumer_offsets의 offset을 기준으로 읽어들이게 된다.
kafka console consumer의 경우에는 매번 스크립트를 실행할 때마다 group을 새로 만들기 때문에 매번 __consumer_offsets에 정보가 없어서 earliest , latest이런게 적용이 된 것이다.
'Cloud > kafka-core' 카테고리의 다른 글
Kafka Consumer - 03 (0) | 2024.06.18 |
---|---|
Kafka Consumer - 02 (0) | 2024.06.04 |
Kafka Producer - 02 (0) | 2024.05.03 |
Kafka Producer - 01 (0) | 2024.04.17 |
Kakfa Topic, Producer, Consumer (0) | 2024.04.11 |