카프카 Connect는 Kafka 메시지 시스템(Broker/Producer/Consumer) 를 기반으로 다양한 데이터 소스 시스템
(예: RDBMS)에서 발생한 데이터 이벤트를 다른 데이터 타켓 시스템으로 별도의 Kafka Client 코딩 없이 , Seamless하게 실시간으로 전달하기 위해서 만들어진 Kafka Component
Connector = 여러 datasource 의 sink ,source 역할을 맡는다.
Transformation = SMT(Single Message Transformation)
Converter = Connector에서 보내는 message를 formatting을 해주는 역할이다.
Connector가 source의 table을 변경을 계속 감지하고 있다가 insert된 row를 가져오고 transforms을 거친후에 특정 format으로 kafka cluster로 전송하게 된다.
- Connect는 Connector를기동시키기 위한 Framework를 갖춘 JVM Process Model. Connect Process를 Worker Process로 지칭할 뿐이다.
- Connect는 서로 다른 여러개의 connector instance들을 자신의 Framework 내부로 로딩하고 호출/수행 시킬 수 있다.
- Connector Instance의 실제 수행은 Thread 레벨로 수행되며 이를 Task 라고 함. Connector가 병렬 Thread 수행이 가능할 경우 여러 개의 Task Thread들로 해당 Connector를 수행할 수 있다.
- Connect는 Connect cluster로 구성될 수 있으며, 1개의 노드에서 여러 개의 Worker Process를 또는 여러 개의 노드에서 여러 개의 Worker Process 들로 Connect Cluster를 구성할 수 있음
- Connect 유형은 standalone과 Distributed mode로 나뉜다. 단일 Worker Process 로만 Connect Cluster 구성이 가능할 경우 standalone mode, 여러 Worker process들로 구성이 가능할 경우 Distributed Mode
Sooldir Source Connector
Kafka Connect 에 새로운 Connector 생성 순서 및 유의사항
- Connector를 다운로드 받음. 하나의 Connector는 보통 여러개의 jar library들로 구성된다.
- 여러개의 jar library로 구성된 하나의 Connector를 plugin.path로 지정된 디렉토리에 별도의 서브 디렉토리로 만들어서 jar library들을 이동 시켜야한다.
- Kafka Connect는 기동시에 plugin.path로 지정된 디렉토리 밑의 서브 디렉토리들에 위치한 모든 jar 파일을 로딩한다.
- 따라서 신규로 Connector를 Connect로 올릴 시에는 반드시 Connect를 재기동 해야 반영이 된다.
- Connector는 Connector 명, Connector 클래스명, Connector 고유의 환경 설정등을 rest api를 통해서 kafka connect 에 전달하여 새롭게 생성된다.
- rest api 에서 성공 Response(HTTP 201) 이 반환이 되더라도 Kakfa Connect log메시지를 반드시 확인하여 제대로 Connector가 동작하는지 확인이 필요하다.
https://www.confluent.io/hub/jcustenborder/kafka-connect-spooldir
에서 필요한 jar들을 다운로드 받는다.
$CONFLUENT_HOME/etc/kafka/에 있는 connect-distributed.properties 에서
plugin.path=/home/tony/kafka/connector_plugins 을 지정 해준다음에
해당 경로에 spooldir_source directory를 만든후에 안에다 다운로드 받은 jar들을 옮겨주고 zookpeer, kafka , connect-distributed 를 기동시킨다.
curl -X GET -H "Content-Type: application/json" http://localhost:8083/connector-plugins
http get 요청을 통해 제대로 로딩이 되었는지 확인을 한다.
이제connect 에 connector를 등록 시킬 차례이다.
{
"name": "csv_spooldir_source",
"config": {
"tasks.max": "3",
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.path": "/home/tony/kafka/spool_test_dir",
"input.file.pattern": "^.*\\.csv",
"error.path": "/home/tony/kafka/spool_test_dir/error",
"finished.path": "/home/tony/kafka/spool_test_dir/finished",
"empty.poll.wait.ms": 30000,
"halt.on.error": "false",
"topic": "spooldir-test-topic",
"csv.first.row.as.header": "true",
"schema.generation.enabled": "true"
}
}
input, error, finished path에 맞는 경로를 만들어주고 input path에는 test용 csv파일을 옮겨놓는다.
정상적으로 processing이 끝나면 input path에서 csv 파일이 사라지고 finished path로 csv 파일이 옮겨진다.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data @spooldir_source.json
등록을 한다.
정상적으로 돌고 있는것을 확인한다.
spooldir-test-topic도 생성되었는지 확인한다.
kafka console consumer 로 확인을 한다.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
},
{
"type": "string",
"optional": true,
"field": "gender"
},
{
"type": "string",
"optional": true,
"field": "ip_address"
},
{
"type": "string",
"optional": true,
"field": "last_login"
},
{
"type": "string",
"optional": true,
"field": "account_balance"
},
{
"type": "string",
"optional": true,
"field": "country"
},
{
"type": "string",
"optional": true,
"field": "favorite_color"
}
],
"optional": false,
"name": "com.github.jcustenborder.kafka.connect.model.Value"
},
"payload": {
"id": "991",
"first_name": "Dorolisa",
"last_name": "Taig",
"email": "dtaigri@imgur.com",
"gender": "Female",
"ip_address": "45.138.124.65",
"last_login": "2014-10-19T16:29:19Z",
"account_balance": "15788.64",
"country": "CN",
"favorite_color": "#0386fa"
}
}
csv가 formatting을 거쳐서 kafka topic에는 특정 format으로 저장이 된것을 확인 할 수 있다.
Connect는 Connector를 기동시키기 위한 Framework를 갖춘 jvm process model.
connect porcess를 worker process라고 지칭한다.
Connect는 서로 다른 여러 개의 Connector instance 들을 자신의 Framework 내부로 로딩하고 호출/수행 시킬수 있음
Connector Instance의 실제 수행은 Thread 레벨로 수행되며 이를 Task라고 한다.
Connector가 병렬 Thread수행이 가능할 경우 여러개의 Task Thread들로 해당 Connector를 수행할 수 있음
Connect는 Connector의 메시지 처리 offset 및 Connector 별 config 와 상태 정보를 내부토픽에 저장한다.
connect-offsets
- Source Connector 별로 메시지를 전송한 offset정보를 가지고 있음
- 한번 전송한 메시지를 중복 전송하지 않기 위함
- 기본 25개의 parittion으로 구성된다.
connect-configs
- Connector의 config 정보를 가진다. Connect 재 기동시 설정된 Connector를 기동한다.
- connect-status
- Connector의 상태정보를 가진다. Connect 재 기동시 설정된 Connector를 기동한다.
__consumer_offsets
- Consumer가 읽어들인 메시지의 offset 정보를 가지고 있다.
- Sink Connector가 읽어들인 메시지 offset정보를 가진다.
- 한번 읽어 들인 메시지를 중복해서 읽지 않기 위함이다.
- 기본으로 50개의 partition을 구성된다.
Sing Message Transform 특징
- Source 시스템의 메시지를 kafka로 전송전에 변환하거나 kafka에서 sink 시스템으로 데이터를 입력하기 전에 변환할 떄 적용된다.
- Connect는 Connector와 Config만으로 소통하므로 메시지 변환도 Config에 SMT를 기술하여 적용해야한다.
- SMT가 제공하는 변환 기능은 SMT를 구현한 Java 클래스를 기반으로 제공되며, Kafka Connect는 기본적인 SMT클래스를 가지고 있다.
- Connect에서 기본제공하는 SMT 클래스외에 3rd party에서 별도의 SMT를 Plugin 형태로 제공할 수도 있다.
- SMT 변환은 Chain 형태로 연속해서 적용할 수 있다.
- 복잡한 데이터 변환은 한계가 있다.
'Cloud > kafka-connect' 카테고리의 다른 글
Debezium MySQL CDC Source Connector - 3 (0) | 2024.07.29 |
---|---|
Debezium MySQL CDC Source Connector - 2 (0) | 2024.07.25 |
Debezium MySQL CDC Source Connector - 1 (0) | 2024.07.22 |
JDBC Sink Connector (0) | 2024.07.20 |
JDBC Source Connector (0) | 2024.07.16 |