Cloud/kafka-connect

Debezium MySQL CDC Source Connector - 1

Tony Lim 2024. 7. 22. 15:19

CDC (Change Data Capture)

  • CDC는 RDBMS와 NoSQL등의 Data Store시스템의 변경 데이터를 내부적으로 기록하고 있는 Redo Log/Bin Log/ WAL 등의 내부 트랜잭션 로그파일에서 (redo log같은) 변경데이타를 Capture하는 소프트웨어를 지칭한다.
  • DB 내부 트랜잭션 로그 파일에서 변경 데이터를 추출하므로 소스 DB 성능에 큰 영향 없이 대용량의 변경 데이터를 매우 빠르게 추출하고 준 실시간으로 타겟 DB에 연동한다.
  • DBMS 복제 (replication) 수준의 안정적이고 정확한 데이터 추출이 가능하다.
  • 성능과 안정성이 중요한 많은 중요 DBMS시스템에서 CDC를 활용한다.

 

DB 복제 vs CDC 비교

데이터 복제는 DB의 가용성을 극대화 하기 위해서 Primary(Master) 와 StandBy(Salve) 간의 데이터를 완벽하게 동기화 하는데 초점을 맞추기 때문에 아키텍쳐 유연성이 상대적으로 떨어진다(primary와 Standby가 db버전, os구성이 거의동일)

CDC는 DB버전이나 OS환경에 대한 구성 제약이 상대적으로 덜 엄격하며, 보다 다양한 비즈니스 활용이 가능하다.

 

성능이 훨씬 좋은 정보계 서버에서 뭔가 실시간 분석을 돌리고 싶다.


MySQL 복제 

  • MySQL 복제는 Binary Log를 기반으로 구현된다.
  • Master의 Binary Log를 Slave로 전송하고, 전송된 Binary Log를 SQL로 적용하여 복제한다.
  • 복제 Cluster 내의 Master와 Slave DB는 고유의 server_id 설정 값을 가진다.

 

MySQL BinLog 파일 위치기반 복제 + GTID 기반 복제

  • BinaryLog 파일위치기반 = 소스 DB의 변경 사항에 대한 이벤트를 Binary Log 파일명과 파일내의 위치값을 조합하여 식별한다.
  • GTID(Global Transaction ID)기반 = 복제 Cluster를 구성하는 여러 DB간에 소스 DB의 변경사항에 대한 이벤트를 고유 식별자로 식별한다.

 

Binary Log의 데이터 포맷

  • Statement 방식 = 실행된 SQL을 기록한다. Row 방식보다 상대적으로 처리 속도가 느리고 덜 안정적이다.
  • Row 방식 = 변경된 데이터를 기록한다. InnoDB Binary Log의 기본 데이터 포맷. 많은 데이터가 변경 될 경우에 용량이 커질 수 있다.

 

Row 포맷의 유형 (binlog_row_image)

  • full = 변경이 발생한 레코드의 모든 컬럼의 변경 전/ 후 값을 binary log에 기록(insert는 변경 후만, update는 변경 전/ 후, delete는 변경 전만)
  • minimal = 변경이 적용되어 필요한 컬럼만 기록한다. Insert는 SQL에 명시된 칼럼값만 기록한다. Update는 변경전 PK와 SQL에 명시된 칼럼값만 기록한다. Delete는 변경전 PK만 기록한다.
  • noblob = Full과 동일하지만 blob , text 칼럼의 변경 값은 기록하지 않는다.

Debezium Source Connector 기능 유의 사항

  • Debezium Source Connector는 Source데이터에서 Kafka 까지만 연계된다. Kafka에서 타겟(Sink) 연계는 JDBC Sink Connector로 수행되어야 한다.
  • 소스 DB의 DDL 변경을 타겟db에서 정확히 반영하기 어렵다
  • 타겟 db로의 데이터 입력이 JDBC SInk Connector의 기능에 제약된다.
  • 소스 데이터의 Truncate table, Drop table이 타겟(Sink)에 적용되지 않는다. (version2.1 에서 truncate table 반영 예정이다)

2가지 조건이 충족되었는지 확인한다.


