file에 텍스트가 추가될 때마다 Producer가 메시지를 전송하도록 구현한다.
/**
 * Polls a file at a fixed interval and forwards every newly appended line to an
 * {@link EventHandler} as a {@code MessageEvent}.
 *
 * <p>The instance is meant to be run on its own thread. Another thread stops the
 * loop by setting {@link #keepRunning} to {@code false}; the change is picked up
 * on the next iteration, i.e. after at most one sleep interval.
 */
public class FileEventSource implements Runnable {
    public static final Logger logger = LoggerFactory.getLogger(FileEventSource.class.getName());

    // Polling period in milliseconds.
    private final long updateInterval;
    // File being monitored.
    private final File file;
    // Receives one MessageEvent per appended line.
    private final EventHandler eventHandler;
    // Offset up to which the file has already been read and forwarded.
    private long filePointer = 0;
    // Written by a different thread than the one executing run(), hence volatile.
    public volatile boolean keepRunning = true;

    /**
     * @param updateInterval polling period in milliseconds
     * @param file           file to monitor
     * @param eventHandler   handler that receives one event per appended line
     */
    public FileEventSource(long updateInterval, File file, EventHandler eventHandler) {
        this.updateInterval = updateInterval;
        this.file = file;
        this.eventHandler = eventHandler;
    }

    /**
     * Loops until {@link #keepRunning} becomes {@code false}: sleeps for the configured
     * interval, then compares the file length with the last read offset and forwards
     * any appended lines. Any exception ends the loop and is logged.
     */
    @Override
    public void run() {
        try {
            while (this.keepRunning) {
                Thread.sleep(this.updateInterval);
                long len = this.file.length();
                if (len < this.filePointer) {
                    // The file is shorter than what was already read, so it was reset;
                    // resume from its current end.
                    logger.info("log file was reset as filePointer is longer than file length");
                    this.filePointer = len;
                } else if (len > this.filePointer) {
                    // The file grew: read from the last offset to the end and send each line.
                    readAppendAndSend();
                }
                // Equal length means nothing changed; just poll again.
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            logger.error("file monitoring was interrupted", e);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("file monitoring stopped because of an error", e);
        }
    }

    /**
     * Reads every line from the last read offset to the current end of the file, sends
     * each one through {@code sendMessage}, and then advances the offset to the new end.
     * If sending fails, the offset is left unchanged.
     *
     * <p>NOTE(review): {@code RandomAccessFile.readLine} maps each byte to one char, so
     * multi-byte (e.g. UTF-8) text is not decoded; confirm the monitored file is ASCII.
     *
     * @throws IOException          if the file cannot be opened or read
     * @throws ExecutionException   propagated from the event handler
     * @throws InterruptedException propagated from the event handler
     */
    public void readAppendAndSend() throws IOException, ExecutionException, InterruptedException {
        // try-with-resources: the file handle must be released on every call, otherwise
        // each detected append leaks one descriptor.
        try (RandomAccessFile raf = new RandomAccessFile(this.file, "r")) {
            raf.seek(this.filePointer);
            String line;
            while ((line = raf.readLine()) != null) {
                sendMessage(line);
            }
            this.filePointer = raf.getFilePointer();
        }
    }

    /**
     * Splits a line on commas, uses the first token as the key and the remaining tokens
     * (re-joined with commas) as the value, and hands the resulting {@code MessageEvent}
     * to the event handler. A line that yields no tokens at all (only commas) is skipped.
     */
    private void sendMessage(String line) throws ExecutionException, InterruptedException {
        String[] tokens = line.split(",");
        if (tokens.length == 0) {
            // split() returns an empty array for a line made only of separators;
            // indexing tokens[0] would throw and terminate the monitoring loop.
            logger.warn("skipping line without a key: {}", line);
            return;
        }
        String key = tokens[0];
        StringBuilder value = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            if (i > 1) {
                value.append(',');
            }
            value.append(tokens[i]);
        }
        MessageEvent messageEvent = new MessageEvent(key, value.toString());
        eventHandler.onMessage(messageEvent);
    }
}
FileEventSource는 별도의 스레드로 동작하며, RandomAccessFile을 통해 파일의 offset과 length를 관리한다.
이 값들이 변경되면 변경된 만큼의 텍스트(message)를 Kafka Producer를 가지고 있는 EventHandler에게 전달해준다.
/**
 * Entry point that wires a {@code KafkaProducer} to a {@link FileEventSource} so that
 * lines appended to the classpath resource {@code pizza_append.txt} are sent to the
 * {@code file-topic} topic.
 */
public class FileAppendProducer {
    public static final Logger logger = LoggerFactory.getLogger(FileAppendProducer.class.getName());

