Cloud/kafka-connect

JDBC Sink Connector

Tony Lim 2024. 7. 20. 13:03
728x90

카프카 토픽에서 메시지를 읽어들여서 타겟 DB 로 데이터 입력/ 수정/ 삭제를 수행한다.

Connect의 Consumer가 주기적으로 카프카 토픽 메시지를 읽어서 타겟 DB로 데이터 연동한다.

RDBMS에서 데이터 추출은 JDBC Source Connector , CDC Sourec Connector등을 사용하지만 RDBMS로 데이터 입력은 주로 JDBC Sink Conenctor를 사용한다.


JDBC Sink Connector 생성

{
    "name": "mysql_jdbc_sink_customers_00",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysql_jdbc_customers",
        "connection.url": "jdbc:mysql://192.168.14.143:3316/om_sink",
        "connection.user": "root",
        "connection.password": "1234",

        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "customer_id",
        "delete.enabled": "true",
        
        "table.name.format": "om_sink.customers_sink_base",
        
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "auto.create": "true"
    }
}

source에서 받아올 row들을 저장할 table을 (customers_sink_base)를 자동생성한다.

source에 저장된 data들이 잘 옮겨져있다.  source가 kafka에게 보낸 topic schema를 참고해서 table 을 create한것이기 때문에 원본 table이라 할 수 잇는 customers table과 column type들이 다르다.

이것이 auto.create = true를 하지말고 직접 수동으로 생성해야하는 이유이다.

{
    "name": "mysql_jdbc_sink_customers",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysql_jdbc_customers",
        "connection.url": "jdbc:mysql://192.168.14.143:3316/om_sink",
        "connection.user": "root",
        "connection.password": "1234",
        
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "customer_id",
        "delete.enabled": "true",

        "table.name.format": "om_sink.customers_sink",
        
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

pk 값이 하나일때 예시이다.

{
    "name": "mysql_jdbc_sink_order_items",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysql_jdbc_order_items",
        "connection.url": "jdbc:mysql://localhost:3306/om_sink",
        "connection.user": "connect_dev",
        "connection.password": "connect_dev",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "order_id, line_item_id",
        "delete.enabled": "true",
        "table.name.format": "om_sink.order_items_sink",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

pk값이 여러개일(여기서는 2개, pk.fields) 떄 의 connector configuration 예시이다.


sink update

실제로 record의 column값이 update가 되면 source connector는 새로운 kafka event를 만들고 kafka에 record가 추가로 생성된다.

새로 insert될 record가 PK가 같으면 Sink connector는 update로 치환해서 적용된다.

[2024-07-22 14:39:26,664] INFO [mysql_jdbc_source_customers|task-0|offsets] WorkerSourceTask{id=mysql_jdbc_source_customers-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)

source db의 customers 2개row를 update하니 잠시후 위로그가 나오면서 sink db customers가 update가 성공적으로 되었다.


sink delete

 

기존의 jdbc source connector는 select 를 통해서 source db의 정보를 가지고 옮으로 delete된 record를 가져올 방법이없다.

CDC는 delete를 감지하고 kafka에게 전달할때 value를 null로 전달하게 된다.

sink connector는 이미 존재하는 pk에 대한 record값이 null이 오면 delete sql 을 수행해서 데이터를 삭제시켜준다.

 

 

 

 

728x90

'Cloud > kafka-connect' 카테고리의 다른 글

Debezium MySQL CDC Source Connector - 3  (0) 2024.07.29
Debezium MySQL CDC Source Connector - 2  (0) 2024.07.25
Debezium MySQL CDC Source Connector - 1  (0) 2024.07.22
JDBC Source Connector  (0) 2024.07.16
Kafka Connect 개요  (0) 2024.07.11