JDBC Source Connector는 RDBMS별 JDBC Driver가 별도로 설치되어야 한다. JDBC Source Connector는 JDBC Drvier를 이용하여 DB 접속 및 데이터 추출을 수행 후 Producer를 이용하여 Kafka Broker로 데이터를 전송한다.
JDBC Source Connector는 Source 시스템에 주기적으로 Query를 날려서 변경된 데이터를 추출하는 방식이다.
Consumer는 아니지만 Consumer처럼 주기적으로 변경된 데이터를 추출하고 변경사항을 kafka에게 전송한다.
JDBC Source Connector 모드 유형
- Incremental query 모드: 이전 데이터 추출 이후 새롭게 생성된 데이터나 업데이트된 데이터를 Kafka로 전송한다.
모드 별로 대상 컬럼을 지정해줘야 한다.- incrementing 모드 = insert만 가능하다. Auto inrcement PK 컬럼 필요
- timestamp 모드 = Insert/update만 가능
- timestamp + incrementing = insert/update 가능. 가장 안정적인 모드
- Delete 하는 기능이 없어서 운영에서 쓰기에는 무리가 있다.
- Bulk 모드 = 특정 테이블에 있는 데이터를 한번에 모두 Kafka로 전송한다. 전송 이후 테이블의 데이터는 모두 삭제되어야 불필요한 재전송을 하지 않는다.
주기적으로 가져올 때마다 PK offset을 기준으로 데이터를 추가적으로 가져올지말지 결정하게 된다.
timestamp모드에서는 create, update한 시점을 표기한 별도의 column이 필요하고 이를 기준으로 offset을 정한다.
JDBC Source Connector의 문제점
- Source DB의 데이터 삭제 (Delete) 이벤트를 Kafka로 전송할 수 없음
- CDC 기반 Connector보다 Source DB 성능에 상대적으로 더 큰 영향을 미칠 수 있음
- Source데이터의 모든 변경 이벤트가 제대로 Kafka로 전송되었다고 보장하기 어렵다.
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
JDBC Connector (Source and Sink)
Discover 200+ expert-built Apache Kafka connectors for seamless, real-time data streaming and integration. Connect with MongoDB, AWS S3, Snowflake, and more.
www.confluent.io
에서 zip에 jdbc sink, source connector를 띄우기 위한 jar들이 존재한다.
이때 mysql driver는 포함이 안되어있으므로 maven에서 다운 받는다. (oracle, postgres, mssql은 기본으로 존재한다)
spooldir connector를 했던것처럼
jdbc_connector에 모든 jar들을 옮긴다음에 connect-distributed를 재기동 시켜준다.
http 요청으로 잘 뜬것을 확인한다.
topic 명은 topic.prefix + table 명으로 생성된다.
지정된 topic 명으로 이미 생성된 topic이 없을 경우 자동 생성된다.
catalog.pattern = schema이름을 적어주는 곳이다.
왠만하면 각 table마다 topic을 만들어주는게 좋다. 기본적인 connector 구성이 그러하다.
jdbc source connector를 쪼개서 작업을 해주는것이 편리할 떄도 있다.
왜냐하면 table.whitelist를 안주게 되면 자동적으로 schema밑에 있는 table들에 대한 topic을 다 생성하려하다가
모든 table이 다 incrementing.column.name에서 지정한 column을 pk값으로 가지지 않는 경우도 있으므로
생성에 실패한다.
{
"name": "mysql_jdbc_kafka_source_00",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://192.168.14.143:3316/kafka",
"connection.user": "root",
"connection.password": "1234",
"topic.prefix": "mysql_kafka_",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"catalog.pattern": "kafka",
"table.whitelist": "kafka.customers",
"poll.interval.ms": 10000,
"mode": "incrementing",
"incrementing.column.name": "customer_id"
}
}
connector_configs에 하나 추가 한다.
http POST http://localhost:8083/connectors @mysql_jdbc_kafka_source_00.json
http 요청 명령어로 정상적으로 추가가 되면
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Finding the database dialect that is best fit for the provided JDBC URL. (io.confluent.connect.jdbc.source.JdbcSourceTask:135)
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2024-07-17 11:28:16,361] INFO [mysql_jdbc_kafka_source_00|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)
[2024-07-17 11:28:16,524] INFO [mysql_jdbc_kafka_source_00|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:311)
[2024-07-17 11:28:16,524] INFO [mysql_jdbc_kafka_source_00|task-0] WorkerSourceTask{id=mysql_jdbc_kafka_source_00-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:284)
[2024-07-17 11:28:16,530] INFO [mysql_jdbc_kafka_source_00|task-0] Begin using SQL query: SELECT * FROM `kafka`.`customers` WHERE `kafka`.`customers`.`customer_id` > ? ORDER BY `kafka`.`customers`.`customer_id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
쿼리가 날라가면 정상적으로 동작한것이다.
이후 insert문으로 db에 데이터를 넣어주면 이것을 감지하고
[2024-07-17 11:45:57,712] INFO [mysql_jdbc_kafka_source_00|task-0] The task will send records to topic 'mysql_kafka_customers' for the first time. Checking whether topic exists (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:533)
[2024-07-17 11:45:57,714] INFO [mysql_jdbc_kafka_source_00|task-0] Creating topic 'mysql_kafka_customers' (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:542)
[2024-07-17 11:45:58,223] INFO [mysql_jdbc_kafka_source_00|task-0] Created topic (name=mysql_kafka_customers, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={}) on brokers at localhost:9092 (org.apache.kafka.connect.util.TopicAdmin:445)
[2024-07-17 11:45:58,223] INFO [mysql_jdbc_kafka_source_00|task-0] Created topic '(name=mysql_kafka_customers, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=1, replicationFactor=1, otherConfigs={}} (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:550)
topic을 생성하고 db의 메시지를 저장하게 된다.
Schema를 가지는 JDBC Source Connector 생성 메시지
- 컬럼 추가/ 삭제 / 타입 변경 등에 대응하기 위해 개별 레코드마다 자신의 schema (컬럼 타입, 컬럼명등)의 정보를 별도로 가지고 있으며 이를 메시지로 생성한다.
- Schema 정보는 Sorce Connector에서 생성하지 않을 수 있지만 , 그럴 경우 JDBC Sink Connector에서 DB로 데이터를 입력할 수 없다.
- 하나의 메시지는 개별 레코드의 Schema 정보와 실제 값으로 구성되어 있다.
- RDMS관련 Source Connector는 Schema를 레코드 별로 가지고 있는 형태로 구성되며 Source Connector 별로 포멧은 다를 수 있다.
- Schema 정보로 인하여 개별 레코드에 대한 토픽 메시지 크기가 매우 커지게 된다.
- 개별 레코드의 Schema 정보를 중앙에서 관리할 필요가 있다. 이를 위해 Schema Registry 가 필요하면 이 경우 Source Connector는 레코드 별로 Schema를 생성하지 않아도 된다.
{
"name": "mysql_jdbc_kafka_source_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://192.168.14.143:3316/kafka",
"connection.user": "root",
"connection.password": "1234",
"topic.prefix": "mysql_kafka_time_",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"catalog.pattern": "kafka",
"table.whitelist": "kafka.custkafkaers, kafka.products, kafka.orders, kafka.order_items",
"poll.interval.ms": 10000,
"mode": "timestamp",
"timestamp.column.name": "system_upd"
}
}
increment 모드에서 timestamp 모드로 변경하고 connector를 하나 더 띄운것이다.
이때 update 쿼리를 할때 connector가 감지하고 topic의 값도 변경하게 하려면 system_upd column의 값도 항상 now()로 manual 하게 update쿼리시 같이 추가해서 update해줘야한다.
안그러면 감지를 못하기 때문에 db에만 반영이 된다.
JDBC Source Connector Offset
incrementing의 경우 이다. 해당 쿼리의 ?는 connect offset의 incrementing 값이 들어간다. 그거보다 큰거만 가져오는것
timestamp의 경우이다. 해당 쿼리의 ?는 timestamp 값이 들어가고 그것보다 큰 값만 가져오게된다.
SMT (1개의 컬럼으로 구성된 PK를 Key값으로 설정하기)
{
"config": {
"connection.password": "1234",
"connection.url": "jdbc:mysql://192.168.14.143:3316/kafka",
"connection.user": "root",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"incrementing.column.name": "customer_id",
"mode": "timestamp+incrementing",
"name": "mysql_jdbc_kafka_source_03",
"poll.interval.ms": "10000",
"table.whitelist": "customers",
"tasks.max": "1",
"timestamp.column.name": "system_upd",
"topic.prefix": "mysql_kafka_smt_key_",
"transforms": "create_key, extract_key",
"transforms.create_key.fields": "customer_id",
"transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.extract_key.field": "customer_id",
"transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key"
},
"name": "mysql_jdbc_kafka_source_03",
"tasks": [],
"type": "source"
}
create_key, extract_key는 각각 적용할 SMT를 의미한다.
create에서 cusotmer_id 값을 만들고 extract에서 특정 단일 값으로 바꾸게 되는것이다.
- ValueToKey는 메시지 Value에서 메시지 Key값을 변환
- ExtractField는 메시지에서 특정 필드만 추출한다.
- SMT Class는 type 속성을 반드시 지정해줘야한다. type속성은 SMT 클래스명이다.
- 개별 SMT class 별로 반드시 지정해 줘야하는 속성이 있음. ValueToKey는 fields 속성, ExtractField는 filed속성.
connect_transforms 키워드 위주로 검색하면 documentation이나온다.
ValueToKey만 적용할 경우에는 key의 type이 struct로 되며 payload가 컬럼명:컬럼값으로 적용이 된다.
ValueToKey + ExtractField가 적용할 경우 key의 type이 단일 type으로 되며 payload가 컬럼값으로 적용된다.
SMT(여러개의 컬럼으로 구성된 PK를 Key 값으로 설정하기)
{
"config": {
"connection.password": "1234",
"connection.url": "jdbc:mysql://192.168.14.143:3316/kafka",
"connection.user": "root",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"name": "mysql_jdbc_kafka_source_04",
"poll.interval.ms": "10000",
"table.whitelist": "order_items",
"tasks.max": "1",
"timestamp.column.name": "system_upd",
"topic.prefix": "mysql_kafka_smt_mkey_",
"transforms": "create_key",
"transforms.create_key.fields": "order_id, line_item_id",
"transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey"
},
"name": "mysql_jdbc_kafka_source_04",
"tasks": [],
"type": "source"
}
ExtractField를 하지 않은것을 확인한다. extract_key.field이기(fields가 아님) 때문에 1개의 Column일 때만 적용이 된다.
~/kafka/confluent-7.6.0/bin » kcc mysql_kafka_smt_mkey_order_items --from-beginning | jq '.' 130 ↵ tony@tony-ubuntu
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "order_id"
},
{
"type": "int32",
"optional": false,
"field": "line_item_id"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
},
{
"type": "bytes",
"optional": false,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2",
"connect.decimal.precision": "10"
},
"field": "unit_price"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "system_upd"
}
],
"optional": false,
"name": "order_items"
},
"payload": {
"order_id": 1,
"line_item_id": 1,
"product_id": 1,
"unit_price": "JxA=",
"quantity": 1,
"system_upd": 1721228877000
}
}
SMT 토픽 이름 변경하기
{
"name": "mysql_jdbc_om_source_05",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/om",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"topic.prefix": "mysql_om_smt_key_",
"catalog.patten": "om",
"table.whitelist": "om.customers",
"poll.interval.ms": 10000,
"mode": "timestamp+incrementing",
"incrementing.column.name": "customer_id",
"timestamp.column.name": "system_upd",
"transforms": "create_key, extract_key, rename_topic",
"transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.create_key.fields": "customer_id",
"transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract_key.field": "customer_id",
"transforms.rename_topic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename_topic.regex": "mysql_om_smt_key_(.*)",
"transforms.rename_topic.replacement": "mysql_$1"
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "system_upd"
}
],
"optional": false,
"name": "customers"
},
"payload": {
"customer_id": 3,
"email_address": "testaddress_03@testdomain",
"full_name": "testuser_03",
"system_upd": 1721383132000
}
}
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium MySQL CDC Source Connector - 3 (0) | 2024.07.29 |
---|---|
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
Debezium MySQL CDC Source Connector - 1 (0) | 2024.07.22 |
JDBC Sink Connector (0) | 2024.07.20 |
Kafka Connect 개요 (0) | 2024.07.11 |