CDC (Change Data Capture)
- CDC는 RDBMS와 NoSQL등의 Data Store시스템의 변경 데이터를 내부적으로 기록하고 있는 Redo Log/Bin Log/ WAL 등의 내부 트랜잭션 로그파일에서 (redo log같은) 변경데이타를 Capture하는 소프트웨어를 지칭한다.
- DB 내부 트랜잭션 로그 파일에서 변경 데이터를 추출하므로 소스 DB 성능에 큰 영향 없이 대용량의 변경 데이터를 매우 빠르게 추출하고 준 실시간으로 타겟 DB에 연동한다.
- DBMS 복제 (replication) 수준의 안정적이고 정확한 데이터 추출이 가능하다.
- 성능과 안정성이 중요한 많은 중요 DBMS시스템에서 CDC를 활용한다.
DB 복제 vs CDC 비교
데이터 복제는 DB의 가용성을 극대화 하기 위해서 Primary(Master) 와 StandBy(Salve) 간의 데이터를 완벽하게 동기화 하는데 초점을 맞추기 때문에 아키텍쳐 유연성이 상대적으로 떨어진다(primary와 Standby가 db버전, os구성이 거의동일)
CDC는 DB버전이나 OS환경에 대한 구성 제약이 상대적으로 덜 엄격하며, 보다 다양한 비즈니스 활용이 가능하다.
성능이 훨씬 좋은 정보계 서버에서 뭔가 실시간 분석을 돌리고 싶다.
MySQL 복제
- MySQL 복제는 Binary Log를 기반으로 구현된다.
- Master의 Binary Log를 Slave로 전송하고, 전송된 Binary Log를 SQL로 적용하여 복제한다.
- 복제 Cluster 내의 Master와 Slave DB는 고유의 server_id 설정 값을 가진다.
MySQL BinLog 파일 위치기반 복제 + GTID 기반 복제
- BinaryLog 파일위치기반 = 소스 DB의 변경 사항에 대한 이벤트를 Binary Log 파일명과 파일내의 위치값을 조합하여 식별한다.
- GTID(Global Transaction ID)기반 = 복제 Cluster를 구성하는 여러 DB간에 소스 DB의 변경사항에 대한 이벤트를 고유 식별자로 식별한다.
Binary Log의 데이터 포맷
- Statement 방식 = 실행된 SQL을 기록한다. Row 방식보다 상대적으로 처리 속도가 느리고 덜 안정적이다.
- Row 방식 = 변경된 데이터를 기록한다. InnoDB Binary Log의 기본 데이터 포맷. 많은 데이터가 변경 될 경우에 용량이 커질 수 있다.
Row 포맷의 유형 (binlog_row_image)
- full = 변경이 발생한 레코드의 모든 컬럼의 변경 전/ 후 값을 binary log에 기록(insert는 변경 후만, update는 변경 전/ 후, delete는 변경 전만)
- minimal = 변경이 적용되어 필요한 컬럼만 기록한다. Insert는 SQL에 명시된 칼럼값만 기록한다. Update는 변경전 PK와 SQL에 명시된 칼럼값만 기록한다. Delete는 변경전 PK만 기록한다.
- noblob = Full과 동일하지만 blob , text 칼럼의 변경 값은 기록하지 않는다.
Debezium Source Connector 기능 유의 사항
- Debezium Source Connector는 Source데이터에서 Kafka 까지만 연계된다. Kafka에서 타겟(Sink) 연계는 JDBC Sink Connector로 수행되어야 한다.
- 소스 DB의 DDL 변경을 타겟db에서 정확히 반영하기 어렵다
- 타겟 db로의 데이터 입력이 JDBC SInk Connector의 기능에 제약된다.
- 소스 데이터의 Truncate table, Drop table이 타겟(Sink)에 적용되지 않는다. (version2.1 에서 truncate table 반영 예정이다)
2가지 조건이 충족되었는지 확인한다.
Debezium Parameters
- 하나의 Source Connector로 여러 개의 Source 테이블 데이터를 유연하게 추출할 수 있으며 하나의 Source 테이블은 하나의 Topic으로 생성된다.
- Source 테이블의 PK는 자동으로 Kafka 토픽의 Key값으로 생성된다.
- 토픽명은 기본적으로 database server name.database명.table명으로 생성된다. SMT를 통해 변경이 가능하다.
- Source 테이블의 DDL 변경 사항을 kafka 토픽으로 저장 할 수 있다.
- 기본 Topic 메시지는 JDBC Source Connector와 다르게 구성되어 있으며 (before/ after 값 구성) JDBC Sink Connector에서 인식하고 데이터를 입력하기 위해서는 SMT를 이용해서 메시지 재 변경이 필요하다.
- delete를 위한 tombstone 메시지를 생성하기 위해 SMT를 적용해야 한다.
- Date, datetime, timezone 등의 일자 / 시간관련 타입, numeric ,decimal 등의 precision / scale 관련 타입은 JDBC Sink Connector가 호환될 수 있는 타입으로 만들어져야 한다.
{
"name": "mysql_cdc_oc_source_01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.14.143",
"database.port": "3316",
"database.user": "root",
"database.password": "1234",
"database.server.id": "10001",
"database.server.name": "mysql01",
"database.include.list": "oc",
"table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"topic.prefix":"cdc01",
"database.connectionTimeZone": "Asia/Seoul"
}
}
- database.server.id = Mysql 접속시 Connector가 가지는 고유ID. 여러개의 CDC Connector들은 반드시 고유한 ID를 가져야한다.
- database.server.name = Mysql 접속시 Connector가 가지는 명칭. 해당 값으로 접두어를 가지는 카프카 토픽명이 생성됨
- database.include.list = 데이터를 추출할 DB들이다. 설정하지 않으면 모든 DB의 정보를 추출하게 되고 여러개의 DB들을 설정하 떄는 콤마로 분리한다.
- table.include.list = 데이터를 추출할 테이블들이다. 설정하지 않으면 database.include.list에 있는 DB들의 모든 테이블을 추출한다.
- poll.interval.ms = Connector가 새로운 이벤트 발생을 대기하는 시간이다. 기본은 1초다.
- database.history.kafka.bootstrap.servers
- database.history~ 는 schema.history로 바뀌었다. 2.0 이상부터
- tombstones.on.delete = Delete event 발생시 tombstone event 발생여부. 기본은 true 이고 Sink 쪽에서 Delete 수행하기 위해서 반드시 true로 설정되어야 한다.
- transforms = After message 만 생성하기 위한 SMT 적용을 해야한다.
- transforms.unwrap.type = After message만 생성하기위해서 SMT 적용할 때 쓰이는 클래스이다.
- transforms.unwrap.drop.tombstones = After Message 생성 시 tombstone 메시지의 경우 삭제 여부를 의미한다. Sink쪽에서 Delete 수행하기 위해서 반드시 false가 되어야한다.
- time.precision.mode = Debezium의 time 관련 precisio은 kafka connect 지원 기본 레벨로 변경한다. Micro seconds 단위를 milli seconds 단위로 변환한다. 기본은 adaptive_time_microseconds'
- database.connectionTimeZone = Timestamp with timezone 컬럼에 대해서 UTC 변환 참조 설정이다.
register할 때 문제가 발생하는데 위에 내가 써논 예시처럼 하면 문제가 안된다.
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium MySQL CDC Source Connector - 3 (0) | 2024.07.29 |
---|---|
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
JDBC Sink Connector (0) | 2024.07.20 |
JDBC Source Connector (0) | 2024.07.16 |
Kafka Connect 개요 (0) | 2024.07.11 |