Schema Registry
Schema Registry
Confluent Kafka는 Schema Registry를 통해 Schema 정보를 별도로 관리하는 기능을 제공한다.
토픽으로 전송되는 Data의 Schema는 Schema Registry에서 ID + Version 별로 중앙 관리되므로 레코드 별로 Schema를 중복해서 전송할 필요가 없다.
Converter 지원 포맷
- Json/Avro 포맷의 경우 Schema와 payload로 구성된다.
- Schema는 해당 레코드의 schema구성을, payload는 해당 레코드의 값을 가진다.
- Connector 별로 Json 포맷은 조금씩 다를 수 있지만 전반적으로 대부분 비슷하다
- Json Schema 의 경우 레코드 별로 Schema를 가지고 있으므로 메시지 용량이 커진다. 이의 개선을 위해 Avro Format과 Schema Registry를 이용하여 Schema 정보의 중복 생성 제거를 한다.
avro 개요 및 장점
- 데이터 직렬화를 수행하는 시스템
- 보다 컴팩트하고 빠른 binary 데이터 포맷을 제공한다
- 스키마를 제공
- Json 포맷으로 쉽게 스키마 메시지를 정의한다.
- 다양한 데이터 타입을 제공(String ,bytes, int , long ,float, double, boolean 뿐만 아니라 enum, arrays, maps등의 보다 복잡한 데이터 타입도 지원)
- 하지만 기본적으로 하나의 메시지마다 별도의 스키마를 가지고 있는 구조이다.
- 메시지를 자바 객체로 쉽게 만들어 줄 수 있다.
Avro 메시지를 kafka utility로 보내고 읽기
- Avro 메시지는 avro 이진 파일 형태로 변환된후 직렬화 되므로 별도의 직렬화 Class인 AvroSerializer를 이용해야 함
- kafka-avro-console-producer, kafka-avro-console-consumer 유틸리티를 이용하면 손쉽게 avro 메시지를 kafka 토픽 메시지로 전송하고 읽을 수 있다.
- kafka-avro-console-producer, kafka-avro-console-consumer는 모두 schema registry기반에서 동작한다.
vi registry_start.sh
$CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
schema registry를 먼저기동한후에
kafka-avro-console-producer --broker-list localhost:9092 --topic avro_test \
--property value.schema='{
"type": "record",
"name": "customer_short",
"fields": [
{"name": "customer_id", "type": "int" },
{"name": "customer_name", "type": "string"}
]
}' \
--property schema.registry.url=http://localhost:8081
{"customer_id": 1, "customer_name" : "myname"} 이런식으로 스키마에 맞게 메시지를 작성하면 producer가 보내준다.
kacc avro_test --property schema.registry.url=http://localhost:8081 --from-beginning
avro consumer로 읽어야 제대로 메시지가 파싱되서 읽혀진다.
Schema Registry 기반의 Connector 설정
{
"name": "mysql_cdc_oc_source_avro_01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.14.143",
"database.port": "3316",
"database.user": "root",
"database.password": "1234",
"database.server.id": "20000",
"database.server.name": "mysqlavro",
"database.include.list": "oc",
"table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items, oc.orders_datetime_tab",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql.oc",
"database.allowPublicKeyRetrieval": "true",
"time.precision.mode": "connect",
"database.connectionTimezone": "Asia/Seoul",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"topic.prefix":"mysqlavro",
"database.connectionTimeZone": "Asia/Seoul"
}
}
등록한 table.include.list에서 다 읽어오게 된다.
추가된것은 key.converter, value.converter , key.converter.schema.registry.url , value.converter.schema.registry.url
Schema registry 등록 주요 정보
Schema registry는 주요 정보를 Kafka의 내부 토픽인 _schemas에 저장하며 주요 요소는 아래와 같다.
subject
- Schema Registry에 등록된 토픽별(또는 토픽 레코드 유형별 ) 기준 정보
- Subject 내의 스키마 호환성 체크
- Version은 subject내에서 변경한다.
- Schema가 진화할 때 동일한 subject에서 새로운 schema id와 version을 가진다.
schemas = schema 정보
config = 전역 또는 subject 별 스키마 호환성 관련 정보 저장
Rest API 를 통한 Schema Registry관리
- GET, POST, PUT, DELETE Method 기반 rest api 를 통해 schema registry 관리를 수행한다.
- Subject 및 Config의 정보 추출 및 주요 속성에 대한 수정/ 변경 / 삭제 가능하다
- 가급적 Delete는 자제해야한다. Delete수행시 Soft Delete 이후 다시 Hard Delete를 수행해야 완전 삭제할 수 있다.
- Config는 Subject 별로 별도 설정하지 않으면 전역 config를 그대로 따르게 된다.
스키마 레지스티리 기반의 JDBC Sink Connector 설정
{
"name": "mysql_jdbc_oc_sink_customers_avro_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysqlavro.oc.customers",
"connection.url": "jdbc:mysql://192.168.14.143:3316/oc_sink",
"connection.user": "root",
"connection.password": "1234",
"table.name.format": "oc_sink.customers_sink",
"insert.mode": "upsert",
"pk.fields": "customer_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
sink connector도 마찬가지로 아래 4 row가 추가되었다.
Avro의 읽기 스키마 와 쓰기 스키마
- 쓰기 스키마 = 앱이 데이터 전송시 데이터를 부호화 하기 위해 사용하는 스키마
- 읽기 스키마 = 앱이 데이터를 복호화화여 읽어 들일 시 사용하는 스키마
avro는 record별로 스키마가 존재한다. 쓰기랑 읽기랑 다를 이유가있나?
여러 producer(app)이 있을 수 있고 여러 consumer(수신 app)이 있을 수있다.
producer가 중구난방으로 schema를 변경하면 consumer schema도 계속 변해야하는데 그러면 consumer앱도 변경해야하고 여러가지로 번거롭다.
Avro의 읽기 스키마와 쓰기 스키마 호환성
- avro는 읽기 스키마와 쓰기 스키마가 동일하지 않아도 되며 호환 가능하기만 하면 된다.
- 데이터를 복호화 할때 (읽을때) avro는 쓰기 스키마와 읽기 스키마의 차이가 호환이 가능하다면 읽어 들일 때 그 차이를 해소 시켜서 읽어들인다.
- 속성의 순서가 달라도 서로 상관 없다.
- 쓰기 스키마에는 있지만 읽기 스키마에 없는 속성은 읽어들일때 해당 속성을 무시한다.
- 쓰기 스키마에는 없지만 읽기 스키마에는 있는 속성은 읽어 들일 때 읽기 스키마에 선언된 기본값으로 읽어들인다.
Avro 스키마 호환성 - 하위 호환성
하위 호환성 = 새로운 버전의 읽기 스키마는 예전 버전의 쓰기 스키마를 처리 할 수 있다.
case1 = 에전 버전에서 속성이 추가된 새로운 버전의 읽기 스키마는 기본값으로 해당 속성을 읽는다.
따라서 읽기 스키마에 새로운 버전으로 컬럼이 추가될 경우 반드시 기본값 설정이 필요하다.
case2 = 예전 버전에서 속성이 삭제된 새로운 버전의 읽기 스키마는 해당 속성을 무시한다.
Avro 스키마 호환성 - 상위 호환성
상위 호환성 = 예전 버전의 읽기 스키마는 새로운 버전의 쓰기 스키마를 처리 할 수 있다.
case1 = 새로운 버전 쓰기 스키마에 신규속성이 추가될 경우에 예전 버전의 읽기 스키마는 해당 속성을 무시한다.
case2 = 새로운 버전의 스기 스키마에 기존 속성이 삭제될 때 예전 버전의 읽기 스키마는 해당 속성의 기본값으로 읽어 들인다.
따라서 새로운 버전에서 삭제되는 기존 속성은 반드시 예전 버전에서 기본값을 가지고 있어야 한다.
avro 스키마 호환성 체크
- 새로 추가되는 컬럼이 기본 값을 가지고 있지 않거나 (하위 호환성 오류) , 삭제되는 기존 칼럼이 기본 값을 가지고 있지 않는 경우에 (상위 호환성 오류) 호환성 오류가 발생한다.
- 기존 컬럼 타입 변경, 기존 컬럼명 변경은 스키마 호환성 체크가 제대로 되지 않는다.
Schema Registrty의 Subject
- Schema Registry에 등록된 토픽별 (또는 토픽 레코드 유형별) 기준 정보 (스키마 namespace)
- Subject내의 스키마 호환성(compatiblity) 체크
- Version은 subject내에서 변경
- Schema가 진화할 때 동일한 subject에서 새로운 Schema id와 version을 가진다.
Schema Registry Subject의 Naming 유형
TopicNameStrategy
- Subject 이름을 정하는 기본설정
- 개별 토픽별로 토픽 Key/토픽 Value에 따라 스키마를 등록한다.
- 토픽내의 메시지 레코드가 동일한 스키마를 따르도록 설정한다.
- 스키마 호환성이 설정될 경우에 해당 호환성에 적합하지 않은 스키마를 가지는 메시지는 토픽에 저장되지 않는다.
RecordNameStrategy
- 특정 레코드 타입별로 스키마를 따르도록 설정한다.
- 하나의 토픽내에 여러 타입의 이벤트가 있을 경우 이들 이벤트 유형별로 스키마를 지정할 때 사용한다.
- Kafka 전체에서 특정 레코트 타입별로 스키마를 설정하므로 서로 다른 토픽에서 동일한 레코드 타입들이 있을 경우에도 동일 스키마가 적용 된다.
TopicRecordNameStrategy
- RecordNameStrategy와 유사하지만 토픽내의 특정 레코드 타입별로 스키마를 따르도록 설정한다.
- 개별 토픽의 특정 레코드 타입별로 스키마를 설정하므로 서로 다른 토픽에서 동일한 레코드 타입들이 있을 경우에는 서로 다른 스키마가 적용 된다.
V1에서 V2로 스키마가 변경이되면 subject별로 호환성체크를 한 이후에 id =3으로 저장되고 version이 변경된다.
Schema Registry에서 스키마 발전과 호환성
- 업무가 계속 변함에 따라 스키마도 변경된다.
- Schema Registry는 Subject단위로 , 변경된 스키마의 버전과 호환성을 관리한다.
- Schema Registry에 Subject 별로 호환성이 설정되어 있다면 해당 호환성에 맞지 않는 스키마 변경은 허용되지 않는다.
- Schema registry를 이용한 Producer와 Consumer App은 별도의 스케마를 개별적으로 가지고 있는다.
- 스키마가 변경됨에 따라 Producer 또는 Consumer가 가지고 있는 스키마를 개별적으로 update할 수 있다.
- schema registry에 스키마에 대해 호환성이 설정되면 해당 호환성에 부합하는 스키마 변경이 되었을 때만 Producer와 Consumer App은 스키마 update가 가능하다
CDC/JDBC Source Connector 에서 Schema Registry 호환성 유형별 제약
- 컬럼 추가시 Default가 없을 지라도 null 값을 default 스키마 값으로 설정함으로 backward 호환성 문제가 없다.
- Source 테이블의 컬럼 추가는 스키마 호환성이 통과되면 auto.evolve=true이면 자동으로 Sink 테이블에도 컬럼이 추가된다. 단 Not Null 은 Null로 , Varchar는 Text로 적용되는 현상은 그대로 유지된다.
- Source 테이블의 컬럼 삭제가 Schema Registry에 전송 되지 않지만 Backward 호환성 문제 없이 데이터가 전송된다.
- 스키마 호환성에 문제가 없다는 의미는 Producer와 Consumer가 스키마 호환성 오류 없이 통과되었다는 의미이다.
auto.evolve=true 일지라도 target db에 자동으로 컬럼 삭제는 되지 않는다.
컬럼 타입 변경은 어떤 유형의 호환성 체크도 통과하지 못하고 Connector 오류가 발생한다.
스키마 호환성과 스키마 업데이트 궁금한 점이 있습니다. - 인프런 | 커뮤니티 질문&답변
누구나 함께하는 인프런 커뮤니티. 모르면 묻고, 해답을 찾아보세요.
www.inflearn.com
CDC / JDBC Connector에서 스키마 호환성이 필요한가?
- 전송 메시지의 스키마는 소스쪽 스키마와 동일하다
- CDC/JDBC Soruce Connector 전송 메시지 스키마는 DB 스키마와 동일하며 업무적인 스키마 적합성은 이미 DB 레벨에서 체크된다.
- CDC/JDBC Source Connector는 변경되는 스키마를 재기동 없이 동적으로 바로 적용하여 메시지를 전송한다.
- DB 컬럼 타입 변경등은 스키마 호환성 제약 때문에 전송 자체가 안되므로 CDC/JDBC Soruce Connector가 Schema Registry를 적용하여 스키마를 전송시에는 스키마 호환성을 None으로 설정하는것이 바람직하다