Cloud/kafka-connect

Debezium MySQL CDC Source Connector - 2

Tony Lim 2024. 7. 25. 15:31

Debezium Snapshot

  • Debezium MySQL Connector가 최초 생성시 DB 스키마와 읽어들어야 할 테이블 데이터에 대한 초기 스냅샷을 수행한다.
  • 스냅샷은 Connector 생성 시점의 Binlog와 Binlog Position, DB와 읽어들일 테이블의 Schema(DDL) , 대상 테이블의 초기 레코드를 읽어서 카프카에 저장등의 작업을 수행하며 connect-offsets에 해당 connector명과 서버명으로 offset 저보가 없을 때 수행한다.
  • inital = Binlog Position, Schema, 테이블의 초기 레코드를 카프카로 저장하는 작업을 수행한다.
  • schema_only = 테이블의 초기 레코드를 카프카로 저장하지 않고 스키마 변경 정보만 기록한다.

 

Connector 최초 생성시 Inital SnapShot 수행 프로세스

  1. MySql DB에 Global read lock을 획득한다. 다른 client들이 테이블에 write하지 못한다.
  2. 현재 binlog의 position을 읽는다.
  3. Connector가 읽어들일 테이블의 스키마 정보를 가져온다.
  4. Global read lock해제 (다른 client들이 테이블 write가능) 하고 DDL 변경 사항을 Schema change Topic에 기록한다.
  5. Connector가 대상 테이블들의 기존 레코드를 읽은 뒤 Topic에 전송한다.
  6. Topic 에 기록된 레코드 offset을 connect-offset 토픽에 기록한다.
[
  "mysql_cdc_oc_source_01",
  {
    "server": "cdc01"
  }
]
{
  "ts_sec": 1721789742,
  "file": "binlog.000114",
  "pos": 1802860
}
[
  "mysql_cdc_oc_source_01",
  {
    "server": "cdc01"
  }
]
{
  "transaction_id": null,
  "ts_sec": 1721798860,
  "file": "binlog.000114",
  "pos": 1804221,
  "row": 1,
  "server_id": 1,
  "event": 2
}

connector-offset topic을 consume했을 떄 나온 결과이다.

첫번째 mysql_cdc_oc_source_01 은 snapshot이고 2번쨰는 insert이후에 나온 값이다.

server 의 이름의 용도는 혹시나 특정 mysql server가 내려가서 다른 server에서 복구시킨 경우 해당 topic을 좀 더 손쉽게 생성할 수 있다. server 이름만 바꾸면 되니까

 

오래된 binlog Purge시 Connector 기동 문제점

  • Connector는 자신이 읽어서 kafka로 보낸 offset정보를 binlog명과 binlog position 으로 connect-offsets 토픽이 기록한다.
  • 만약 오랫동안 Connector를 기동하지 않는다면 , binlog가 mysql expire log 기간 이상 저장되어 있을 경우 삭제될 수 있다. 이때 Connector는 connect-offsets에 기록된 binlog를 binlog 디렉토리에서 찾지 못해서 기동을 할 수 없다.
  • 이 경우 새롭게 Connector를 생성하거나 connect-offsets의 offsets를 재 설정이 필요하다. 또한 오랫동안 정지할 Connector는 미리 삭제하는 것이 좋다.

Numeric 과 Decimal Data Type

  • Precision과 scale 값을 가지는 숫자 Data Type
  • Mysql은 Numeric과 Decimal이 서로 동일하게 구현되어있다.
  • Decimal(7,3) 이면 9999.999 형태로 DB 저장
  • Decimal(10,0) 일 때 값을 100.1로 입력해도 100으로 저장된다.
  • Decimal(10) 이면 precision이 10, scale이 0이다.
  • Decimal(또는 numberic)과 같이 precision과 scale을 설정하지 않으면 기본적으로 precision이 10, scale이 0이다.  

CDC에서 변환

  • Debezium의 Default 설정인 decimal.handling.mode = precision 일 때 numeric / decimal 모두 bytes 형태로 변환한다.
  • org.apache.kafka.connect.data.Decimal을 이용하여 변환되며 schema에 parameters로 scale과 precsion값을 표현한다.

MySQl temporal type 변환시 Debezium 주요 이슈

  • date / time / datetime 타입 변환시 Debezium Source Connect의 변환 클래스명이 JDBC Sink Connector가 인지하지 못해서 JDBC Sink Connector 오류가 발생하게 된다.
  • timestamp 타입은 문자열로 변환되며 소스 DB의 Timezon에 관계없이 UTC Timezon으로 변경된다.
  • mysql에서는 datetime 은 연원일시분초 까지 있고  +9:00 처럼 timezone이 존재한다.

 

