Cloud/kafka-connect

Schema Registry

Tony Lim 2024. 8. 1. 09:59

Schema Registry

Confluent Kafka는 Schema Registry를 통해 Schema 정보를 별도로 관리하는 기능을 제공한다.

토픽으로 전송되는 Data의 Schema는 Schema Registry에서 ID + Version 별로 중앙 관리되므로 레코드 별로 Schema를 중복해서 전송할 필요가 없다.

 

Converter 지원 포맷

  • Json/Avro 포맷의 경우 Schema와 payload로 구성된다.
  • Schema는 해당 레코드의 schema구성을, payload는 해당 레코드의 값을 가진다.
  • Connector 별로 Json 포맷은 조금씩 다를 수 있지만 전반적으로 대부분 비슷하다
  • Json Schema 의 경우 레코드 별로 Schema를 가지고 있으므로 메시지 용량이 커진다. 이의 개선을 위해 Avro Format과 Schema Registry를 이용하여 Schema 정보의 중복 생성 제거를 한다.

 

avro 개요 및 장점

  • 데이터 직렬화를 수행하는 시스템
    • 보다 컴팩트하고 빠른 binary 데이터 포맷을 제공한다
  • 스키마를 제공
    • Json 포맷으로 쉽게 스키마 메시지를 정의한다.
    • 다양한 데이터 타입을 제공(String ,bytes, int , long ,float, double, boolean 뿐만 아니라 enum, arrays, maps등의 보다 복잡한 데이터 타입도 지원)
    • 하지만 기본적으로 하나의 메시지마다 별도의 스키마를 가지고 있는 구조이다.
  • 메시지를 자바 객체로 쉽게 만들어 줄 수 있다.

 

Avro 메시지를 kafka utility로 보내고 읽기

  • Avro 메시지는 avro 이진 파일 형태로 변환된후 직렬화 되므로 별도의 직렬화 Class인 AvroSerializer를 이용해야 함
  • kafka-avro-console-producer, kafka-avro-console-consumer 유틸리티를 이용하면 손쉽게 avro 메시지를 kafka 토픽 메시지로 전송하고 읽을 수 있다.
  • kafka-avro-console-producer, kafka-avro-console-consumer는 모두 schema registry기반에서 동작한다.

 

vi registry_start.sh
$CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

schema registry를 먼저기동한후에 

kafka-avro-console-producer  --broker-list localhost:9092 --topic avro_test \
--property value.schema='{
	"type": "record", 
	"name": "customer_short",
  "fields": [
      {"name": "customer_id", "type": "int" },
      {"name": "customer_name", "type": "string"}
  ]
}' \
--property schema.registry.url=http://localhost:8081

{"customer_id": 1, "customer_name" : "myname"} 이런식으로 스키마에 맞게 메시지를 작성하면 producer가 보내준다.

kacc avro_test --property schema.registry.url=http://localhost:8081 --from-beginning 

avro consumer로 읽어야 제대로 메시지가 파싱되서 읽혀진다.


Schema Registry 기반의 Connector 설정

{
    "name": "mysql_cdc_oc_source_avro_01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.14.143",
        "database.port": "3316",
        "database.user": "root",
        "database.password": "1234",
        "database.server.id": "20000",
        "database.server.name": "mysqlavro",
        "database.include.list": "oc",
        "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items, oc.orders_datetime_tab",
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
        "schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
	"database.allowPublicKeyRetrieval": "true",

        "time.precision.mode": "connect",
        "database.connectionTimezone": "Asia/Seoul",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
				"topic.prefix":"mysqlavro",
				"database.connectionTimeZone": "Asia/Seoul"

    }
}

등록한 table.include.list에서 다 읽어오게 된다.

추가된것은 key.converter, value.converter , key.converter.schema.registry.url , value.converter.schema.registry.url

 

Schema registry 등록 주요 정보

Schema registry는 주요 정보를 Kafka의 내부 토픽인 _schemas에 저장하며 주요 요소는 아래와 같다.

subject

  • Schema Registry에 등록된 토픽별(또는 토픽 레코드 유형별 ) 기준 정보
  • Subject 내의 스키마 호환성 체크
  • Version은 subject내에서 변경한다.
  • Schema가 진화할 때 동일한 subject에서 새로운 schema id와 version을 가진다.

schemas = schema 정보

config = 전역 또는 subject 별 스키마 호환성 관련 정보 저장

 

Rest API 를 통한 Schema Registry관리

  • GET, POST, PUT, DELETE Method 기반 rest api 를 통해 schema registry 관리를 수행한다.
  • Subject 및 Config의 정보 추출 및 주요 속성에 대한 수정/ 변경 / 삭제 가능하다
  • 가급적 Delete는 자제해야한다. Delete수행시 Soft Delete 이후 다시 Hard Delete를 수행해야 완전 삭제할 수 있다.
  • Config는 Subject 별로 별도 설정하지 않으면 전역 config를 그대로 따르게 된다.

