
Kafka File Producer App + File Consumer App

Tony Lim 2024. 6. 30. 16:46

We implement a producer that sends a message every time text is appended to a file.

public class FileEventSource implements Runnable {
    public static final Logger logger = LoggerFactory.getLogger(FileEventSource.class.getName());

    //interval, in ms, at which the file is periodically checked for changes
    private long updateInterval;

    //the File object to monitor
    private File file;

    //EventHandler that sends a message via the producer whenever the file changes
    private EventHandler eventHandler;
    private boolean sync;

    private long filePointer = 0;
    public boolean keepRunning = true;

    public FileEventSource(long updateInterval, File file, EventHandler eventHandler) {
        this.updateInterval = updateInterval;
        this.file = file;
        this.eventHandler = eventHandler;
    }

    //Loop indefinitely: sleep for updateInterval, check whether content was appended to the file, and send messages if so.
    @Override
    public void run() {
        try {
            //this.keepRunning is switched to false when the main thread terminates (the main program is modified to do this).
            while(this.keepRunning) {
                //sleep for the updateInterval (ms) passed to the constructor
                Thread.sleep(this.updateInterval);
                //get the current length of the file.
                long len = this.file.length();
                //if the current file length is smaller than the last filePointer, the file was reset (truncated).
                if (len < this.filePointer) {
                    logger.info("log file was reset as filePointer is longer than file length");
                    //set filePointer to the current file length.
                    filePointer = len;
                    // if the file length is greater than the last filePointer, content was appended to the file.
                } else if (len > this.filePointer){
                    //read line by line from the last filePointer to the end of the file and send each line as a message via the producer.
                    readAppendAndSend();
                    // if the file length equals the filePointer, the file has not changed.
                } else {
                    continue;
                }
            }
        } catch(InterruptedException e) {
            logger.error(e.getMessage());
        } catch(ExecutionException e) {
            logger.error(e.getMessage());
        } catch(Exception e) {
            logger.error(e.getMessage());
        }

    }

    //read line by line from the last filePointer to the end of the file and send each line with sendMessage()
    public void readAppendAndSend() throws IOException, ExecutionException, InterruptedException {
        //try-with-resources so the RandomAccessFile is closed even if sending fails
        try (RandomAccessFile raf = new RandomAccessFile(this.file, "r")) {
            raf.seek(this.filePointer);
            String line = null;

            while ((line = raf.readLine()) != null) {
                sendMessage(line);
            }
            //the file has grown, so move filePointer to the current end of the file.
            this.filePointer = raf.getFilePointer();
        }
    }

    //parse one line into a key and a value, build a MessageEvent, and hand it to the EventHandler, which sends it via the producer.
    private void sendMessage(String line) throws ExecutionException, InterruptedException {
        String[] tokens = line.split(",");
        String key = tokens[0];
        StringBuffer value = new StringBuffer();

        for(int i=1; i<tokens.length; i++) {
            if( i != (tokens.length -1)) {
                value.append(tokens[i] + ",");
            } else {
                value.append(tokens[i]);
            }
        }
        MessageEvent messageEvent = new MessageEvent(key, value.toString());
        eventHandler.onMessage(messageEvent);
    }

}

FileEventSource runs as a separate thread and uses a RandomAccessFile to keep track of the file's offset (filePointer) and length.

Whenever those values change, the newly appended text (the message) is handed to the EventHandler, which holds the Kafka producer.
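The EventHandler interface, MessageEvent, and FileEventHandler are not shown in this post. A minimal sketch of how they could look, based purely on how they are used here (the type and method names follow the calls in the code above, but the bodies are my own assumption):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;

//each type would normally live in its own file; they are grouped here only for the sketch
interface EventHandler {
    void onMessage(MessageEvent messageEvent) throws InterruptedException, ExecutionException;
}

class MessageEvent {
    public final String key;
    public final String value;

    public MessageEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

public class FileEventHandler implements EventHandler {
    public static final Logger logger = LoggerFactory.getLogger(FileEventHandler.class.getName());

    private final KafkaProducer<String, String> kafkaProducer;
    private final String topicName;
    private final boolean sync;

    public FileEventHandler(KafkaProducer<String, String> kafkaProducer, String topicName, boolean sync) {
        this.kafkaProducer = kafkaProducer;
        this.topicName = topicName;
        this.sync = sync;
    }

    @Override
    public void onMessage(MessageEvent messageEvent) throws InterruptedException, ExecutionException {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, messageEvent.key, messageEvent.value);
        if (sync) {
            //block until the broker acknowledges the record
            RecordMetadata metadata = kafkaProducer.send(record).get();
            logger.info("sync offset:{} partition:{}", metadata.offset(), metadata.partition());
        } else {
            //async send with a callback that only logs failures
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    logger.error("exception from broker: " + exception.getMessage());
                }
            });
        }
    }
}

With sync=true the send blocks on the returned Future; with sync=false (what the main method below uses) it relies on the callback only.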

public class FileAppendProducer {
    public static final Logger logger = LoggerFactory.getLogger(FileAppendProducer.class.getName());
    public static void main(String[] args) {
        String topicName = "file-topic";

        //KafkaProducer configuration setting

        Properties props  = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer object creation
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        boolean sync = false;

        URL resource = FileProducer.class.getClassLoader().getResource("pizza_append.txt");
        String filePath= resource.getFile();
        File file = new File(filePath);
        EventHandler eventHandler = new FileEventHandler(kafkaProducer, topicName, sync);
        FileEventSource fileEventSource = new FileEventSource(100, file, eventHandler);
        Thread fileEventSourceThread = new Thread(fileEventSource);
        fileEventSourceThread.start();

        try {
            fileEventSourceThread.join();
        }catch(InterruptedException e) {
            logger.error(e.getMessage());
        }finally {
            kafkaProducer.close();
        }

    }
}

Because of the join(), the main thread does not exit until the FileEventSource thread finishes.

Since pizza_append.txt is resolved through the classloader, the file that is monitored (and that new lines must be appended to) is the copy under the build/ directory, not the one under src/main/resources.
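To feed the running producer you append lines to that build/ copy. A rough sketch of a small helper that does this, assuming the comma-separated layout the consumer below parses (a key followed by seven value fields, the last being a yyyy-MM-dd HH:mm:ss timestamp; the concrete field meanings in the sample line are just a guess):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AppendSampleLine {
    public static void main(String[] args) throws Exception {
        //resolve the same classpath copy of the file that FileAppendProducer monitors
        Path path = Path.of(
                AppendSampleLine.class.getClassLoader().getResource("pizza_append.txt").toURI());

        //hypothetical record: tokens[0] becomes the message key, the remaining fields become the value
        String line = "ord_1001,P001,Tony Lim,010-1234-5678,Seoul,Cheese Pizza,30000,2024-06-30 16:46:00\n";
        Files.writeString(path, line, StandardOpenOption.APPEND);
    }
}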

 


The Kafka consumer app polls the messages the producer published to the topic and then stores them in a DB.

public class FileToDBConsumer<K extends Serializable, V extends Serializable> {
    public static final Logger logger = LoggerFactory.getLogger(FileToDBConsumer.class.getName());
    protected KafkaConsumer<K, V> kafkaConsumer;
    protected List<String> topics;

    private OrderDBHandler orderDBHandler;
    public FileToDBConsumer(Properties consumerProps, List<String> topics,
                            OrderDBHandler orderDBHandler) {
        this.kafkaConsumer = new KafkaConsumer<K, V>(consumerProps);
        this.topics = topics;
        this.orderDBHandler = orderDBHandler;
    }
    public void initConsumer() {
        this.kafkaConsumer.subscribe(this.topics);
        shutdownHookToRuntime(this.kafkaConsumer);
    }

    private void shutdownHookToRuntime(KafkaConsumer<K, V> kafkaConsumer) {
        //main thread
        Thread mainThread = Thread.currentThread();

        //when the main program terminates, a separate thread calls the KafkaConsumer wakeup() method.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                try {
                    mainThread.join();
                } catch(InterruptedException e) { e.printStackTrace();}
            }
        });

    }

    private void processRecord(ConsumerRecord<K, V> record) throws Exception {
        OrderDTO orderDTO = makeOrderDTO(record);
        orderDBHandler.insertOrder(orderDTO);
    }

    private OrderDTO makeOrderDTO(ConsumerRecord<K,V> record) throws Exception {
        String messageValue = (String)record.value();
        logger.info("###### messageValue:" + messageValue);
        String[] tokens = messageValue.split(",");
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        OrderDTO orderDTO = new OrderDTO(tokens[0], tokens[1], tokens[2], tokens[3],
                tokens[4], tokens[5], LocalDateTime.parse(tokens[6].trim(), formatter));

        return orderDTO;
    }


    private void processRecords(ConsumerRecords<K, V> records) throws Exception{
        List<OrderDTO> orders = makeOrders(records);
        orderDBHandler.insertOrders(orders);
    }

    private List<OrderDTO> makeOrders(ConsumerRecords<K,V> records) throws Exception {
        List<OrderDTO> orders = new ArrayList<>();
        //records.forEach(record -> orders.add(makeOrderDTO(record)));
        for(ConsumerRecord<K, V> record : records) {
            OrderDTO orderDTO = makeOrderDTO(record);
            orders.add(orderDTO);
        }
        return orders;
    }


    public void pollConsumes(long durationMillis, String commitMode) {
        if (commitMode.equals("sync")) {
            pollCommitSync(durationMillis);
        } else {
            pollCommitAsync(durationMillis);
        }
    }
    private void pollCommitAsync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                logger.info("consumerRecords count:" + consumerRecords.count());
                if(consumerRecords.count() > 0) {
                    try {
                        processRecords(consumerRecords);
                    } catch(Exception e) {
                        logger.error(e.getMessage());
                    }
                }
//                if(consumerRecords.count() > 0) {
//                    for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
//                        processRecord(consumerRecord);
//                    }
//                }
                //pass the OffsetCommitCallback for commitAsync as a lambda
                this.kafkaConsumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                    }
                });
            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            close();
        }
    }

    protected void pollCommitSync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                processRecords(consumerRecords);
                try {
                    if (consumerRecords.count() > 0) {
                        this.kafkaConsumer.commitSync();
                        logger.info("commit sync has been called");
                    }
                } catch (CommitFailedException e) {
                    logger.error(e.getMessage());
                }
            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            close();
        }
    }
    protected void close() {
        this.kafkaConsumer.close();
        this.orderDBHandler.close();
    }

    public static void main(String[] args) {
        String topicName = "file-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.14.147:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "file-group");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        String url = "jdbc:mysql://localhost:3316/kafka_consumer";
        String user = "root";
        String password = "1234";
        OrderDBHandler orderDBHandler = new OrderDBHandler(url, user, password);

        FileToDBConsumer<String, String> fileToDBConsumer = new
                FileToDBConsumer<String, String>(props, List.of(topicName), orderDBHandler);
        fileToDBConsumer.initConsumer();
        String commitMode = "async";

        fileToDBConsumer.pollConsumes(1000, commitMode);

    }

}
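OrderDTO and OrderDBHandler are also not shown in the post. A minimal sketch of what they might look like, assuming a plain JDBC batch insert; the table name, column names, and DTO field names here are my own guesses, and only the constructor shape (six strings plus a LocalDateTime) is fixed by makeOrderDTO() above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;

//hypothetical DTO; the real class presumably has the same constructor shape used in makeOrderDTO()
record OrderDTO(String ordId, String shopId, String menuName, String userName,
                String phoneNumber, String address, LocalDateTime orderTime) {}

public class OrderDBHandler {
    //hypothetical insert statement; the real table and columns may differ
    private static final String INSERT_SQL =
            "INSERT INTO orders (ord_id, shop_id, menu_name, user_name, phone_number, address, order_time) "
                    + "VALUES (?, ?, ?, ?, ?, ?, ?)";

    private Connection connection;

    public OrderDBHandler(String url, String user, String password) {
        try {
            this.connection = DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public void insertOrder(OrderDTO order) {
        insertOrders(List.of(order));
    }

    public void insertOrders(List<OrderDTO> orders) {
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            for (OrderDTO order : orders) {
                ps.setString(1, order.ordId());
                ps.setString(2, order.shopId());
                ps.setString(3, order.menuName());
                ps.setString(4, order.userName());
                ps.setString(5, order.phoneNumber());
                ps.setString(6, order.address());
                ps.setTimestamp(7, Timestamp.valueOf(order.orderTime()));
                ps.addBatch();
            }
            //one batch per poll() keeps DB round trips low
            ps.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

insertOrders() is called once per poll via processRecords(), so batching all records from one poll into a single executeBatch() matches how the consumer loop above uses it.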

 

 

 

 