    /**
     * Starts the file-monitoring thread and blocks until it ends; the producer is closed
     * on the way out.
     *
     * @param args unused
     * @throws IllegalStateException if {@code pizza_append.txt} is not on the classpath
     */
    public static void main(String[] args) {
        String topicName = "file-topic";

        // KafkaProducer configuration (broker address is hard-coded for this sample).
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Resolve the file before creating the producer so a missing resource fails fast
        // with a clear message and without leaving an open producer behind.
        URL resource = FileAppendProducer.class.getClassLoader().getResource("pizza_append.txt");
        if (resource == null) {
            throw new IllegalStateException("pizza_append.txt was not found on the classpath");
        }
        // NOTE(review): URL.getFile() keeps percent-encoding, so a path containing spaces
        // or non-ASCII characters may not resolve; confirm the build directory path is plain.
        File file = new File(resource.getFile());
        boolean sync = false;

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        boolean interrupted = false;
        try {
            EventHandler eventHandler = new FileEventHandler(kafkaProducer, topicName, sync);
            FileEventSource fileEventSource = new FileEventSource(100, file, eventHandler);
            Thread fileEventSourceThread = new Thread(fileEventSource);
            fileEventSourceThread.start();
            try {
                // Keep the main thread alive for as long as the monitoring thread runs.
                fileEventSourceThread.join();
            } catch (InterruptedException e) {
                logger.error("interrupted while waiting for the file event source", e);
                // Ask the monitoring loop to stop, since the producer is about to be closed.
                fileEventSource.keepRunning = false;
                interrupted = true;
            }
        } finally {
            kafkaProducer.close();
            // Restore the flag only after close(): close() blocks and is documented to
            // throw InterruptException when the calling thread is interrupted.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
main 스레드는 EventSourceThread가 끝날 때까지 종료되지 않는다.
ClassLoader가 load한 pizza_append.txt를 사용하므로, 텍스트는 src/resource가 아니라 build/ 폴더에 존재하는 pizza_append.txt에 써야 한다.
Kafka Consumer 앱에서는 Producer가 만든 메시지를 topic에서 poll로 읽어온 후에 DB에 저장하게 된다.
/**
 * Kafka consumer that polls records from the subscribed topics, converts each record
 * value into an {@code OrderDTO}, and stores the batch through an {@code OrderDBHandler}.
 * Offsets are committed manually, either synchronously or asynchronously.
 *
 * @param <K> record key type
 * @param <V> record value type; the value is cast to {@code String} when parsed
 */
public class FileToDBConsumer<K extends Serializable, V extends Serializable> {
    public static final Logger logger = LoggerFactory.getLogger(FileToDBConsumer.class.getName());

    // DateTimeFormatter is immutable and thread-safe, so it is built once instead of per record.
    private static final DateTimeFormatter ORDER_TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // Number of comma-separated fields makeOrderDTO reads from a record value.
    private static final int ORDER_FIELD_COUNT = 7;

    protected KafkaConsumer<K, V> kafkaConsumer;
    protected List<String> topics;
    private OrderDBHandler orderDBHandler;

    /**
     * @param consumerProps  configuration passed to the {@code KafkaConsumer}
     * @param topics         topics to subscribe to
     * @param orderDBHandler handler used to insert the parsed orders
     */
    public FileToDBConsumer(Properties consumerProps, List<String> topics,
                            OrderDBHandler orderDBHandler) {
        this.kafkaConsumer = new KafkaConsumer<K, V>(consumerProps);
        this.topics = topics;
        this.orderDBHandler = orderDBHandler;
    }

    /** Subscribes to the configured topics and registers the JVM shutdown hook. */
    public void initConsumer() {
        this.kafkaConsumer.subscribe(this.topics);
        shutdownHookToRuntime(this.kafkaConsumer);
    }

    /**
     * Registers a shutdown hook that calls {@code wakeup()} on the consumer and then
     * waits for the thread that called this method to finish.
     */
    private void shutdownHookToRuntime(KafkaConsumer<K, V> kafkaConsumer) {
        // The thread that registers the hook (the main thread when called from main()).
        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("interrupted while waiting for the polling thread to finish", e);
                }
            }
        });
    }

    /** Converts a single record into an order and inserts it. */
    private void processRecord(ConsumerRecord<K, V> record) throws Exception {
        OrderDTO orderDTO = makeOrderDTO(record);
        orderDBHandler.insertOrder(orderDTO);
    }

    /**
     * Parses a record value of seven comma-separated fields into an {@code OrderDTO};
     * the seventh field is parsed as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @throws IllegalArgumentException if the value has fewer than seven fields
     */
    private OrderDTO makeOrderDTO(ConsumerRecord<K, V> record) throws Exception {
        String messageValue = (String) record.value();
        logger.info("###### messageValue:" + messageValue);
        String[] tokens = messageValue.split(",");
        if (tokens.length < ORDER_FIELD_COUNT) {
            // Fail with the offending value instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("expected " + ORDER_FIELD_COUNT
                    + " comma-separated fields but got " + tokens.length + ": " + messageValue);
        }
        return new OrderDTO(tokens[0], tokens[1], tokens[2], tokens[3],
                tokens[4], tokens[5], LocalDateTime.parse(tokens[6].trim(), ORDER_TIME_FORMATTER));
    }

    /** Converts a polled batch into orders and inserts them in one call. */
    private void processRecords(ConsumerRecords<K, V> records) throws Exception {
        List<OrderDTO> orders = makeOrders(records);
        orderDBHandler.insertOrders(orders);
    }

    /** Builds one {@code OrderDTO} per record, in iteration order. */
    private List<OrderDTO> makeOrders(ConsumerRecords<K, V> records) throws Exception {
        List<OrderDTO> orders = new ArrayList<>();
        // A plain loop is used because makeOrderDTO throws a checked exception.
        for (ConsumerRecord<K, V> record : records) {
            orders.add(makeOrderDTO(record));
        }
        return orders;
    }

    /**
     * Runs the poll loop until the consumer is woken up or an error ends the loop.
     *
     * @param durationMillis poll timeout in milliseconds
     * @param commitMode     {@code "sync"} selects synchronous commits; any other value
     *                       (including {@code null}) selects asynchronous commits
     */
    public void pollConsumes(long durationMillis, String commitMode) {
        // Constant-first equals so that a null mode falls back to async instead of throwing.
        if ("sync".equals(commitMode)) {
            pollCommitSync(durationMillis);
        } else {
            pollCommitAsync(durationMillis);
        }
    }

    /**
     * Poll loop that commits offsets asynchronously after every poll and performs one
     * final synchronous commit before closing.
     */
    private void pollCommitAsync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords =
                        this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                logger.info("consumerRecords count:" + consumerRecords.count());
                if (consumerRecords.count() > 0) {
                    try {
                        processRecords(consumerRecords);
                    } catch (Exception e) {
                        // NOTE(review): the loop continues and the offsets below are still
                        // committed, so a batch that failed to insert is not re-polled.
                        logger.error("failed to process polled records", e);
                    }
                }
                this.kafkaConsumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                    }
                });
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } catch (Exception e) {
            logger.error("polling stopped because of an error", e);
        } finally {
            commitAndClose();
        }
    }

    /**
     * Poll loop that commits offsets synchronously after each non-empty batch has been
     * processed. A processing failure ends the loop.
     */
    protected void pollCommitSync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords =
                        this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                if (consumerRecords.count() == 0) {
                    // Nothing to insert or commit; matches the async path, which also
                    // skips processing for empty polls.
                    continue;
                }
                processRecords(consumerRecords);
                try {
                    this.kafkaConsumer.commitSync();
                    logger.info("commit sync has been called");
                } catch (CommitFailedException e) {
                    logger.error("commit sync failed", e);
                }
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } catch (Exception e) {
            logger.error("polling stopped because of an error", e);
        } finally {
            commitAndClose();
        }
    }

    /**
     * Performs the final synchronous commit and then closes the resources. The close
     * runs even if the commit throws; the commit failure still propagates to the caller.
     */
    private void commitAndClose() {
        try {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
        } finally {
            logger.info("finally consumer is closing");
            close();
        }
    }

    /** Closes the Kafka consumer and then the DB handler, even if the first close fails. */
    protected void close() {
        try {
            this.kafkaConsumer.close();
        } finally {
            this.orderDBHandler.close();
        }
    }

    /**
     * Sample entry point: consumes {@code file-topic} with group {@code file-group},
     * auto-commit disabled, and writes the orders to the configured MySQL database.
     * Connection settings are hard-coded for this sample.
     */
    public static void main(String[] args) {
        String topicName = "file-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "file-group");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        String url = "jdbc:mysql://localhost:3316/kafka_consumer";
        String user = "root";
        String password = "1234";
        OrderDBHandler orderDBHandler = new OrderDBHandler(url, user, password);

        FileToDBConsumer<String, String> fileToDBConsumer =
                new FileToDBConsumer<String, String>(props, List.of(topicName), orderDBHandler);
        fileToDBConsumer.initConsumer();

        String commitMode = "async";
        fileToDBConsumer.pollConsumes(1000, commitMode);
    }
}
'Cloud > kafka-core' 카테고리의 다른 글
Topic Segment (0) | 2024.07.09 |
---|---|
Multi Node Kafka Cluster (0) | 2024.07.02 |
Kafka Consumer - 03 (0) | 2024.06.18 |
Kafka Consumer - 02 (0) | 2024.06.04 |
Kafka Consumer - 01 (0) | 2024.05.16 |