스키마 레지스티리 기반의 JDBC Sink Connector 설정

{
    "name": "mysql_jdbc_oc_sink_customers_avro_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysqlavro.oc.customers",
        "connection.url": "jdbc:mysql://192.168.14.143:3316/oc_sink",
        "connection.user": "root",
        "connection.password": "1234",
        "table.name.format": "oc_sink.customers_sink",
        "insert.mode": "upsert",
        "pk.fields": "customer_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081"

    }
}

sink connector도 마찬가지로 아래 4 row가 추가되었다.

 

Avro의 읽기 스키마 와 쓰기 스키마

  • 쓰기 스키마 = 앱이 데이터 전송시 데이터를 부호화 하기 위해 사용하는 스키마
  • 읽기 스키마 = 앱이 데이터를 복호화화여 읽어 들일 시 사용하는 스키마

avro는 record별로 스키마가 존재한다.  쓰기랑 읽기랑 다를 이유가있나?

여러 producer(app)이 있을 수 있고 여러 consumer(수신 app)이 있을 수있다.

producer가 중구난방으로 schema를 변경하면 consumer schema도 계속 변해야하는데  그러면 consumer앱도 변경해야하고 여러가지로 번거롭다.

 

Avro의 읽기 스키마와 쓰기 스키마 호환성

  • avro는 읽기 스키마와 쓰기 스키마가 동일하지 않아도 되며 호환 가능하기만 하면 된다.
  • 데이터를 복호화 할때 (읽을때) avro는 쓰기 스키마와 읽기 스키마의 차이가 호환이 가능하다면 읽어 들일 때 그 차이를 해소 시켜서 읽어들인다.
  • 속성의 순서가 달라도 서로 상관 없다.
  • 쓰기 스키마에는 있지만 읽기 스키마에 없는 속성은 읽어들일때 해당 속성을 무시한다.
  • 쓰기 스키마에는 없지만 읽기 스키마에는 있는 속성은 읽어 들일 때 읽기 스키마에 선언된 기본값으로 읽어들인다.

 

Avro 스키마 호환성 - 하위 호환성

하위 호환성 = 새로운 버전의 읽기 스키마는 예전 버전의 쓰기 스키마를 처리 할 수 있다.

case1 = 에전 버전에서 속성이 추가된 새로운 버전의 읽기 스키마는 기본값으로 해당 속성을 읽는다.
따라서 읽기 스키마에 새로운 버전으로 컬럼이 추가될 경우 반드시 기본값 설정이 필요하다.

case2 = 예전 버전에서 속성이 삭제된 새로운 버전의 읽기 스키마는 해당 속성을 무시한다.

 

Avro 스키마 호환성 - 상위 호환성

상위 호환성 = 예전 버전의 읽기 스키마는 새로운 버전의 쓰기 스키마를 처리 할 수 있다.

case1 = 새로운 버전 쓰기 스키마에 신규속성이 추가될 경우에 예전 버전의 읽기 스키마는 해당 속성을 무시한다.

case2 = 새로운 버전의 스기 스키마에 기존 속성이 삭제될 때 예전 버전의 읽기 스키마는 해당 속성의 기본값으로 읽어 들인다.
따라서 새로운 버전에서 삭제되는 기존 속성은 반드시 예전 버전에서 기본값을 가지고 있어야 한다.

 

avro 스키마 호환성 체크

  • 새로 추가되는 컬럼이 기본 값을 가지고 있지 않거나 (하위 호환성 오류) , 삭제되는 기존 칼럼이 기본 값을 가지고 있지 않는 경우에 (상위 호환성 오류) 호환성 오류가 발생한다.
  • 기존 컬럼 타입 변경, 기존 컬럼명 변경은 스키마 호환성 체크가 제대로 되지 않는다.

 


Schema Registrty의 Subject

  • Schema Registry에 등록된 토픽별 (또는 토픽 레코드 유형별) 기준 정보 (스키마 namespace)
  • Subject내의 스키마 호환성(compatiblity) 체크
  • Version은 subject내에서 변경
  • Schema가 진화할 때 동일한 subject에서 새로운 Schema id와 version을 가진다.

 

Schema Registry Subject의 Naming 유형

TopicNameStrategy

  • Subject 이름을 정하는 기본설정
  • 개별 토픽별로 토픽 Key/토픽 Value에 따라 스키마를 등록한다.
  • 토픽내의 메시지 레코드가 동일한 스키마를 따르도록 설정한다.
  • 스키마 호환성이 설정될 경우에 해당 호환성에 적합하지 않은 스키마를 가지는 메시지는 토픽에 저장되지 않는다.

