Intégrer le registre de schémas à Apache Kafka

Le registre de schémas stocke un historique avec contrôle des versions de tous les schémas en fonction d'un nom de sujet spécifié, fournit de nombreux paramètres de compatibilité et permet l'évolution du schéma au fil du temps. Le registre de schémas vit en dehors et séparément des courtiers Kafka.

Le registre de schémas comporte trois composants principaux :

  • Serveur Web de registre : Application Web exposant les points d'extrémité REST utilisés pour gérer les entités de schéma. Utilisez un proxy Web et un équilibreur de charge avec de nombreux serveurs Web pour fournir la haute disponibilité et l'évolutivité. Pour activer la haute disponibilité du registre de schémas, voir Haute disponibilité du registre de schémas.
  • Stockage enfichable/Stockage des métadonnées du schéma : Magasin relationnel contenant les métadonnées des entités du schéma. Le stockage en mémoire et les bases de données MySQL sont pris en charge.
  • Stockage Serdes : Stockage de fichiers pour le sérialiseur et les pots de désérialiseur. Les systèmes de fichiers locaux et le stockage HDFS sont pris en charge. Le stockage du système de fichiers local est la valeur par défaut.

Les principaux composants du registre de schéma sont les suivants :

  • Serveur Web de registre : Application Web exposant les points d'extrémité REST utilisés pour gérer les entités de schéma. Utilisez un proxy Web et un équilibreur de charge avec de nombreux serveurs Web pour assurer la haute disponibilité et l'évolutivité. Voir Haute disponibilité du registre de schéma.
  • Stockage des métadonnées du schéma du stockage enfichable : Magasin relationnel contenant les métadonnées des entités du schéma. Le stockage en mémoire et les bases de données MySQL sont pris en charge.
  • Stockage Serdes : Stockage de fichiers pour le sérialiseur et les pots de désérialiseur. Les systèmes de fichiers locaux et le stockage HDFS sont pris en charge. Le stockage du système de fichiers local est la valeur par défaut.

Interaction avec le registre de schémas

Utilisez les clients du registre de schémas pour qu'une application interagisse directement avec le serveur du registre de schémas pour accéder au schéma ou le modifier.
Note

La configuration JAAS n'est requise que pour une grappe activée pour Kerberos (grappe hautement disponible).
  1. Créez un fichier de configuration JAAS avec Registry Client :
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. Dans le code Java, ajoutez ce qui suit à la configuration du fournisseur Kafka :
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Créez le client du registre de schémas en définissant la variable env avec les configurations de client comme suit :
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}