acks 값 에 따른 Producer의 전송 방식 차이 이해
- Producer는 Leader broke가 메시지 A를 정상적으로 받았는지에 대한 Ack 메시지를 받지 않고
다음 메시지인 메시지 B를 바로 전송 - 메시지가 제대로 전송되었는지 브로커로 부터 확인을 받지 않기 때문에 메시지가 브로커에 기로되지 않더라도 재 전송하지 않음
- 메시지 손실의 우려가 가장 크지만 가장 빠르게 전송할 수 있음
- IOT 센서 데이터 같이 손실에 민감하지 않을 때사용함
- Producer는 Leader broker 가 메시지 A를 정상적으로 받았는지에 대한 Ack 메시지를 받은 후 다음 메시지인 메시지B를 바로 전송. 만약 오류 메시지를 브로커로 부터 받으면 메시지 A 재전송
- 메시지 A가 모든 Replicator에 완벽하게 복사되었는지의 여부는 확인하지 않고 B를 전송
- 만약 Leader가 메시지를 복제 중에 다운될 경우 다음 Leader가 될 브로커에는 메시지가 없을 수 있기 때문에 메시지를 소실할 우려가 있음
- Producer는 Leader broke가 메시지 A를 정상적으로 받은 뒤
min.insync.replicas 개수 만큼의 Replicator에 복제를 수행한 뒤에 보내는 Ack 메시지를 받은 후
다음 메시지인 메시지B를 바로 전송.
만약 오류 메시지를 브로커로 부터 받으면 메시지 A를 재전송 - 메시지 A가 모든 Replicator에 완벽하게 복사되었는지의 여부까지 확인 후에 메시지 B를 전송
- 메시지 손실이 되지 않도록 모든 장애 상황을 감안한 전송모드 이지만
Ack를 오래 기다려야하므로 상대적으로 전송속도가 느림 - 이때 replica 2개가 다 고장나있으면 Message ack대신 NOT_ENOUGH_REPLICAS 라는 에러를 받게 된다.
Producer의 Sync와 Callback Async 에서의 acks와 retry
- Callback 기반의 async에서도 동일하게 acks 설저엥 기반하여 retry가 수행된다.
- retry에 따라 Producer의 원래 메시지 전송 순서와 Broker에 기록되는 메시지 전송 순서가 변경 될 수 있음
- message A를 먼저보냈는데 실패하고 B를 나중에 보냈는데 성공하면 A를 retry에서 다시 보내기 때문에 B A 이 순서로 저장이 될 수 있다.
- Sync 방식에서 acks=0일 경우 전송 후 ack/error 를 기다리지 않음
Record batch , Record Accumulator
KafkaProducer 객체의 send() 메소드는 호출 시 마다 하나의 ProducerRecord를 입력하지만 바로 전송되지 않고 내부 메모리에서 단일 메시지를 토픽 파티션에 따라 Record Batch 단위로 묶인 뒤 전송된다.
(토픽이 어떤 파티션으로 가야되는지는 Client단에서 이미 결정이 된다.)
메시지들은 Producer Client의 내부 메모리에 여러 개의 Batch들로 buffer.memory 설정 사이즈(Record Accumulator size) 만큼 보관될 수 있으며 여러개의 Batch들로 한꺼번에 전송될 수 있다.
- Record Accumulator는 Partitioner에 의해서 메시지 배치가 전송이 될 토픽과 Partition에 따라 저장되는 Kafka Producer의 메모리 영역이다.
- Sender Thread는 Record Accumulator에 누적된 메시지 배치를 꺼내서 브로커로 전송한다.
- KafkaProducer의 Main Thread는 send() 메소드를 호출하고 Record Accumulator 에 데이터 저장하고 Sender Thread는 별개로 데이터를 브로커로 전송한다.
원래는 batch 다 안차도 sender thread batch size만큼 읽어서 가져가는데 liner.ms 를 통해서 좀 기다리게 만들 수 있다.
- linger.ms 를 반드시 0 보다 크게 설정할 필요는 없다.
- Producer와 Broker간의 전송이 매우 빠르고 Producer에서 메시지를 적절한 Record Accumulator에 누적된다면 linger.ms가 0이 되어도 무방하다.
- 전반적인 Producer와 Broker간의 전송이 느리다면 linger.ms를 높여서 메시지가 배치로 적용될 수 있는 확률을 높이는 시도를 해볼만 하다.
- linger.ms 는 보통 20ms이하로 설정을 권장한다.
기본적으로 KafkaProducer객체의 send메소드는 비동기이며 Batch기반으로 메시지를 전송한다.
이때 RecordMetaData recordMetadata = KafkaProducer.send().get()와 같은 방식으로 개별 메시지 별로 응답을 받을 때까지 block이 되는 방식으로는 메시지 배치 처리가 불가하다.
전송자체는 배치레벨이지만 막상 배치에는 메시지가 단 1개들어있게 된다. get에서 blocking당하고 있으니 실질적으로 batch에 메시지를 채워넣을 수 가 없기 때문이다.
Producer 재전송 매커니즘
Record Accumulator는 일종의 자료구조로 send를 호출해도 공간이 없으면 max.block.ms만큼 기다린다.
시간이 지나면 Timeout Exception을 던져버린다.
linger.ms 는 Sender Thread가 Record Accumulator에서 배치별로 가져가기 위한 최대 대기시간
request.timeout.ms는 sender thread가 응답을 기다리는 최대시간이다. 이 시간이 지나고 나면 retry.backoff.ms만큼 기다렸다가 다시 재전송을 시도하게 된다.
delivery.timeout.ms 까지 넘으면 retry도 안하고 Timeout Exception을 던지게된다. 안넘으면 계속 retry를 하게 된다.
retries 라는 값은 재전송 횟수를 결저하게 된느데 MAX_INT로 되어있어도 delivery.timeout.ms를 넘게되면 retry를 종료하게 된다.
retries만 사용해서 횟수로 조절 할 수 있다.
At Least Once 는 Ack를 받을 때까지 Producer 객체 자체에서 계속 전송을 시도한다는 의미이지 무한정 계속 시도한다는 의미는 아니다.
Producer의 max.in.flight.request.per.connection
브로커 서버의 응답없이 Producer의 sender thread가 한번에 보낼 수 있는 메시지 배치의 개수, 메시지 개수가 아니라 배치이다. 기본값은 5이다.
위 예시는 batch를 2개 보내고 Ack를 기다리는 에시이다.
B0가 B1이 머저 만들어진 배치이다. B0는 write하는 도중에 fail되었다. B1은 잘 저장이 된상태이다.
이때 Producer가 B0를 재전송하고 성공하면 순서가 다르게 저장이 될 수 있다.
enable.idempotence=true 를 하게 되면 max.in.flight.request.per.connection이 1보다 큰 경우에 메시지 순서를 해결해준다.
하지만 애초에 순서가 보장이 잘 되어야하는 시스템은 분산시스템 말고 다른 시스템을 사용하는 것이 좋다.
at most once , at least once, exactly once
at most once = 최대 한번 전송, retry 같은것을 하지 앟는다. acks = 0
at least once = 최소 한번 전송, retry를 하겠다는 의미이다. acks = 1, all
exactly once = 정확히 한번 전송,
- 중복없이 전송(Idempotence) = Producer의 message 전송 retry시 중복을 제거함
- Transaction 기반 전송: Consumer -> Process -> Producer(주로 Kafka Streams) 에 주로 사용되는 Transaction 기반 처리
- tx기반 전송하려면 Idempotence이어야 한다.
Producer는 브로커로 부터 Ack를 받은 다음 메시지 전송하되, Producer ID와 메시지 Sequence를 Header에 저장하여 전송한다.
메시지 Sequence는 메시지의 고유한 값이고 0 부터 시작하여 순차적으로 증가한다.
broker에서 메시지 sequence가 중복될 경우 이를 메시지 로그에 기록하지 않고 ack만 전송한다.
만일 현재가지고 있는 message sequence보다 1만큼 큰 경우에는 브로커에 저장하게 된다.
Idempotence 는 기본적으로 적용이 되어있지만 성능이 감소하긴한다.
B0 , 1, 2 순으로 생성된 배치가 전송될때 B1 write이 실패하여서 B2 를 쓰려고 시도할때 sequence 순서가 안맞기 때문에 OutofOrderSequenceException을 던지게 된다.
B1, B2를 다시 보내게 된다.
Producer Idempotence 설정시 유의 사항
- kafka 3.0 부터는 Producer의 기본 설정이 Idempotence이다.
- 하지만 기본 설정중에 enable.idempotence=true 를 제외하고 다른 파라미터들을 잘못 설정하면
(예를들어서 acks=1로 설정, 원래는 all로 설정해줘야함) Producer는 정상적으로 메시지를 보내지만 idempotence로는 동작하지 않는다 - 명시적으로 enable.idempotence=true 를 설정한 뒤 다른 파라미터들을 잘못 설정하면 Config 오류가 발생하면서 Producer가 기동되지 않음
Custom Partitioner
default 는 key값을 hashing에서 partition으로 골고루 분배되게한다.
Partitioner interface를 구현해서 만들면된다.
public class CustomPartitioner implements Partitioner {
public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName());
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
private String specialKeyName;
@Override
public void configure(Map<String, ?> configs) {
specialKeyName = configs.get("custom.specialKey").toString();
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfoList.size();
int numSpecialPartitions = (int)(numPartitions * 0.5);
int partitionIndex = 0;
if (keyBytes == null) {
//return stickyPartitionCache.partition(topic, cluster);
throw new InvalidRecordException("key should not be null");
}
if (((String)key).equals(specialKeyName)) {
partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions;
}
else {
partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + numSpecialPartitions;
}
logger.info("key:{} is sent to partition:{}", key.toString(), partitionIndex);
return partitionIndex;
}
@Override
public void close() {
}
}
'Cloud > kafka-core' 카테고리의 다른 글
Kafka Consumer - 03 (0) | 2024.06.18 |
---|---|
Kafka Consumer - 02 (0) | 2024.06.04 |
Kafka Consumer - 01 (0) | 2024.05.16 |
Kafka Producer - 01 (0) | 2024.04.17 |
Kakfa Topic, Producer, Consumer (0) | 2024.04.11 |