Migrating from Confluent Schema Registry to Big Data Service Schema Registry
Big Data Service Schema Registry offers two API versions, V1 and V2.
These APIs manage schema metadata, schema versions, and their lifecycle.
In Big Data Service, schema version IDs are auto-generated. Therefore, even if the schema text is identical across different schemas, the generated IDs are always different.
Example:
{
  "id": 1,
  "name": "sample-topic-1",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
}
{
  "id": 2,
  "name": "sample-topic-2",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
}
The schemaText is the same but the IDs are different. However, in schemas migrated from Confluent Schema Registry, entries with duplicate schemaText values share the same ID.
To overcome this problem, two V2 APIs are available that preserve the same ID when the schemaText values are identical across schemas.
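The difference between the two ID policies can be illustrated with a small in-memory model. This is only a sketch: the class SchemaIdAssignment and its methods are hypothetical names invented for illustration, and it does not describe how the registry implements the V2 APIs internally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the two ID policies; not the registry's implementation.
public class SchemaIdAssignment {
    private final Map<String, Integer> idsByText = new HashMap<>();
    private int nextId = 1;

    // Auto-generated policy: every registration receives a fresh ID,
    // even when the schema text was registered before.
    public int registerWithNewId(String schemaText) {
        return nextId++;
    }

    // ID-preserving policy: identical schema text resolves to the ID
    // that was assigned the first time that text was registered.
    public int registerPreservingId(String schemaText) {
        Integer existing = idsByText.get(schemaText);
        if (existing != null) {
            return existing;
        }
        int id = nextId++;
        idsByText.put(schemaText, id);
        return id;
    }

    public static void main(String[] args) {
        String text = "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";

        SchemaIdAssignment autoGenerated = new SchemaIdAssignment();
        System.out.println("auto-generated: " + autoGenerated.registerWithNewId(text)
                + ", " + autoGenerated.registerWithNewId(text));

        SchemaIdAssignment preserving = new SchemaIdAssignment();
        System.out.println("preserving: " + preserving.registerPreservingId(text)
                + ", " + preserving.registerPreservingId(text));
    }
}
```

With the auto-generated policy the same text ends up with two different IDs, while the preserving policy returns the first ID again, which matches the behavior that data migrated from Confluent Schema Registry expects.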
Using SERDEs
SERDEs use the following V2 APIs internally:
- KafkaAvroSerializerV2 and KafkaAvroDeserializerV2
To use these SERDE classes:
- Create a JAAS configuration file with a RegistryClient entry:
RegistryClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/path/to/registry/keytab"
  storeKey=true
  useTicketCache=false
  principal="principal@realm";
};
- In Java code, add the following to the Kafka Producer configuration:
import com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroSerializerV2;
..
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty("schema.registry.url", "http://<host name>:9093/api/v2");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializerV2.class.getName());
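The consumer side can be configured along the same lines. The following is a minimal sketch that uses only string configuration keys so that it compiles without the Kafka libraries. The fully qualified name of KafkaAvroDeserializerV2 is an assumption (it is taken to be in the same package as the serializer above), and the class ConsumerConfigSketch, the group ID, and the broker placeholder are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that assembles Kafka consumer properties for the V2 deserializer.
public class ConsumerConfigSketch {
    // Assumption: the deserializer is in the same package as KafkaAvroSerializerV2.
    public static final String DESERIALIZER_V2 =
            "com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroDeserializerV2";

    public static Properties consumerProps(String bootstrapServers, String registryUrl, String groupId) {
        Properties props = new Properties();
        // Standard Kafka consumer configuration keys.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", DESERIALIZER_V2);
        // Registry endpoint, pointing at the V2 API as in the producer configuration.
        props.setProperty("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
                "<broker host>:<port>", "http://<host name>:9093/api/v2", "sample-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed to a KafkaConsumer constructor. Verify the deserializer's package against the client JAR shipped with your cluster before relying on it.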
Using Schema Registry Clients
Sample usage of the V2 APIs through the Schema Registry client in Java code to add a new schema version:
System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://<registry host name>:9093/api/v2");
String topicName = "my-topic";
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
try {
    // Describe the schema: name, type, group, and compatibility policy.
    SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
    // Note: in the upstream Hortonworks client, the first SchemaVersion constructor
    // argument is the schema text (a String); verify this against your client version.
    SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersionV2(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
    LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
} catch (SchemaNotFoundException e) {
    LOG.info("Schema addition failed for topic {}", topicName);
    throw e;
}