Migrazione da Confluent Schema Registry a Big Data Service Schema Registry
Big Data Service Schema Registry offre due diverse versioni delle API, V1 e V2.
Queste interfacce API vengono utilizzate per gestire i metadati e le versioni dello schema e il relativo ciclo di vita.
In Big Data Service gli ID della versione dello schema vengono generati automaticamente. Pertanto, anche se i testi dello schema rimangono gli stessi in vari schemi, gli ID generati sono sempre diversi.
esempio:
{
"id": 1,
"name": "sample-topic-1",
"schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}
{
"id": 2,
"name": "sample-topic-2",
"schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}
schemaText è lo stesso, ma gli ID sono diversi. Tuttavia, quando si esegue la migrazione da Confluent Schema Registry, gli schemaText duplicati hanno lo stesso ID.
Per risolvere questo problema, sono disponibili due API V2 per conservare lo stesso ID se gli schemi schemaText sono uguali.
Utilizzo di SERDE
SERDE utilizza le seguenti API V2 internamente.
KafkaAvroSerializerV2 and KafkaAvroDeserializerV2
-
Per utilizzare queste classi SERDE,
Creare un file di configurazione JAAS con il client del registro:
RegistryClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/path/to/registry/keytab" storeKey=true useTicketCache=false principal="principal@realm"; };
- Nel codice Java, aggiungere quanto segue alla configurazione del producer Kafka:
import com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroSerializerV2; .. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.setProperty("schema.registry.url", "http://<host name>:9093/api/v2"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializerV2.class.getName());
Uso dei client del registro schema
Esempio di utilizzo delle API V2 mediante i client Schema Registry nel codice JAVA per aggiungere la nuova versione dello schema:
System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://<registry host name>:9093/api/v2");
String topicName = "my-topic";
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
try {
SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
.type(AvroSchemaProvider.TYPE)
.schemaGroup("sample-group")
.description("Sample schema")
.compatibility(SchemaCompatibility.BACKWARD)
.build();
SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersionV2(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
} catch (SchemaNotFoundException e) {
LOG.info("Schema addition failed for topic {}", topicName);
throw e;
}}