Migración de Confluent Schema Registry a Big Data Service Schema Registry

El registro de esquema de Big Data Service ofrece dos versiones diferentes de API, V1 y V2.

Estas API se utilizan para gestionar los metadatos de esquema y las versiones de esquema, así como su ciclo de vida.

En Big Data Service, los ID de versión de esquema se generan automáticamente. Por lo tanto, incluso si los textos del esquema siguen siendo los mismos en varios esquemas, los ID generados siempre son diferentes.

Ejemplo:

{ 
 "id": 1,
  "name": "sample-topic-1",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}

{
  "id": 2,
  "name": "sample-topic-2",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}

schemaText es el mismo, pero los ID son diferentes. Sin embargo, al migrar desde Confluent Schema Registry, los schemaText duplicados tienen el mismo ID.

Para solucionar este problema, hay dos API V2 disponibles para conservar el mismo ID si las schemaText son iguales en los esquemas.

Uso de SERDE

Los SERDE utilizan internamente las siguientes API V2.

KafkaAvroSerializerV2 and KafkaAvroDeserializerV2
  • Para utilizar estas clases SERDE,

    Cree un archivo de configuración de JAAS con el cliente de registro:

    RegistryClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true
            keyTab="/path/to/registry/keytab" storeKey=true useTicketCache=false
            principal="principal@realm"; };
  • En código Java, agregue lo siguiente a la configuración de Kafka Producer:
    import com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroSerializerV2;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v2");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializerV2.class.getName());

Uso de Clientes de Registro de Esquema

Ejemplo de uso de API de V2 mediante clientes de registro de esquema en código JAVA para agregar una nueva versión de esquema:

System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://<registry host name>:9093/api/v2");
String topicName = "my-topic";
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
 
try {
    SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
        .type(AvroSchemaProvider.TYPE)
        .schemaGroup("sample-group")
        .description("Sample schema")
        .compatibility(SchemaCompatibility.BACKWARD)
        .build();
    SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersionV2(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
    LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
} catch (SchemaNotFoundException e) {
    LOG.info("Schema addition failed for topic {}", topicName);
    throw e;
}}