Cloud/kafka-connect

Kafka Connect 개요

Tony Lim 2024. 7. 11. 10:57
728x90

카프카 Connect는 Kafka 메시지 시스템(Broker/Producer/Consumer) 를 기반으로 다양한 데이터 소스 시스템

(예: RDBMS)에서 발생한 데이터 이벤트를 다른 데이터 타켓 시스템으로 별도의 Kafka Client 코딩 없이 , Seamless하게 실시간으로 전달하기 위해서 만들어진 Kafka Component


Connector = 여러 datasource 의 sink ,source 역할을 맡는다.

Transformation = SMT(Single Message Transformation)

Converter = Connector에서 보내는 message를 formatting을 해주는 역할이다.

Connector가 source의 table을 변경을 계속 감지하고 있다가 insert된 row를 가져오고 transforms을 거친후에 특정 format으로 kafka cluster로 전송하게 된다.


  • Connect는 Connector를기동시키기 위한 Framework를 갖춘 JVM Process Model. Connect Process를 Worker Process로 지칭할 뿐이다.
  • Connect는 서로 다른 여러개의 connector instance들을 자신의 Framework 내부로 로딩하고 호출/수행 시킬 수 있다.
  • Connector Instance의 실제 수행은 Thread 레벨로 수행되며 이를 Task 라고 함. Connector가 병렬 Thread 수행이 가능할 경우 여러 개의 Task Thread들로 해당 Connector를 수행할 수 있다.
  • Connect는 Connect cluster로 구성될 수 있으며, 1개의 노드에서 여러 개의 Worker Process를 또는 여러 개의 노드에서 여러 개의 Worker Process 들로 Connect Cluster를 구성할 수 있음
  • Connect 유형은 standalone과 Distributed mode로 나뉜다. 단일 Worker Process 로만 Connect Cluster 구성이 가능할 경우 standalone mode, 여러 Worker process들로 구성이 가능할 경우 Distributed Mode

 


Sooldir Source Connector

 

Kafka Connect 에 새로운 Connector 생성 순서 및 유의사항

  • Connector를 다운로드 받음. 하나의 Connector는 보통 여러개의 jar library들로 구성된다.
  • 여러개의 jar library로 구성된 하나의 Connector를 plugin.path로 지정된 디렉토리에 별도의 서브 디렉토리로 만들어서 jar library들을 이동 시켜야한다.
  • Kafka Connect는 기동시에 plugin.path로 지정된 디렉토리 밑의 서브 디렉토리들에 위치한 모든 jar 파일을 로딩한다.
  • 따라서 신규로 Connector를 Connect로 올릴 시에는 반드시 Connect를 재기동 해야 반영이 된다.
  • Connector는 Connector 명, Connector 클래스명, Connector 고유의 환경 설정등을 rest api를 통해서 kafka connect 에 전달하여 새롭게 생성된다.
  • rest api 에서 성공 Response(HTTP 201) 이 반환이 되더라도 Kakfa Connect log메시지를 반드시 확인하여 제대로 Connector가 동작하는지 확인이 필요하다.

 

https://www.confluent.io/hub/jcustenborder/kafka-connect-spooldir

 

Spooldir Source Connector

Discover 200+ expert-built Apache Kafka connectors for seamless, real-time data streaming and integration. Connect with MongoDB, AWS S3, Snowflake, and more.

www.confluent.io

에서 필요한 jar들을 다운로드 받는다.

$CONFLUENT_HOME/etc/kafka/에 있는 connect-distributed.properties   에서

plugin.path=/home/tony/kafka/connector_plugins 을 지정 해준다음에

해당 경로에 spooldir_source directory를 만든후에 안에다 다운로드 받은 jar들을 옮겨주고 zookpeer, kafka , connect-distributed 를 기동시킨다.

curl -X GET -H "Content-Type: application/json" http://localhost:8083/connector-plugins

http get 요청을 통해 제대로 로딩이 되었는지 확인을 한다.

이제connect 에 connector를 등록 시킬 차례이다.

{
  "name": "csv_spooldir_source",
  "config": {
    "tasks.max": "3",
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "input.path": "/home/tony/kafka/spool_test_dir",
    "input.file.pattern": "^.*\\.csv",
    "error.path": "/home/tony/kafka/spool_test_dir/error",
    "finished.path": "/home/tony/kafka/spool_test_dir/finished",
    "empty.poll.wait.ms": 30000,
    "halt.on.error": "false",
    "topic": "spooldir-test-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
   }
}

