Cloud/kafka-connect

Debezium PostgreSQL CDC Source Connector

Tony Lim 2024. 8. 5. 10:58

PostgreSQL Replication 

 

  • postgres는 WAL(write ahead log) 파일 자체를 넘기는다.
  • publication에 등록된 테이블은 복제대상 테이블로 선정된 테이블이다.
  • WAL 을 SQL로 변경하기쉬운 형태로 decoder가 전송한다. 
  • Subscribe db 는 변경된 사항을 DML로 table에 적용한다.

 

Dbezium PostgreSQL Source Connector

여러 plugin을 지원하는데 알아야할것은 pgoutput 뿐이다.

publication.name = Replication 적용할 Publication 이름이다.
기본명은 dbz_publication이며 지정된 명칭의 publication이 db에 없으면 publication.autocrate.mode에 따라 새롭게 생성하거나 오류를 발생시킨다.

publication.autocreate.mdoe = 지정된 publication 이 DB내에 없을 때 신규 생성 또는 오류가 발생한다.
all_tables는 DB내 모든 테이블을 다 등록한다.
disable은 오류를 발생시키고 filtered는 debezium include list에 따라 테이블을 등록한다.

 

confluent hub에서 cdc postgresql 을 다운받아서 lib안에 있는 jar들을 conenctor_plugin으로 옮긴후에 Connect를 재기동한후에 http call을 이용해서 정상적으로 기동이 되었는지 확인해본다.

{
    "name": "postgres_cdc_oc_source_01",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.14.143",
        "database.port": "15432",
        "database.user": "tony",
        "database.password": "1234",
        "database.dbname": "oc",
        "database.server.name": "pg01",
				"topic.prefix":"pg01",

        "plugin.name": "pgoutput",
        "slot.name": "debezium_01",

        "schema.include_list": "public",
        "table.include.list": "public.customers, public.products, public.orders, public.order_items",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter", 
        
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"

    }
}

connector를 등록한후에 oc db table에 insert를 하나넣으면 pg01.public.customers topic에 추가된것을 consumer로 확인해본다.


PostgreSQL Publication 생성 및 적용

{
    "name": "postgres_cdc_oc_source_02",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.14.143",
        "database.port": "15432",
        "database.user": "tony",
        "database.password": "1234",
        "database.dbname": "oc",
        "topic.prefix": "pg02",


        "plugin.name": "pgoutput",
        "slot.name": "debezium_01",
        "publication.name": "pub_filtered",
        "publication.autocreate.mode": "filtered", 

        "schema.include_list": "public",
        "table.include.list": "public.customers, public.products, public.orders, public.order_items",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter", 
        
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"

    }
}

slot 하나를 debezium_01로 추가한다.

select * from pg_catalog.pg_replication_slots;

으로 생성된것을 확인할 수 있다.

SELECT pg_drop_replication_slot('debezium_01');

connector를 내리고 실행해야 정상동작한다.