Cloud 71

Debezium PostgreSQL CDC Source Connector

PostgreSQL Replication  postgres는 WAL(write ahead log) 파일 자체를 넘기는다.publication에 등록된 테이블은 복제대상 테이블로 선정된 테이블이다.WAL 을 SQL로 변경하기쉬운 형태로 decoder가 전송한다. Subscribe db 는 변경된 사항을 DML로 table에 적용한다. Dbezium PostgreSQL Source Connector여러 plugin을 지원하는데 알아야할것은 pgoutput 뿐이다.publication.name = Replication 적용할 Publication 이름이다.기본명은 dbz_publication이며 지정된 명칭의 publication이 db에 없으면 publication.autocrate.mode에 따라 새롭게 ..

Cloud/kafka-connect 2024.08.05

Debezium Source MySQL에서 JDBC Sink PostgreSQL 연동

MySQL 과 PostgreSQL 간 Date와 Time 관련 데이터 타입 변환할때 주의할점들Mysql = datetime (timezone 정보가 없다.) timestamp (timezone 정보가 있다.)postgreSQL = Timestamp (timezone 정보가 없다. ) Timestamptz(timezone 정보가 있다.) datetime이 없다. MySQL의 date/time/datetime 변환을 위해서는 Debezium Mysql Source connector에서 time.precision.mode=connect로 반드시 설정해야한다.Debezium Mysql은 date, time ,datetime의 경우 int형으로 변환하지만 timestamp의 경우 문자열로 변환한다.Debezium..

Cloud/kafka-connect 2024.08.03

Schema Registry

Schema RegistryConfluent Kafka는 Schema Registry를 통해 Schema 정보를 별도로 관리하는 기능을 제공한다.토픽으로 전송되는 Data의 Schema는 Schema Registry에서 ID + Version 별로 중앙 관리되므로 레코드 별로 Schema를 중복해서 전송할 필요가 없다. Converter 지원 포맷Json/Avro 포맷의 경우 Schema와 payload로 구성된다.Schema는 해당 레코드의 schema구성을, payload는 해당 레코드의 값을 가진다.Connector 별로 Json 포맷은 조금씩 다를 수 있지만 전반적으로 대부분 비슷하다Json Schema 의 경우 레코드 별로 Schema를 가지고 있으므로 메시지 용량이 커진다. 이의 개선을 위해 ..

Cloud/kafka-connect 2024.08.01

Debezium MySQL CDC Source Connector - 2

Debezium SnapshotDebezium MySQL Connector가 최초 생성시 DB 스키마와 읽어들어야 할 테이블 데이터에 대한 초기 스냅샷을 수행한다.스냅샷은 Connector 생성 시점의 Binlog와 Binlog Position, DB와 읽어들일 테이블의 Schema(DDL) , 대상 테이블의 초기 레코드를 읽어서 카프카에 저장등의 작업을 수행하며 connect-offsets에 해당 connector명과 서버명으로 offset 저보가 없을 때 수행한다.inital = Binlog Position, Schema, 테이블의 초기 레코드를 카프카로 저장하는 작업을 수행한다.schema_only = 테이블의 초기 레코드를 카프카로 저장하지 않고 스키마 변경 정보만 기록한다. Connector 최..

Cloud/kafka-connect 2024.07.25

Debezium MySQL CDC Source Connector - 1

CDC (Change Data Capture)CDC는 RDBMS와 NoSQL등의 Data Store시스템의 변경 데이터를 내부적으로 기록하고 있는 Redo Log/Bin Log/ WAL 등의 내부 트랜잭션 로그파일에서 (redo log같은) 변경데이타를 Capture하는 소프트웨어를 지칭한다.DB 내부 트랜잭션 로그 파일에서 변경 데이터를 추출하므로 소스 DB 성능에 큰 영향 없이 대용량의 변경 데이터를 매우 빠르게 추출하고 준 실시간으로 타겟 DB에 연동한다.DBMS 복제 (replication) 수준의 안정적이고 정확한 데이터 추출이 가능하다.성능과 안정성이 중요한 많은 중요 DBMS시스템에서 CDC를 활용한다. DB 복제 vs CDC 비교데이터 복제는 DB의 가용성을 극대화 하기 위해서 Primar..

Cloud/kafka-connect 2024.07.22

JDBC Sink Connector

카프카 토픽에서 메시지를 읽어들여서 타겟 DB 로 데이터 입력/ 수정/ 삭제를 수행한다.Connect의 Consumer가 주기적으로 카프카 토픽 메시지를 읽어서 타겟 DB로 데이터 연동한다.RDBMS에서 데이터 추출은 JDBC Source Connector , CDC Sourec Connector등을 사용하지만 RDBMS로 데이터 입력은 주로 JDBC Sink Conenctor를 사용한다.JDBC Sink Connector 생성{ "name": "mysql_jdbc_sink_customers_00", "config": { "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", ..

Cloud/kafka-connect 2024.07.20

JDBC Source Connector

JDBC Source Connector는 RDBMS별 JDBC Driver가 별도로 설치되어야 한다. JDBC Source Connector는 JDBC Drvier를 이용하여 DB 접속 및 데이터 추출을 수행 후 Producer를 이용하여 Kafka Broker로 데이터를 전송한다.JDBC Source Connector는 Source 시스템에 주기적으로 Query를 날려서 변경된 데이터를 추출하는 방식이다.Consumer는 아니지만 Consumer처럼 주기적으로 변경된 데이터를 추출하고 변경사항을 kafka에게 전송한다. JDBC Source Connector 모드 유형Incremental query 모드: 이전 데이터 추출 이후 새롭게 생성된 데이터나 업데이트된 데이터를 Kafka로 전송한다.모드 별로..

Cloud/kafka-connect 2024.07.16

Kafka Connect 개요

카프카 Connect는 Kafka 메시지 시스템(Broker/Producer/Consumer) 를 기반으로 다양한 데이터 소스 시스템(예: RDBMS)에서 발생한 데이터 이벤트를 다른 데이터 타켓 시스템으로 별도의 Kafka Client 코딩 없이 , Seamless하게 실시간으로 전달하기 위해서 만들어진 Kafka ComponentConnector = 여러 datasource 의 sink ,source 역할을 맡는다.Transformation = SMT(Single Message Transformation)Converter = Connector에서 보내는 message를 formatting을 해주는 역할이다.Connector가 source의 table을 변경을 계속 감지하고 있다가 insert된 row..

Cloud/kafka-connect 2024.07.11

Topic Segment

Kafka log , Segment카프카의 로그 메시지는 실제로는 segment로 저장이 된다.파티션은 단순히 파일 디렉토리만 되어 있고, 해당 파티션 디렉토리에 메시지 저장 segment를 file로 가지고 있다.파티션은 여러개의 segment로 구성되며 개별 segment는 데이터 용량이 차거나 일정 시간이 경과하면 close되고 새로운 segment를 생성하여 데이터를 연속적으로 저장한다.segment는 close 되면 더 이상 브로커가 write하지 않으며 read-only가 된다. 브로커는 여러개의 segment중 단 하나의 active segment에만 write와 read를 수행한다. 하나의 파티션은 단 하나의 active segment를 가진다. log.segment.bytes (broke..

Cloud/kafka-core 2024.07.09