public class SimpleProducer {
public static void main(String[] args) {
String topicName = "simple-topic";
//KafkaProducer configuration setting
// null, "hello world"
Properties props = new Properties();
//bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer object creation
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
//ProducerRecord object creation
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"hello world 2");
//KafkaProducer message send
kafkaProducer.send(producerRecord);
kafkaProducer.flush();
kafkaProducer.close();
}
}
vm에 접속이 안될수 도 있는데 이때 hostexception이라면 wsl2 /etc/hosts에 vm ip , username을 적어주면 된다.
현재 Key, value를 둘다 String으로 해서 StringSerializer 이지 다른 type으로 하면 다른 Serializer를 써줘야 한다.
Kafka Producer 전송은 Producer Client의 별도 Thread가 전송을 담당한다는 점에서 기본적으로 Thread간 Async 전송임
즉 Producer Client의 Main Thread가 send() 메소드를 호출하여 메시지 전송을 시작하지만 바로 전송되지 않으며 내부 Buffer에 메시지를 저장 후에 별도의 Thread가 Kafka Broker에 실제 전송을 하는 방식이다.
new KafkaProducer가 생성되는 시점에 kafka io thread를 생성하게 된다.
메인 스레드에서는 Seralizer, Partioner 관련된 로직만 담당하고 실제로 network io는 별도의 스레드가 담당하게 된다.
public class SimpleProducerASync {
public static final Logger logger = LoggerFactory.getLogger(SimpleProducerASync.class.getName());
public static void main(String[] args) {
String topicName = "simple-topic";
//KafkaProducer configuration setting
// null, "hello world"
Properties props = new Properties();
//bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer object creation
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
//ProducerRecord object creation
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello world 2");
//kafkaProducer message send
kafkaProducer.send(producerRecord, (metadata, exception)-> {
if (exception == null) {
logger.info("\n ###### record metadata received ##### \n" +
"partition:" + metadata.partition() + "\n" +
"offset:" + metadata.offset() + "\n" +
"timestamp:" + metadata.timestamp());
} else {
logger.error("exception error from broker " + exception.getMessage());
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
send 할 때 성공적으로 record를 보냈을시 호출할 callback을 전달하게 된다.
callback 은 kafka io thread에서 호출을 하게 된다.
for(int seq=0; seq < 20; seq++) {
//ProducerRecord object creation
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, String.valueOf(seq),"hello world " + seq);
logger.info("seq:" + seq);
//kafkaProducer message send
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n ###### record metadata received ##### \n" +
"partition:" + metadata.partition() + "\n" +
"offset:" + metadata.offset() + "\n" +
"timestamp:" + metadata.timestamp());
} else {
logger.error("exception error from broker " + exception.getMessage());
}
});
}
record를 전달할 때 key값을 전달해주면 특정 key 값은 특정 partition으로 저장이 된다.
key는 현재 String으로 변환해서 넣어줘야한다.
현재 StringSerializer를 쓰고 있기 떄문이다.
public class ProducerASyncCustomCB {
public static final Logger logger = LoggerFactory.getLogger(ProducerASyncCustomCB.class.getName());
public static void main(String[] args) {
String topicName = "multipart-topic";
//KafkaProducer configuration setting
// null, "hello world"
Properties props = new Properties();
//bootstrap.servers, key.serializer.class, value.serializer.class
//props.setProperty("bootstrap.servers", "192.168.56.101:9092");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//KafkaProducer object creation
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
for(int seq=0; seq < 20; seq++) {
//ProducerRecord object creation
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topicName, seq,"hello world " + seq);
CustomCallback callback = new CustomCallback(seq);
//logger.info("seq:" + seq);
//kafkaProducer message send
kafkaProducer.send(producerRecord, callback);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
IntegerSerializer를 사용해서 key값을 Int로 변경한 예시이다.
kcc multipart-topic --group group01 --property print.key=true --property print.value=true
\--key-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"
kafka console consumer에서도 Key값이 integer로 오면 IntegerDeserializer로 바꿔줘야한다.
이떄 [2024-04-29 11:12:44,878] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
요런 에러가 뜨면 이전에 보낸 key값이 String인 record들이 아직 어떤 consumer도 읽지 않아서 현재 consumer에서 읽힐 때 key가 String이라 4byte가 아니라서 나는 에러임으로 StringDesrializer(default) 로 먼저 읽어와야한다.
'Cloud > kafka-core' 카테고리의 다른 글
Kafka Consumer - 03 (0) | 2024.06.18 |
---|---|
Kafka Consumer - 02 (0) | 2024.06.04 |
Kafka Consumer - 01 (0) | 2024.05.16 |
Kafka Producer - 02 (0) | 2024.05.03 |
Kakfa Topic, Producer, Consumer (0) | 2024.04.11 |