Group Coordinator + Consumer Rebalance
Consumer는 rebalancing이 진행이되면 consumer는 소비하는것을 멈추고 rebalancing에만 집중한다.
이때 producer는 계속 생산이 되므로 , rebalancing이 끝나면 안 읽었던것들을 다 읽어야한다. 이 과정에서 오래걸리나 문제가 발생하는 경우가 생긴다.
Group Coordinator에서 Consumer가 새로 추가되거나 종료될때 , 새로운 partition이 추가될 때 , consumer heartbeat에서 응답이 오지 않을 경우 , rebalancing을 수행하라고 Consumer group의 leader consumer에게 명령을 내린다.
consumer가 group에 추가및 삭제 될 때마다 state가 변경이된다.
새로운 consumer id를 GroupCoordinator가 할당을 하게 된다.
group 01 state가 preparing 에서 stabilized 로 변경되는것을 확인할 수 잇다.
현재 자기자신이 leader인것을 확인 할 수 있다. (id가 같음)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Discovered group coordinator tony-ubuntu.example.com:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Request joining group due to: need to re-join with the given member-id: consumer-group_01-1-7a1b2bf6-971e-4590-ad7e-f84b135ab15f
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Successfully joined group with generation Generation{generationId=8, memberId='consumer-group_01-1-7a1b2bf6-971e-4590-ad7e-f84b135ab15f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Finished assignment for group at generation 8: {consumer-group_01-1-7a1b2bf6-971e-4590-ad7e-f84b135ab15f=Assignment(partitions=[pizza-topic-0, pizza-topic-1, pizza-topic-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Successfully synced group in generation Generation{generationId=8, memberId='consumer-group_01-1-7a1b2bf6-971e-4590-ad7e-f84b135ab15f', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Notifying assignor about the new Assignment(partitions=[pizza-topic-0, pizza-topic-1, pizza-topic-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group_01-1, groupId=group_01] Adding newly assigned partitions: pizza-topic-0, pizza-topic-1, pizza-topic-2
처음에 rejoin이 뜨는 이유는 consumer입장에서 kafka로 하여금 필요한 정보를 받아오는 과정이다.
(partition은 몇개인지 자신의 id는 뭔지 등등)
Consumer Static Group Membership
많은 Consumer를 가지는 Consumer gorup에서 rebalance가 발생하면 모든 Consuemr들이 Rebalance 를 수행하므로 많은 시간이 소모되고 대량 데이터 처리시 Lag가 더 길어질 수 있음
유지보수 차원의 Consumer Restart도 Rebalance 를 초래하므로 불필요한 Rebalance를 발생시키지 않을 방법이 필요하다.
consumer group내의 consumer들에게 고정된 id를 부여
consumer 별로 consumer group 최초 조인 시 할당된 파티션을 그대로 유지하고 Consumer가 shutdown되어도 session.timeout.ms 내에 재 기동되면 rebalance가 수행되지 않고 , 기존 파티션이 재 할당됨.
Heartbeat 스레드
Heart Beat thread를 통해서 브로커의 group coordinator에게 consumer의 상태를 전송한다.
heart beat, poll 관련 주요 Consumer parameters
- heartbeat.interval.ms default= 3000ms
- heart beat thread가 heart beat를 보내는 간격
- session.timeout.ms 보다 낮게 설정이 되어야함. 보통 1/3보다 낮게 설정함
- session.timeout.ms default=45000ms
- 브로커가 Consumer로 heart beat 을 기다리는 최대 시간. 브로커는 이 시간동안 heart beat를 consumer로 부터 받지 못하면 해당 consumer를 group에서 제외하도록 rebalancing 명령을 지시함
- max.poll.interval.ms default=300000ms
- 이전 poll 호출 후 다음 호출 poll 까지 브로커가 대기하는 시간. 해당 시간 동안에 poll 호출이 consumer로 부터 이뤄지지 않으면 해당 consumer는 문제가 있는것으로 판단하고 브로커는 rebalance 명령을 보냄
- 즉 poll로 데이터를 가지고 오고 가공하고 다음 poll까지의 시간이 너무 길면 안된다.
- poll timeout이 나와서 broker가 group에서 remove 되고 rebalance가 일어난후에 다시 poll을 하면 다시 rejoin이 일어나게 된다.
첫번쨰 poll의 경우에는 record가 partition에 있어도 가져오지 않는다. 로그를 보면 re-join등 consumer group에 참여같은 것들을 하게 된다.
consumer 여러개의 topic 읽기
public class ConsumerMTopicRebalance {
public static final Logger logger = LoggerFactory.getLogger(ConsumerMTopicRebalance.class.getName());
public static void main(String[] args) {
//String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-assign");
// props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-01-static");
// props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "3");
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));
//main thread
Thread mainThread = Thread.currentThread();
//main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함.
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
logger.info(" main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch(InterruptedException e) { e.printStackTrace();}
}
});
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : consumerRecords) {
logger.info("topic:{}, record key:{}, partition:{}, record offset:{} record value:{}",
record.topic(), record.key(), record.partition(), record.offset(), record.value());
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
//kafkaConsumer.close();
}
}
subscribe에서 여러 topic들을 list로 넘겨준다.
Consumer Rebalance (Eager, Cooperative)
Eager (기본모드)
rebalance 수행시 기존 consumer들의 모든 파티션 할당을 취소하고 잠시 메시지를 읽지 않음. 이후 새롭게 Consumer에 파티션을 다시 할당 받고 다시 메시지를 읽음
모든 Consumer가 잠시 메시지를 읽지 않는 시간으로 인해 Lag가 상대저으로 크게 발생할 가능성이 있다.
파티션 할당 전략 (partition.assignment.strategy) 중 Range , Round Robin , Sticky 방식이 여기에 해당
(Incremental) Cooperative
Rebalance 수행시 기존 Consumer들의 모든 파티션 할당을 취소하지 않고 대상이 되는 Consumer들에 대해서 파티션에 따라 점진적으로 Consumer를 할당하면서 Rebalance를 수행한다.
전체 Consumer 가 메시지 읽기를 중지 하지 않으며 개별 Consumer가 협력적으로 영향을 받는 파티션만 Rebalance로 재 분배한다. 많은 Consumer를 가지는 Consumer Group 내에서 Rebalance 시간이 오래 걸릴시 활용도 높음
파티션 할당 전략
1) Consumer의 부하를 파티션 별로 균등하게 할당
2) 데이터 처리 및 리밸런싱의 효율성 극대화
- Range 할당 전략
- 서로 다른 2개 이상의 토픽을 Consumer들이 Subscription 할 시 토픽별 동일한 파티션을 특정 Consumer에게 할당하는 전략
- 여러 토픽들에서 동일한 키값으로 되어 있는 파티션은 특정 Consumer에 할당하여 해당 Consumer가 여러 토픽의 동일 키값으로 데이터 처리를 용이하게 할 수 있도록 지원
- Round Robind 할당 전략
- 파티션 별로 Consumer들이 균등하게 부하를 분배할 수 있도록 여러 토픽들의 파티션들을 Consumer들에게 순차적인 Round Robin 방식으로 할당
- Sticky 할당 전략
- 최초에 할당된 파티션과 Consumer 매핑을 Rebalance 수행되어도 가급적 그대로 유지 할 수 있도록 지원하는 전략
- 하지만 Eager protocol 기반이므로 Rebalance 시 모든 Consumer의 파티션 매핑이 해제 된 후에 다시 매핑되는 형태임
- Cooperative Sticky 할당 전략
- 최초에 할당된 파티션과 Consumer 매핑을 Rebalance가 수행되어도 가급적 그대로 유지 할 수 있도록 지원함과 동시에 Cooperative Protocol 기반으로 Rebalance 시 모든 Consumer의 파티션 매핑이 해제 되지 않고 Rebalance 연관된 파티션과 Consumer만 재 매핑됨
A (order, order_id) , B (order_item, order_id) 이런식으로 topic의 key값이 구성이 되어있을 때 , Range가 각 consumer가 알맞은 데이터를 처리하기 용이하다
'Cloud > kafka-core' 카테고리의 다른 글
Kafka File Producer App + File Consumer App (0) | 2024.06.30 |
---|---|
Kafka Consumer - 03 (0) | 2024.06.18 |
Kafka Consumer - 01 (0) | 2024.05.16 |
Kafka Producer - 02 (0) | 2024.05.03 |
Kafka Producer - 01 (0) | 2024.04.17 |