Cloud/kafka-core

Kafka Producer - 02

Tony Lim 2024. 5. 3. 15:06
728x90

acks 값 에 따른 Producer의 전송 방식 차이 이해

  • Producer는 Leader broke가 메시지 A를 정상적으로 받았는지에 대한 Ack 메시지를 받지 않고
    다음 메시지인 메시지 B를 바로 전송
  • 메시지가 제대로 전송되었는지 브로커로 부터 확인을 받지 않기 때문에 메시지가 브로커에 기로되지 않더라도 재 전송하지 않음
  • 메시지 손실의 우려가 가장 크지만 가장 빠르게 전송할 수 있음
  • IOT 센서 데이터 같이 손실에 민감하지 않을 때사용함

  • Producer는 Leader broker 가 메시지 A를 정상적으로 받았는지에 대한 Ack 메시지를 받은 후 다음 메시지인 메시지B를 바로 전송. 만약 오류 메시지를 브로커로 부터 받으면 메시지 A 재전송
  • 메시지 A가 모든 Replicator에 완벽하게 복사되었는지의 여부는 확인하지 않고 B를 전송
  • 만약 Leader가 메시지를 복제 중에 다운될 경우 다음 Leader가 될 브로커에는 메시지가 없을 수 있기 때문에 메시지를 소실할 우려가 있음

  • Producer는 Leader broke가 메시지 A를 정상적으로 받은 뒤
    min.insync.replicas 개수 만큼의 Replicator에 복제를 수행한 뒤에 보내는 Ack 메시지를 받은 후
    다음 메시지인 메시지B를 바로 전송.
    만약 오류 메시지를 브로커로 부터 받으면 메시지 A를 재전송
  • 메시지 A가 모든 Replicator에 완벽하게 복사되었는지의 여부까지 확인 후에 메시지 B를 전송
  • 메시지 손실이 되지 않도록 모든 장애 상황을 감안한 전송모드 이지만
    Ack를 오래 기다려야하므로 상대적으로 전송속도가 느림
  • 이때 replica 2개가 다 고장나있으면  Message ack대신 NOT_ENOUGH_REPLICAS 라는 에러를 받게 된다.

Producer의 Sync와 Callback Async 에서의 acks와 retry

  • Callback 기반의 async에서도 동일하게 acks 설저엥 기반하여 retry가 수행된다.
  • retry에 따라 Producer의 원래 메시지 전송 순서와 Broker에 기록되는 메시지 전송 순서가 변경 될 수 있음
    • message A를 먼저보냈는데 실패하고 B를 나중에 보냈는데 성공하면 A를 retry에서 다시 보내기 때문에 B A 이 순서로 저장이 될 수 있다.
  • Sync 방식에서 acks=0일 경우 전송 후 ack/error 를 기다리지 않음

Record batch , Record Accumulator

KafkaProducer 객체의 send() 메소드는 호출 시 마다 하나의 ProducerRecord를 입력하지만 바로 전송되지 않고 내부 메모리에서 단일 메시지를 토픽 파티션에 따라 Record Batch 단위로 묶인 뒤 전송된다.
(토픽이 어떤 파티션으로 가야되는지는 Client단에서 이미 결정이 된다.)

메시지들은 Producer Client의 내부 메모리에 여러 개의 Batch들로 buffer.memory 설정 사이즈(Record Accumulator size) 만큼 보관될 수 있으며 여러개의 Batch들로 한꺼번에 전송될 수 있다.

  • Record Accumulator는 Partitioner에 의해서 메시지 배치가 전송이 될 토픽과 Partition에 따라 저장되는 Kafka Producer의 메모리 영역이다.
  • Sender Thread는 Record Accumulator에 누적된 메시지 배치를 꺼내서 브로커로 전송한다.
  • KafkaProducer의 Main Thread는 send() 메소드를 호출하고 Record Accumulator 에 데이터 저장하고 Sender Thread는 별개로 데이터를 브로커로 전송한다.

원래는 batch 다 안차도 sender thread batch size만큼 읽어서 가져가는데 liner.ms 를 통해서 좀 기다리게 만들 수 있다.

  • linger.ms 를 반드시 0 보다 크게 설정할 필요는 없다.
  • Producer와 Broker간의 전송이 매우 빠르고 Producer에서 메시지를 적절한 Record Accumulator에 누적된다면 linger.ms가 0이 되어도 무방하다.
  • 전반적인 Producer와 Broker간의 전송이 느리다면 linger.ms를 높여서 메시지가 배치로 적용될 수 있는 확률을 높이는 시도를 해볼만 하다.
  • linger.ms 는 보통 20ms이하로 설정을 권장한다.

기본적으로 KafkaProducer객체의 send메소드는 비동기이며 Batch기반으로 메시지를 전송한다.

이때 RecordMetaData recordMetadata = KafkaProducer.send().get()와 같은 방식으로 개별 메시지 별로 응답을 받을 때까지 block이 되는 방식으로는 메시지 배치 처리가 불가하다.

전송자체는 배치레벨이지만 막상 배치에는 메시지가 단 1개들어있게 된다. get에서 blocking당하고 있으니 실질적으로 batch에 메시지를 채워넣을 수 가 없기 때문이다.


Producer 재전송 매커니즘