Debezium Parameters

  • 하나의 Source Connector로 여러 개의 Source 테이블 데이터를 유연하게 추출할 수 있으며 하나의 Source 테이블은 하나의 Topic으로 생성된다.
  • Source 테이블의 PK는 자동으로 Kafka 토픽의 Key값으로 생성된다.
  • 토픽명은 기본적으로 database server name.database명.table명으로 생성된다. SMT를 통해 변경이 가능하다.
  • Source 테이블의 DDL 변경 사항을 kafka 토픽으로 저장 할 수 있다.
  • 기본 Topic 메시지는 JDBC Source Connector와 다르게 구성되어 있으며 (before/ after 값 구성) JDBC Sink Connector에서 인식하고 데이터를 입력하기 위해서는 SMT를 이용해서 메시지 재 변경이 필요하다.
  • delete를 위한 tombstone 메시지를 생성하기 위해 SMT를 적용해야 한다.
  • Date, datetime, timezone 등의 일자 / 시간관련 타입, numeric ,decimal 등의 precision / scale 관련 타입은 JDBC Sink Connector가 호환될 수 있는 타입으로 만들어져야 한다.

 

{
    "name": "mysql_cdc_oc_source_01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.14.143",
        "database.port": "3316",
        "database.user": "root",
        "database.password": "1234",
        "database.server.id": "10001",
        "database.server.name": "mysql01",
        "database.include.list": "oc",
        "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items", 
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
        "schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
				"topic.prefix":"cdc01",
				"database.connectionTimeZone": "Asia/Seoul"
    }
}
  • database.server.id = Mysql 접속시 Connector가 가지는 고유ID. 여러개의 CDC Connector들은 반드시 고유한 ID를 가져야한다.
  • database.server.name = Mysql 접속시 Connector가 가지는 명칭. 해당 값으로 접두어를 가지는 카프카 토픽명이 생성됨
  • database.include.list = 데이터를 추출할 DB들이다. 설정하지 않으면 모든 DB의 정보를 추출하게 되고 여러개의 DB들을 설정하 떄는 콤마로 분리한다.
  • table.include.list = 데이터를 추출할 테이블들이다. 설정하지 않으면 database.include.list에 있는 DB들의 모든 테이블을 추출한다.
  • poll.interval.ms = Connector가 새로운 이벤트 발생을 대기하는 시간이다. 기본은 1초다.
  • database.history.kafka.bootstrap.servers
    • database.history~  는 schema.history로 바뀌었다. 2.0 이상부터
  • tombstones.on.delete = Delete event 발생시 tombstone event 발생여부. 기본은 true 이고 Sink 쪽에서 Delete 수행하기 위해서 반드시 true로 설정되어야 한다.
  • transforms = After message 만 생성하기 위한 SMT 적용을 해야한다.
  • transforms.unwrap.type = After message만 생성하기위해서 SMT 적용할 때 쓰이는 클래스이다.
  • transforms.unwrap.drop.tombstones = After Message 생성 시 tombstone 메시지의 경우 삭제 여부를 의미한다. Sink쪽에서 Delete 수행하기 위해서 반드시 false가 되어야한다.
  • time.precision.mode = Debezium의 time 관련 precisio은 kafka connect 지원 기본 레벨로 변경한다. Micro seconds 단위를 milli seconds 단위로 변환한다. 기본은 adaptive_time_microseconds'
  • database.connectionTimeZone = Timestamp with timezone 컬럼에 대해서 UTC 변환 참조 설정이다.

 

https://www.inflearn.com/questions/869101/%EC%88%98%EC%97%85%EC%A7%88%EB%AC%B8-mysql-cdc-oc-source-test01-json-%EC%97%85%EB%A1%9C%EB%93%9C-%EC%97%90%EB%9F%AC?commentId=337622#869101

 

[수업질문] mysql_cdc_oc_source_test01.json 업로드 에러 - 인프런

안녕하세요. 'Debezium Source Connector 생성하기'수강중에 진행이 막혀 질문드립니다.config를 register_connector로 등록할 때 발생한 이슈인데해당 이슈 : { 'error_code': 400, 'message': 'Co...

www.inflearn.com

register할 때 문제가 발생하는데 위에 내가 써논 예시처럼 하면 문제가 안된다.

 

'Cloud > kafka-connect' 카테고리의 다른 글

Debezium MySQL CDC Source Connector - 3  (0) 2024.07.29
Debezium MySQL CDC Source Connector - 2  (0) 2024.07.25
JDBC Sink Connector  (0) 2024.07.20
JDBC Source Connector  (0) 2024.07.16
Kafka Connect 개요  (0) 2024.07.11