MySQL 과 PostgreSQL 간 Date와 Time 관련 데이터 타입 변환할때 주의할점들
Mysql = datetime (timezone 정보가 없다.) timestamp (timezone 정보가 있다.)
postgreSQL = Timestamp (timezone 정보가 없다. ) Timestamptz(timezone 정보가 있다.) datetime이 없다.
- MySQL의 date/time/datetime 변환을 위해서는 Debezium Mysql Source connector에서 time.precision.mode=connect로 반드시 설정해야한다.
- Debezium Mysql은 date, time ,datetime의 경우 int형으로 변환하지만 timestamp의 경우 문자열로 변환한다.
- Debezium MySQL의 date는 PostgreSQL의 date로 time은 time으로 datetime은 timezone으로 각각 데이터 타입을 생성하면 정상적으로 Sink Connector에서 변환되어 PostgreSQL로 입력된다.
- MySQL의 timestamp는 PostgreSQL Sink Connector에서 SMT를 이용하여 TimeStamp 형태로 변환되어야 한다. 이때 JDBC Sink Connector의 db.timezone = Asia / Seoul을 설정하지 않아도 자동으로 PostgreSQL의 DB Timezone을 반영하여 입력한다.
source mysql target postgres
{
"name": "mysql_cdc_oc_source_avro_01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.14.143",
"database.port": "3316",
"database.user": "root",
"database.password": "1234",
"database.server.id": "20000",
"database.server.name": "mysqlavro",
"database.include.list": "oc",
"table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items, oc.orders_datetime_tab",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
"database.allowPublicKeyRetrieval": "true",
"time.precision.mode": "connect",
"database.connectionTimezone": "Asia/Seoul",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "rename_topic, unwrap",
"transforms.rename_topic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename_topic.regex": "(.*)\\.(.*)\\.(.*)",
"transforms.rename_topic.replacement": "$1-$2-$3",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"topic.prefix":"mysqlavro",
"database.connectionTimeZone": "Asia/Seoul"
}
}
mysql cdc는 달라질것이 없다. kafka에 topic을 없으면 만들고 메시지를 전송한다.
{
"name": "postgres_jdbc_ops_sink_customers_avro_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysqlavro-oc-customers",
"connection.url": "jdbc:postgresql://192.168.14.143:15432/postgres",
"connection.user": "tony",
"connection.password": "1234",
"table.name.format": "ops_sink.customers_sink",
"insert.mode": "upsert",
"pk.fields": "customer_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"auto.evolve": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
cdc mysql 에서 만들어진 메시지들을 topics에 적어줘서 해당 topic으로 부터 가져오게 된다.
[2024-08-03 14:10:23,700] INFO [mysql_cdc_oc_source_avro_01|task-0|offsets] WorkerSourceTask{id=mysql_cdc_oc_source_avro_01-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
source mysql 에 row를 하나 추가한 후에
위 메시지가 뜬 이후에도 postgres sink db에 아무것도 안들어왔으면 뭔가 설정이 잘못된거다.
MySQL Source 테이블의 스키마 변경 유형에 따른 PostgreSQL 자동 스키마 반영
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium PostgreSQL CDC Source Connector (0) | 2024.08.05 |
---|---|
Schema Registry (0) | 2024.08.01 |
Debezium MySQL CDC Source Connector - 3 (0) | 2024.07.29 |
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
Debezium MySQL CDC Source Connector - 1 (0) | 2024.07.22 |