Record Accumulator는 일종의 자료구조로 send를 호출해도 공간이 없으면 max.block.ms만큼 기다린다.
시간이 지나면 Timeout Exception을 던져버린다.

linger.ms 는 Sender Thread가 Record Accumulator에서 배치별로 가져가기 위한 최대 대기시간

request.timeout.ms는 sender thread가 응답을 기다리는 최대시간이다. 이 시간이 지나고 나면 retry.backoff.ms만큼 기다렸다가 다시 재전송을 시도하게 된다.

delivery.timeout.ms 까지 넘으면 retry도 안하고 Timeout Exception을 던지게된다. 안넘으면 계속 retry를 하게 된다.

 

retries 라는 값은 재전송 횟수를 결저하게 된느데 MAX_INT로 되어있어도 delivery.timeout.ms를 넘게되면 retry를 종료하게 된다.

 retries만 사용해서 횟수로 조절 할 수 있다.

At Least Once 는 Ack를 받을 때까지 Producer 객체 자체에서 계속 전송을 시도한다는 의미이지 무한정 계속 시도한다는 의미는 아니다.


Producer의 max.in.flight.request.per.connectio

브로커 서버의 응답없이 Producer의 sender thread가 한번에 보낼 수 있는 메시지 배치의 개수, 메시지 개수가 아니라 배치이다. 기본값은 5이다.

위 예시는 batch를 2개 보내고 Ack를 기다리는 에시이다.

B0가 B1이 머저 만들어진 배치이다. B0는 write하는 도중에 fail되었다. B1은 잘 저장이 된상태이다.

이때 Producer가 B0를 재전송하고 성공하면 순서가 다르게 저장이 될 수 있다.

enable.idempotence=true 를 하게 되면 max.in.flight.request.per.connection이 1보다 큰 경우에 메시지 순서를 해결해준다.

하지만 애초에 순서가 보장이 잘 되어야하는 시스템은 분산시스템 말고 다른 시스템을 사용하는 것이 좋다.


at most once , at least once, exactly once

at most once = 최대 한번 전송, retry 같은것을 하지 앟는다. acks = 0

at least once = 최소 한번 전송, retry를 하겠다는 의미이다. acks = 1, all

exactly once = 정확히 한번 전송, 

  • 중복없이 전송(Idempotence) = Producer의 message 전송 retry시 중복을 제거함 
  • Transaction 기반 전송: Consumer -> Process -> Producer(주로 Kafka Streams) 에 주로 사용되는 Transaction 기반 처리
  • tx기반 전송하려면  Idempotence이어야 한다.

Producer는 브로커로 부터 Ack를 받은 다음 메시지 전송하되, Producer ID와 메시지 Sequence를 Header에 저장하여 전송한다.

메시지 Sequence는 메시지의 고유한 값이고 0 부터 시작하여 순차적으로 증가한다.

broker에서 메시지 sequence가 중복될 경우 이를 메시지 로그에 기록하지 않고 ack만 전송한다.
만일 현재가지고 있는 message sequence보다 1만큼 큰 경우에는 브로커에 저장하게 된다.

Idempotence 는 기본적으로 적용이 되어있지만 성능이 감소하긴한다. 

B0 , 1, 2 순으로 생성된 배치가 전송될때 B1 write이 실패하여서 B2 를 쓰려고 시도할때 sequence 순서가 안맞기 때문에 OutofOrderSequenceException을 던지게 된다.

B1, B2를 다시 보내게 된다.


Producer Idempotence 설정시 유의 사항

  • kafka 3.0 부터는 Producer의 기본 설정이 Idempotence이다.
  • 하지만 기본 설정중에 enable.idempotence=true 를 제외하고 다른 파라미터들을 잘못 설정하면
    (예를들어서 acks=1로 설정, 원래는 all로 설정해줘야함) Producer는 정상적으로 메시지를 보내지만 idempotence로는 동작하지 않는다
  • 명시적으로 enable.idempotence=true 를 설정한 뒤 다른 파라미터들을 잘못 설정하면 Config 오류가 발생하면서 Producer가 기동되지 않음

Custom Partitioner

 

default 는 key값을 hashing에서 partition으로 골고루 분배되게한다.

Partitioner interface를 구현해서 만들면된다.

public class CustomPartitioner implements Partitioner {
    public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName());
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
    private String specialKeyName;

    @Override
    public void configure(Map<String, ?> configs) {
        specialKeyName = configs.get("custom.specialKey").toString();
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfoList.size();
        int numSpecialPartitions = (int)(numPartitions * 0.5);
        int partitionIndex = 0;

        if (keyBytes == null) {
            //return stickyPartitionCache.partition(topic, cluster);
            throw new InvalidRecordException("key should not be null");
        }

        if (((String)key).equals(specialKeyName)) {
            partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions;
        }
        else {
            partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + numSpecialPartitions;
        }
        logger.info("key:{} is sent to partition:{}", key.toString(), partitionIndex);

        return partitionIndex;
    }

    @Override
    public void close() {
    }
}

 

728x90

'Cloud > kafka-core' 카테고리의 다른 글

Kafka Consumer - 03  (0) 2024.06.18
Kafka Consumer - 02  (0) 2024.06.04
Kafka Consumer - 01  (0) 2024.05.16
Kafka Producer - 01  (0) 2024.04.17
Kakfa Topic, Producer, Consumer  (0) 2024.04.11