Integrazione del registro degli schemi con Apache Kafka

Schema Registry memorizza una cronologia con controllo delle versioni di tutti gli schemi in base a un nome oggetto specificato, fornisce molte impostazioni di compatibilità e consente l'evoluzione dello schema nel tempo. Schema Registry vive al di fuori e separatamente dai broker Kafka.

Schema Registry include tre componenti principali:

  • Web server del registro: applicazione Web che espone gli endpoint REST utilizzati per gestire le entità dello schema. Utilizza un proxy Web e un load balancer con molti server Web per fornire HA e scalabilità. Per abilitare Schema Registry HA, vedere Schema Registry HA.
  • Memorizzazione metadati di memorizzazione/schema collegabile: area di memorizzazione relazionale che contiene i metadati per le entità dello schema. Sono supportati lo storage in memoria e i database MySQL.
  • Memoria Serdes: Memoria di file per i vasi serializzatori e deserializzatori. Sono supportati i file system locali e lo storage HDFS. L'impostazione predefinita è lo storage del file system locale.

I componenti principali di Schema Registry includono:

  • Web server del registro: applicazione Web che espone gli endpoint REST utilizzati per gestire le entità dello schema. Utilizza un proxy Web e un load balancer con molti server Web per fornire HA e scalabilità. Vedere Schema Registry HA.
  • Memorizzazione metadati dello schema di memorizzazione collegabile: area di memorizzazione relazionale che contiene i metadati per le entità dello schema. Sono supportati lo storage in memoria e i database MySQL.
  • Memoria Serdes: Memoria di file per i vasi serializzatori e deserializzatori. Sono supportati i file system locali e lo storage HDFS. L'impostazione predefinita è lo storage del file system locale.

Interazione con il registro degli schemi

Utilizzare i client Schema Registry per un'applicazione per interagire direttamente con il server Schema Registry per accedere o modificare lo schema.
Nota

L'impostazione della configurazione JAAS è necessaria solo per un cluster HA (cluster abilitato per Kerberos).
  1. Creare un file di configurazione JAAS con il client del registro:
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. Nel codice Java, aggiungere quanto segue alla configurazione del producer Kafka:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Creare il client Schema Registry impostando la variabile di ambiente con le configurazioni client come indicato di seguito.
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}