Debezium PostgreSQL CDC Source Connector
PostgreSQL Replication
- postgres는 WAL(write ahead log) 파일 자체를 넘기는다.
- publication에 등록된 테이블은 복제대상 테이블로 선정된 테이블이다.
- WAL 을 SQL로 변경하기쉬운 형태로 decoder가 전송한다.
- Subscribe db 는 변경된 사항을 DML로 table에 적용한다.
Dbezium PostgreSQL Source Connector
여러 plugin을 지원하는데 알아야할것은 pgoutput 뿐이다.
publication.name = Replication 적용할 Publication 이름이다.
기본명은 dbz_publication이며 지정된 명칭의 publication이 db에 없으면 publication.autocrate.mode에 따라 새롭게 생성하거나 오류를 발생시킨다.
publication.autocreate.mdoe = 지정된 publication 이 DB내에 없을 때 신규 생성 또는 오류가 발생한다.
all_tables는 DB내 모든 테이블을 다 등록한다.
disable은 오류를 발생시키고 filtered는 debezium include list에 따라 테이블을 등록한다.
confluent hub에서 cdc postgresql 을 다운받아서 lib안에 있는 jar들을 conenctor_plugin으로 옮긴후에 Connect를 재기동한후에 http call을 이용해서 정상적으로 기동이 되었는지 확인해본다.
{
"name": "postgres_cdc_oc_source_01",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.14.143",
"database.port": "15432",
"database.user": "tony",
"database.password": "1234",
"database.dbname": "oc",
"database.server.name": "pg01",
"topic.prefix":"pg01",
"plugin.name": "pgoutput",
"slot.name": "debezium_01",
"schema.include_list": "public",
"table.include.list": "public.customers, public.products, public.orders, public.order_items",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
connector를 등록한후에 oc db table에 insert를 하나넣으면 pg01.public.customers topic에 추가된것을 consumer로 확인해본다.
PostgreSQL Publication 생성 및 적용
{
"name": "postgres_cdc_oc_source_02",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.14.143",
"database.port": "15432",
"database.user": "tony",
"database.password": "1234",
"database.dbname": "oc",
"topic.prefix": "pg02",
"plugin.name": "pgoutput",
"slot.name": "debezium_01",
"publication.name": "pub_filtered",
"publication.autocreate.mode": "filtered",
"schema.include_list": "public",
"table.include.list": "public.customers, public.products, public.orders, public.order_items",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
slot 하나를 debezium_01로 추가한다.
select * from pg_catalog.pg_replication_slots;
으로 생성된것을 확인할 수 있다.
SELECT pg_drop_replication_slot('debezium_01');
connector를 내리고 실행해야 정상동작한다.