Mysql date, time ,datetime 타입 Debezium 변환

  • Debezium Source Connector는 date/time/datetime 변환모드를 설정할 수 있는 time.precision.mode 파라미터를 제공한다.
  • time.prcesion.mode의 기본값은 adaptive_time_microseconds 로 Debezium 내부 변환 티입을 가진다.
  • time.prcesion.mode = connect는 Connect의 기본 변환 타입을 가진다.
  • adaptive_time_microseconds  또는 connect 설정 시 date / time / datetime 변환 값은 서로 대동 소이하다.
  • adaptive_time_microseconds 설정 시에는 Debezium 변환 클래스를 JDBC Sink Connector가 인지하지 못해서 JDBC Sink Connector에서 오류가 발생한다.
  • 또한 JDBC Sink Connector는 datetime의 경우 millsecond 단위로 unix epoch time을 변환한다. Microsecond 단위로 변환된 unix epoch time 은 타겟 DB입력 시 오류가 발생한다.
  • 때문에 date / time / datetime 변환을 위해서는 Debezium Source connector에서 time.precision.mode = connect로 반드시 설정해야한다.

 

MySQL DateTime 과 TimeStamp

  • datetime은 timezone 정보를 가지지 않지만, timestamp 는 timezone 정보를 가진다.
  • DB는 기본 timezone 설정이 필요하다.
  • Datetime은 클라이언트에서 입력한 값 자체로 DB에 저장이 되지만, timestamp는 클라이언트 입력값이 UTC로 변환되어 DB에 저장이 된다.
    DB에 저장된 값을 클라이언트에서 출력할 때는 datetime은 DB에 저장된 값을 그대로 출력하지만, timestamp는 DB 저장값을 DB의 timezone 설정에 따라 변환한 뒤 출력한다.

 

Debezium 에서 timestamp 타입 변환

  • Debezium은 datetime의 경우 int64형으로 변환하지만 timestamp의 경우 문자열로 변환한다.
  • 문자열로 변환 시 UTC timezone을 적용하여 변환됨
  • Debezium Connector 설정 중 database.connectionTimezone = Asia/Seoul UTC를 KST로 변환하지 못함. 해당 설정은 Debezium에서 UTC로 변환하기 위한 Hint 일 뿐 KST로 문자열을 변환하지는 못한다.
  • 예를 들어 한국 시간 2022년 11월 22일 13시 30분 37초 timestamp는 debezium에서 '2022-11-22T04:30:37Z'와 같은 문자열로 변환이 된다. +9:00 , 9시간 뒤로 바꿔버림
  • 해당 문자열을 JDBC Sink Connector에서 인식하기 위해서는 TimeStamp형태로 변환을 해야하며, 이를 위해 JDBC Sink Connector에서 SMT를 적용해줘야한다.
{
    "name": "mysql_jdbc_oc_sink_timestamp_tab_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "test01.oc.orders_timestamp_tab",
        "connection.url": "jdbc:mysql://localhost:3306/oc_sink",
        "connection.user": "connect_dev",
        "connection.password": "connect_dev",
        "table.name.format": "oc_sink.orders_timestamp_tab_sink",
        "insert.mode": "upsert",
        "pk.fields": "order_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "transforms": "convertTS",
        "transforms.convertTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTS.field": "order_timestamp",
        "transforms.convertTS.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
        "transforms.convertTS.target.type": "Timestamp"
    }
}

transforms TimestampConverter를 이용해서 java의 SimpleDateFormat으로 변경할 수 있다.


Debezium Source Connector 배치 메시지 관련 파라미터

  • poll.interval.ms = Source Connector가 배치 메시지를 전송하기전 까지 기다리는 시간이다. 기본은 1000ms , buffer size만큼 채워지면 기다리지 않고 바로 전송된다.
  • max.batch.size = Batch message의 최대 record 건수이다. 기본은 2048 
  • max.queue.size = Producer를 통해 Kafka로 메시지를 전송하기 전에 대기하는 내부 Queue의 최대 레코드 사이즈이다. 기본값은 8192이며 max.batch.size보다 커야한다.
  • max.queue.size.in.bytes = Producer를 통해 Kafka로 메시지를 전송하기 전에 대기하는 내부 Queue의 최대 메모리 크기이다. 기본은 0이다.
    max.queue.size와 max.queue.size.in.bytes 가 동시에 설정이 되어있으면 어느 한 조건만 만족해도 되는 크기로 Queue가 결정이 된다.

 

JDBC Sink Conenctor 배치 메시지 처리

kafka에 2000개 정도의 message가 쌓여있고 가져올때는 fetch.min.bytes < x < max.partition.fetch.bytes 사이로 가져온다.

batch.size 가 3000이라도 Consumer가 가져온게 450게임으로 450개만 JDBC Sink Instance에 들어간다.

executeBatch()를 통해 450개를 한번에 처리한다. 이후에 __consumer_offsets 에 commit기록이 남는다.

 

Upsert 기반의 JDBC Batch 처리

on duplicate key update로 insert시 PK 겹치는 에러를 잡아서 update를 해주게 된다.

실제 source db에서는 여러개의 쿼리로 bin log가 만들어졌어도 Debezium cdc에서는 preparedStatement로 batch처리 해서 한번에 처리하게 된다.

 

JDBC Sink Connector의 성능 향상 방법

  • topic의 partition 수를 늘리고 거기에 맞게 tasks.max 수를 증가시킨다.
  • tasks.max 수는 Topic partition수와 CPU Core수에 맞게 적절하게 설정해야한다.
  • tasks.max 를 증가시켜도 target 테이블에 인덱스가 많이 있을 경우는 수행 성능이 증가하지 않는다.