Integración del registro de esquemas con Apache Kafka

El registro de esquemas almacena un historial versionado de todos los esquemas basado en un nombre de asunto especificado, proporciona muchos valores de compatibilidad y permite la evolución del esquema a lo largo del tiempo. El registro de esquema vive fuera de los brokers de Kafka y por separado de ellos.

El registro de esquema tiene tres componentes principales:

  • Servidor web de registro: aplicación web que muestra los puntos finales de REST utilizados para gestionar entidades de esquema. Utilice un proxy web y un equilibrador de carga con muchos servidores web para proporcionar alta disponibilidad y escalabilidad. Para activar la alta disponibilidad del registro de esquemas, consulte Schema Registry HA.
  • Almacenamiento conectable/Almacenamiento de metadatos del esquema: almacén relacional que contiene los metadatos de las entidades del esquema. Se admiten el almacenamiento en memoria y las bases de datos MySQL.
  • Almacenamiento de Serdes: almacenamiento de archivos para los tarros de serializador y deserializador. Se admiten sistemas de archivos locales y almacenamiento HDFS. El almacenamiento del sistema de archivos local es el valor por defecto.

Los principales componentes de Schema Registry incluyen:

  • Servidor web de registro: aplicación web que muestra los puntos finales de REST utilizados para gestionar entidades de esquema. Utilice un proxy web y un equilibrador de carga con muchos servidores web para proporcionar alta disponibilidad y escalabilidad. Consulte Schema Registry HA.
  • Almacenamiento de metadatos del esquema de almacenamiento conectable: almacén relacional que contiene los metadatos de las entidades del esquema. Se admiten el almacenamiento en memoria y las bases de datos MySQL.
  • Almacenamiento de Serdes: almacenamiento de archivos para los tarros de serializador y deserializador. Se admiten sistemas de archivos locales y almacenamiento HDFS. El almacenamiento del sistema de archivos local es el valor por defecto.

Interacción con el registro de esquemas

Utilice los clientes del registro de esquema para que una aplicación interactúe directamente con el servidor del registro de esquema para acceder al esquema o cambiarlo.
Nota

La configuración de JAAS es necesaria solo para un cluster activado para Kerberos (cluster de HA).
  1. Cree un archivo de configuración de JAAS con el cliente de registro:
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. En código Java, agregue lo siguiente a la configuración de Kafka Producer:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Cree el cliente de Schema Registry definiendo la variable de entorno con las configuraciones de cliente de la siguiente manera:
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}