input, error, finished path에 맞는 경로를 만들어주고 input path에는 test용 csv파일을 옮겨놓는다.

정상적으로 processing이 끝나면 input path에서 csv 파일이 사라지고 finished path로 csv 파일이 옮겨진다.

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data @spooldir_source.json 

등록을 한다.

정상적으로 돌고 있는것을 확인한다.

spooldir-test-topic도 생성되었는지 확인한다.

kafka console consumer 로 확인을 한다. 

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "first_name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "last_name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "email"
      },
      {
        "type": "string",
        "optional": true,
        "field": "gender"
      },
      {
        "type": "string",
        "optional": true,
        "field": "ip_address"
      },
      {
        "type": "string",
        "optional": true,
        "field": "last_login"
      },
      {
        "type": "string",
        "optional": true,
        "field": "account_balance"
      },
      {
        "type": "string",
        "optional": true,
        "field": "country"
      },
      {
        "type": "string",
        "optional": true,
        "field": "favorite_color"
      }
    ],
    "optional": false,
    "name": "com.github.jcustenborder.kafka.connect.model.Value"
  },
  "payload": {
    "id": "991",
    "first_name": "Dorolisa",
    "last_name": "Taig",
    "email": "dtaigri@imgur.com",
    "gender": "Female",
    "ip_address": "45.138.124.65",
    "last_login": "2014-10-19T16:29:19Z",
    "account_balance": "15788.64",
    "country": "CN",
    "favorite_color": "#0386fa"
  }
}

csv가 formatting을 거쳐서 kafka topic에는 특정 format으로 저장이 된것을 확인 할 수 있다.


Connect는 Connector를 기동시키기 위한 Framework를 갖춘 jvm process model.
connect porcess를 worker process라고 지칭한다.

Connect는 서로 다른 여러 개의 Connector instance 들을 자신의 Framework 내부로 로딩하고 호출/수행 시킬수 있음

Connector Instance의 실제 수행은 Thread 레벨로 수행되며 이를 Task라고 한다. 
Connector가 병렬 Thread수행이 가능할 경우 여러개의 Task Thread들로 해당 Connector를 수행할 수 있음

 

Connect는 Connector의 메시지 처리 offset 및 Connector 별 config 와 상태 정보를 내부토픽에 저장한다.

connect-offsets 

  • Source Connector 별로 메시지를 전송한 offset정보를 가지고 있음
  • 한번 전송한 메시지를 중복 전송하지 않기 위함
  • 기본 25개의 parittion으로 구성된다.

connect-configs

  • Connector의 config 정보를 가진다. Connect 재 기동시 설정된 Connector를 기동한다.
  • connect-status
  • Connector의 상태정보를 가진다. Connect 재 기동시 설정된 Connector를 기동한다.

__consumer_offsets

  • Consumer가 읽어들인 메시지의 offset 정보를 가지고 있다.
  • Sink Connector가 읽어들인 메시지 offset정보를 가진다.
  • 한번 읽어 들인 메시지를 중복해서 읽지 않기 위함이다.
  • 기본으로 50개의 partition을 구성된다.

Sing Message Transform 특징

  • Source 시스템의 메시지를 kafka로 전송전에 변환하거나 kafka에서 sink 시스템으로 데이터를 입력하기 전에 변환할 떄 적용된다.
  • Connect는 Connector와 Config만으로 소통하므로 메시지 변환도 Config에 SMT를 기술하여 적용해야한다.
  • SMT가 제공하는 변환 기능은 SMT를 구현한 Java 클래스를 기반으로 제공되며, Kafka Connect는 기본적인 SMT클래스를 가지고 있다.
  • Connect에서 기본제공하는 SMT 클래스외에 3rd party에서 별도의 SMT를 Plugin 형태로 제공할 수도 있다.
  • SMT 변환은 Chain 형태로 연속해서 적용할 수 있다.
  • 복잡한 데이터 변환은 한계가 있다.

 

 

 

 

 

728x90

'Cloud > kafka-connect' 카테고리의 다른 글

Debezium MySQL CDC Source Connector - 3  (0) 2024.07.29
Debezium MySQL CDC Source Connector - 2  (0) 2024.07.25
Debezium MySQL CDC Source Connector - 1  (0) 2024.07.22
JDBC Sink Connector  (0) 2024.07.20
JDBC Source Connector  (0) 2024.07.16