Cloud/kafka-connect

Debezium Source MySQL에서 JDBC Sink PostgreSQL 연동

Tony Lim 2024. 8. 3. 12:52

MySQL 과 PostgreSQL 간 Date와 Time 관련 데이터 타입 변환할때 주의할점들

Mysql = datetime (timezone 정보가 없다.) timestamp (timezone 정보가 있다.)

postgreSQL = Timestamp (timezone 정보가 없다. ) Timestamptz(timezone 정보가 있다.) datetime이 없다.

 

  • MySQL의 date/time/datetime 변환을 위해서는 Debezium Mysql Source connector에서 time.precision.mode=connect로 반드시 설정해야한다.
  • Debezium Mysql은 date, time ,datetime의 경우 int형으로 변환하지만 timestamp의 경우 문자열로 변환한다.
  • Debezium MySQL의 date는 PostgreSQL의 date로 time은 time으로 datetime은 timezone으로 각각 데이터 타입을 생성하면 정상적으로 Sink Connector에서 변환되어 PostgreSQL로 입력된다.
  • MySQL의 timestamp는 PostgreSQL Sink Connector에서 SMT를 이용하여 TimeStamp 형태로 변환되어야 한다. 이때 JDBC Sink Connector의 db.timezone = Asia / Seoul을 설정하지 않아도 자동으로 PostgreSQL의 DB Timezone을 반영하여 입력한다.

source mysql target postgres

{
    "name": "mysql_cdc_oc_source_avro_01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.14.143",
        "database.port": "3316",
        "database.user": "root",
        "database.password": "1234",
        "database.server.id": "20000",
        "database.server.name": "mysqlavro",
        "database.include.list": "oc",
        "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items, oc.orders_datetime_tab",
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
        "schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
				"database.allowPublicKeyRetrieval": "true",

        "time.precision.mode": "connect",
        "database.connectionTimezone": "Asia/Seoul",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "transforms": "rename_topic, unwrap",
        "transforms.rename_topic.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.rename_topic.regex": "(.*)\\.(.*)\\.(.*)",
        "transforms.rename_topic.replacement": "$1-$2-$3",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",

				"topic.prefix":"mysqlavro",
				"database.connectionTimeZone": "Asia/Seoul"

    }
}

mysql cdc는 달라질것이 없다. kafka에 topic을 없으면 만들고 메시지를 전송한다.

{
    "name": "postgres_jdbc_ops_sink_customers_avro_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysqlavro-oc-customers",
        "connection.url": "jdbc:postgresql://192.168.14.143:15432/postgres",
        "connection.user": "tony",
        "connection.password": "1234",
        "table.name.format": "ops_sink.customers_sink",

        "insert.mode": "upsert",
        "pk.fields": "customer_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",

        "auto.evolve": "true",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}

cdc mysql 에서 만들어진 메시지들을 topics에 적어줘서 해당 topic으로 부터 가져오게 된다.

[2024-08-03 14:10:23,700] INFO [mysql_cdc_oc_source_avro_01|task-0|offsets] WorkerSourceTask{id=mysql_cdc_oc_source_avro_01-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)

source mysql 에 row를 하나 추가한 후에
위 메시지가 뜬 이후에도 postgres sink db에 아무것도 안들어왔으면 뭔가 설정이 잘못된거다.


MySQL Source 테이블의 스키마 변경 유형에 따른 PostgreSQL 자동 스키마 반영

 

'Cloud > kafka-connect' 카테고리의 다른 글

Debezium PostgreSQL CDC Source Connector  (0) 2024.08.05
Schema Registry  (0) 2024.08.01
Debezium MySQL CDC Source Connector - 3  (0) 2024.07.29
Debezium MySQL CDC Source Connector - 2  (0) 2024.07.25
Debezium MySQL CDC Source Connector - 1  (0) 2024.07.22