카프카 토픽에서 메시지를 읽어들여서 타겟 DB 로 데이터 입력/ 수정/ 삭제를 수행한다.
Connect의 Consumer가 주기적으로 카프카 토픽 메시지를 읽어서 타겟 DB로 데이터 연동한다.
RDBMS에서 데이터 추출은 JDBC Source Connector , CDC Sourec Connector등을 사용하지만 RDBMS로 데이터 입력은 주로 JDBC Sink Conenctor를 사용한다.
JDBC Sink Connector 생성
{
"name": "mysql_jdbc_sink_customers_00",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql_jdbc_customers",
"connection.url": "jdbc:mysql://192.168.14.143:3316/om_sink",
"connection.user": "root",
"connection.password": "1234",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "customer_id",
"delete.enabled": "true",
"table.name.format": "om_sink.customers_sink_base",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"auto.create": "true"
}
}
source에서 받아올 row들을 저장할 table을 (customers_sink_base)를 자동생성한다.
source에 저장된 data들이 잘 옮겨져있다. source가 kafka에게 보낸 topic schema를 참고해서 table 을 create한것이기 때문에 원본 table이라 할 수 잇는 customers table과 column type들이 다르다.
이것이 auto.create = true를 하지말고 직접 수동으로 생성해야하는 이유이다.
{
"name": "mysql_jdbc_sink_customers",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql_jdbc_customers",
"connection.url": "jdbc:mysql://192.168.14.143:3316/om_sink",
"connection.user": "root",
"connection.password": "1234",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "customer_id",
"delete.enabled": "true",
"table.name.format": "om_sink.customers_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
pk 값이 하나일때 예시이다.
{
"name": "mysql_jdbc_sink_order_items",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql_jdbc_order_items",
"connection.url": "jdbc:mysql://localhost:3306/om_sink",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "order_id, line_item_id",
"delete.enabled": "true",
"table.name.format": "om_sink.order_items_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
pk값이 여러개일(여기서는 2개, pk.fields) 떄 의 connector configuration 예시이다.
sink update
실제로 record의 column값이 update가 되면 source connector는 새로운 kafka event를 만들고 kafka에 record가 추가로 생성된다.
새로 insert될 record가 PK가 같으면 Sink connector는 update로 치환해서 적용된다.
[2024-07-22 14:39:26,664] INFO [mysql_jdbc_source_customers|task-0|offsets] WorkerSourceTask{id=mysql_jdbc_source_customers-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
source db의 customers 2개row를 update하니 잠시후 위로그가 나오면서 sink db customers가 update가 성공적으로 되었다.
sink delete
기존의 jdbc source connector는 select 를 통해서 source db의 정보를 가지고 옮으로 delete된 record를 가져올 방법이없다.
CDC는 delete를 감지하고 kafka에게 전달할때 value를 null로 전달하게 된다.
sink connector는 이미 존재하는 pk에 대한 record값이 null이 오면 delete sql 을 수행해서 데이터를 삭제시켜준다.
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium MySQL CDC Source Connector - 3 (0) | 2024.07.29 |
---|---|
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
Debezium MySQL CDC Source Connector - 1 (0) | 2024.07.22 |
JDBC Source Connector (0) | 2024.07.16 |
Kafka Connect 개요 (0) | 2024.07.11 |