RecordNameStrategy

  • 특정 레코드 타입별로 스키마를 따르도록 설정한다.
  • 하나의 토픽내에 여러 타입의 이벤트가 있을 경우 이들 이벤트 유형별로 스키마를 지정할 때 사용한다.
  • Kafka 전체에서 특정 레코트 타입별로 스키마를 설정하므로 서로 다른 토픽에서 동일한 레코드 타입들이 있을 경우에도 동일 스키마가 적용 된다.

TopicRecordNameStrategy

  • RecordNameStrategy와 유사하지만 토픽내의 특정 레코드 타입별로 스키마를 따르도록 설정한다.
  • 개별 토픽의 특정 레코드 타입별로 스키마를 설정하므로 서로 다른 토픽에서 동일한 레코드 타입들이 있을 경우에는 서로 다른 스키마가 적용 된다.

 

V1에서 V2로 스키마가 변경이되면 subject별로 호환성체크를 한 이후에  id =3으로 저장되고 version이 변경된다.


Schema Registry에서 스키마 발전과 호환성

  • 업무가 계속 변함에 따라 스키마도 변경된다.
  • Schema Registry는 Subject단위로 , 변경된 스키마의 버전과 호환성을 관리한다.
  • Schema Registry에 Subject 별로 호환성이 설정되어 있다면 해당 호환성에 맞지 않는 스키마 변경은 허용되지 않는다.

  • Schema registry를 이용한 Producer와 Consumer App은 별도의 스케마를 개별적으로 가지고 있는다.
  • 스키마가 변경됨에 따라 Producer 또는 Consumer가 가지고 있는 스키마를 개별적으로 update할 수 있다.
  • schema registry에 스키마에 대해 호환성이 설정되면 해당 호환성에 부합하는 스키마 변경이 되었을 때만 Producer와 Consumer App은 스키마 update가 가능하다

  

CDC/JDBC Source Connector 에서 Schema Registry 호환성 유형별 제약

  • 컬럼 추가시 Default가 없을 지라도 null 값을 default 스키마 값으로 설정함으로 backward 호환성 문제가 없다.
  • Source 테이블의 컬럼 추가는 스키마 호환성이 통과되면 auto.evolve=true이면 자동으로 Sink 테이블에도 컬럼이 추가된다. 단 Not Null 은 Null로 , Varchar는 Text로 적용되는 현상은 그대로 유지된다.
  • Source 테이블의 컬럼 삭제가 Schema Registry에 전송 되지 않지만 Backward 호환성 문제 없이 데이터가 전송된다.
  • 스키마 호환성에 문제가 없다는 의미는 Producer와 Consumer가 스키마 호환성 오류 없이 통과되었다는 의미이다.
    auto.evolve=true 일지라도 target db에 자동으로 컬럼 삭제는 되지 않는다.

컬럼 타입 변경은 어떤 유형의 호환성 체크도 통과하지 못하고 Connector 오류가 발생한다.

https://www.inflearn.com/community/questions/739031/%EC%8A%A4%ED%82%A4%EB%A7%88-%ED%98%B8%ED%99%98%EC%84%B1%EA%B3%BC-%EC%8A%A4%ED%82%A4%EB%A7%88-%EC%97%85%EB%8D%B0%EC%9D%B4%ED%8A%B8-%EA%B6%81%EA%B8%88%ED%95%9C-%EC%A0%90%EC%9D%B4-%EC%9E%88%EC%8A%B5%EB%8B%88%EB%8B%A4#662682

 

스키마 호환성과 스키마 업데이트 궁금한 점이 있습니다. - 인프런 | 커뮤니티 질문&답변

누구나 함께하는 인프런 커뮤니티. 모르면 묻고, 해답을 찾아보세요.

www.inflearn.com


CDC / JDBC Connector에서 스키마 호환성이 필요한가?

  • 전송 메시지의 스키마는 소스쪽 스키마와 동일하다
  • CDC/JDBC Soruce Connector 전송 메시지 스키마는 DB 스키마와 동일하며 업무적인 스키마 적합성은 이미 DB 레벨에서 체크된다.
  • CDC/JDBC Source Connector는 변경되는 스키마를 재기동 없이 동적으로 바로 적용하여 메시지를 전송한다.
  • DB 컬럼 타입 변경등은 스키마 호환성 제약 때문에 전송 자체가 안되므로 CDC/JDBC Soruce Connector가 Schema Registry를 적용하여 스키마를 전송시에는 스키마 호환성을 None으로 설정하는것이 바람직하다