SMT를 이용하여 Debezium Source Connector 토픽이름 변경하기
{
"name": "mysql_cdc_oc_source_rename_topic",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.14.143",
"database.port": "3316",
"database.user": "root",
"database.password": "1234",
"database.server.id": "12001",
"database.server.name": "mysqlrename",
"database.include.list": "oc",
"table.include.list": "oc.customers",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql.oc",
"database.allowPublicKeyRetrieval": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "rename_topic, unwrap",
"transforms.rename_topic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename_topic.regex": "(.*)\\.(.*)\\.(.*)",
"transforms.rename_topic.replacement": "$1-$2-$3",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
mysqlrealname.oc.customers가 기본적으로 생성이 되는 topic 이름인데 . 대신 - 으로 생성해준다.
또한 rename_topic ,unwrap 처럼 chaining해서 SMT를 적용할 수 있다는 것이다.
이거 테스트하다가 connector-offsets를 다 날린적이있다. kafka topic delete 로 지워버림
org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'. at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
이런 에러가 떠러지는데 기본적으로 connector-offsets에 cleanup.policy=compact 를 해줘야하는것 같다.
kafka-configs --alter --entity-type topics --entity-name connect-offsets --add-config cleanup.policy=compact --bootstrap-server localhost:9092
통해서 추가를 해주면 정상적으로 kafka-connect가 기동된다.
Source 테이블의 스키마 변경시 Sink 테이블의 자동 스키마 반영에 대한 이해
auto.evolve=true 로 한 경우
- 해당 설정값은 Source 테이블의 스키마 변경을 JDBC Sink Connector가 자동으로 인지하여 Sink 테이블에 자동 반영하게 만들어준다.
- 하지만 스키마 변경 유형에 따라 제대로 자동 반영이 되지 않는 경우가 있으므로 매우 유의가 필요하다.
컬럼 추가
- Source 테이블의 Null 가능한 숫자형, date, datetime, timestamp 컬럼들은 정상저긍로 자동 반영이된다.
다만 datetime일 경우 datetime(3)으로 변경되어 반영된다. - Source 테이블의 Not Null 컬럼은 반드시 default 값을 명시적으로 선언해 줘야한다. (Schema Registry 적용시 이슈가 발생할 수 있다.)
- Source 테이블의 Not Null 은 Sink 테이블에서 제대로 인지 못해서 Null로 적용이 된다.
- Varchar 컬럼 추가시 Sink 테이블은 Text로 변환된다.
- Varchar 컬럼을 Not Null로 default 값 설정해서 추가하면 Sink 테이블은 Text를 Default값으로 변경하지 못해서 Sink Connector장애 발생한다.
- Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경 권장한다.
컬럼 타입 변경
- Source 테이블의 컬럼 타입 변경은 Sink 테이블에서 반영되지 못한다.
- Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경을 권장한다.
컬럼 삭제
- Source 테이블의 컬럼삭제는 Sink 테이블에서 반영되지 못한다.
- Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경을 권장한다.
컬럼명 변경
- Source 테이블의 컬럼명 변경은 Sink 테이블에서 컬럼명 추가로 이상 반영한다.
- Sink Conenctor를 중지한 뒤에 Sink 테이블 스키마를 수동으로 변경을 권장한다.
무조건 수동으로 하는것이 좋아보인다.
Debezium Snapshot
- Debezium MySQL Connector가 최초 생성시 DB 스키마와 읽어들어야 할 테이블 데이터에 대한 초기 스냅샷 수행을 한다.
- 스냅샷은 Connector 생성 시점의 BinLog와 BinLog Position, DB와 읽어들일 테이블의 Schema(DDL) , 대상 테이블의 초기레코드를 읽어서 카프카에 저장등의 작업을 수행한다.
connect-offsets에 해당 connector 명과 서버명으로 offset 정보가 없을 때 수행한다. - inital = Binlog Position, Schema, 테이블의 초기 레코드를 카프카로 전송하는 작업을 수행한다.
- schema_only = 테이블의 초기 레코드를 카프카로 전송하지 않고 스키마 정보만 전송한다.
snapshot.mode 가 initial 일 경우 모든 binlog를 DB에 보관하고 있지 않은 오래된 테이블이나 대용량 테이블등은 최초 Debezium snapshot 수행 시 오류가 발생한다.
snapshot.mode를 schema_only로 설정하면 테이블의 Source 테이블의 초기 데이터를 Kafka로 전송하지 않으며 Soruce Connector가 생성된 시점부터 변경 데이터만 kafka로 전송한다.
이 경우 Source 테이블의 초기 데이터는 Sink 테이블로 수동 마이그레이션 적용이 필요하다.
schema_only 로 설정하여 대용량 테이블 이관 및 동기화
- 소스 테이블의 초기 데이터는 수동 마이그레이션
- 특정 시점 이후 Binlog부터 Source Connector를 생성한다.
Source Connector 생성 시점 이후 Source 테이블의 변경 데이터만 카프카로 전송한다. - 지속적으로 Source 테이블의 변경 사항을 카프카로 전송한다.
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium Source MySQL에서 JDBC Sink PostgreSQL 연동 (0) | 2024.08.03 |
---|---|
Schema Registry (0) | 2024.08.01 |
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
Debezium MySQL CDC Source Connector - 1 (0) | 2024.07.22 |
JDBC Sink Connector (0) | 2024.07.20 |