Cloud/kafka-connect

Debezium MySQL CDC Source Connector - 3

Tony Lim 2024. 7. 29. 11:48

SMT를 이용하여 Debezium Source Connector 토픽이름 변경하기

{
    "name": "mysql_cdc_oc_source_rename_topic",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.14.143",
        "database.port": "3316",
        "database.user": "root",
        "database.password": "1234",
        "database.server.id": "12001",
        "database.server.name": "mysqlrename",
        "database.include.list": "oc",
        "table.include.list": "oc.customers",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.mysql.oc",

        "database.allowPublicKeyRetrieval": "true",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "transforms": "rename_topic, unwrap",
        "transforms.rename_topic.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.rename_topic.regex": "(.*)\\.(.*)\\.(.*)",
        "transforms.rename_topic.replacement": "$1-$2-$3",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

mysqlrealname.oc.customers가 기본적으로 생성이 되는 topic 이름인데 . 대신 - 으로 생성해준다.

또한 rename_topic ,unwrap 처럼 chaining해서 SMT를 적용할 수 있다는 것이다.

 

이거 테스트하다가 connector-offsets를 다 날린적이있다. kafka topic delete 로 지워버림

org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'. at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)

이런 에러가 떠러지는데 기본적으로 connector-offsets에 cleanup.policy=compact 를 해줘야하는것 같다.

kafka-configs --alter --entity-type topics --entity-name connect-offsets --add-config cleanup.policy=compact --bootstrap-server localhost:9092

통해서 추가를 해주면 정상적으로 kafka-connect가 기동된다.


Source 테이블의 스키마 변경시 Sink 테이블의 자동 스키마 반영에 대한 이해

auto.evolve=true 로 한 경우

  • 해당 설정값은 Source 테이블의 스키마 변경을 JDBC Sink Connector가 자동으로 인지하여 Sink 테이블에 자동 반영하게 만들어준다.
  • 하지만 스키마 변경 유형에 따라 제대로 자동 반영이 되지 않는 경우가 있으므로 매우 유의가 필요하다.

컬럼 추가

  • Source 테이블의 Null 가능한 숫자형, date, datetime, timestamp 컬럼들은 정상저긍로 자동 반영이된다.
    다만 datetime일 경우 datetime(3)으로 변경되어 반영된다.
  • Source 테이블의 Not Null 컬럼은 반드시 default 값을 명시적으로 선언해 줘야한다. (Schema Registry 적용시 이슈가 발생할 수 있다.)
  • Source 테이블의 Not Null 은 Sink 테이블에서 제대로 인지 못해서 Null로 적용이 된다.
  • Varchar 컬럼 추가시 Sink 테이블은 Text로 변환된다.
  • Varchar 컬럼을 Not Null로 default 값 설정해서 추가하면 Sink 테이블은 Text를 Default값으로 변경하지 못해서 Sink Connector장애 발생한다.
  • Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경 권장한다.

컬럼 타입 변경

  • Source 테이블의 컬럼 타입 변경은 Sink 테이블에서 반영되지 못한다.
  • Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경을 권장한다.

컬럼 삭제

  • Source 테이블의 컬럼삭제는 Sink 테이블에서 반영되지 못한다.
  • Sink Connector를 중지한 뒤 Sink 테이블 스키마를 수동으로 변경을 권장한다.

컬럼명 변경

  • Source 테이블의 컬럼명 변경은 Sink 테이블에서 컬럼명 추가로 이상 반영한다.
  • Sink Conenctor를 중지한 뒤에 Sink 테이블 스키마를 수동으로 변경을 권장한다.

무조건 수동으로 하는것이 좋아보인다. 


Debezium Snapshot

  • Debezium MySQL Connector가 최초 생성시 DB 스키마와 읽어들어야 할 테이블 데이터에 대한 초기 스냅샷 수행을 한다.
  • 스냅샷은 Connector 생성 시점의 BinLog와 BinLog Position, DB와 읽어들일 테이블의 Schema(DDL) , 대상 테이블의 초기레코드를 읽어서 카프카에 저장등의 작업을 수행한다.
    connect-offsets에 해당 connector 명과 서버명으로 offset 정보가 없을 때 수행한다.
  • inital = Binlog Position, Schema, 테이블의 초기 레코드를 카프카로 전송하는 작업을 수행한다.
  • schema_only = 테이블의 초기 레코드를 카프카로 전송하지 않고 스키마 정보만 전송한다.

 

snapshot.mode 가 initial 일 경우 모든 binlog를 DB에 보관하고 있지 않은 오래된 테이블이나 대용량 테이블등은 최초 Debezium snapshot 수행 시 오류가 발생한다.

snapshot.mode를 schema_only로 설정하면 테이블의 Source 테이블의 초기 데이터를 Kafka로 전송하지 않으며 Soruce Connector가 생성된 시점부터 변경 데이터만 kafka로 전송한다.

이 경우 Source 테이블의 초기 데이터는 Sink 테이블로 수동 마이그레이션 적용이 필요하다.

 

schema_only 로 설정하여 대용량 테이블 이관 및 동기화

  1. 소스 테이블의 초기 데이터는 수동 마이그레이션
  2. 특정 시점 이후 Binlog부터 Source Connector를 생성한다.
    Source Connector 생성 시점 이후 Source 테이블의 변경 데이터만 카프카로 전송한다.
  3. 지속적으로 Source 테이블의 변경 사항을 카프카로 전송한다.

 

'Cloud > kafka-connect' 카테고리의 다른 글

Debezium Source MySQL에서 JDBC Sink PostgreSQL 연동  (0) 2024.08.03
Schema Registry  (0) 2024.08.01
Debezium MySQL CDC Source Connector - 2  (0) 2024.07.25
Debezium MySQL CDC Source Connector - 1  (0) 2024.07.22
JDBC Sink Connector  (0) 2024.07.20