Schemaregistrierung in Apache Kafka integrieren

Schema Registry speichert eine versionierte Historie aller Schemas basierend auf einem angegebenen Subject-Namen, bietet viele Kompatibilitätseinstellungen und ermöglicht die Schemaentwicklung im Laufe der Zeit. Schema Registry lebt außerhalb und getrennt von den Kafka-Brokern.

Schema Registry enthält drei Hauptkomponenten:

  • Registry-Webserver: Webanwendung, die REST-Endpunkte zur Verwaltung von Schemaentitäten bereitstellt. Verwenden Sie einen Webproxy und einen Load Balancer mit vielen Webservern, um HA und Skalierbarkeit bereitzustellen. Informationen zum Aktivieren von Schema Registry HA finden Sie unter Schema Registry HA.
  • Pluggable storage/Schema Metadata Storage: Relationaler Speicher, der die Metadaten für die Schema-Entitys enthält. In-Memory-Speicher und MySQL-Datenbanken werden unterstützt.
  • Serdes Storage: Dateispeicher für die Serializer- und Deserializer-Gläser. Lokale Dateisysteme und HDFS-Speicher werden unterstützt. Der lokale Dateisystemspeicher ist der Standardwert.

Zu den Hauptkomponenten von Schema Registry gehören:

  • Registry-Webserver: Webanwendung, die REST-Endpunkte zur Verwaltung von Schemaentitys bereitstellt. Verwenden Sie einen Webproxy und Load Balancer mit vielen Webservern, um HA und Skalierbarkeit bereitzustellen. Siehe Schema Registry HA.
  • Speichern von Metadaten für integrierbaren Speicher: Relationaler Speicher, der die Metadaten für die Schemaentitys enthält. In-Memory-Speicher und MySQL-Datenbanken werden unterstützt.
  • Serdes Storage: Dateispeicher für die Serializer- und Deserializer-Gläser. Lokale Dateisysteme und HDFS-Speicher werden unterstützt. Der lokale Dateisystemspeicher ist der Standardwert.

Mit Schema-Registry interagieren

Verwenden Sie die Schema-Registry-Clients für eine Anwendung, um direkt mit dem Schema-Registry-Server zu interagieren und auf das Schema zuzugreifen oder es zu ändern.
Hinweis

Die Einstellung der JAAS-Konfiguration ist nur für ein Kerberos-fähiges Cluster (HA-Cluster) erforderlich.
  1. JAAS-Konfigurationsdatei mit Registry-Client erstellen:
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. Fügen Sie im Java-Code der Kafka Producer-Konfiguration Folgendes hinzu:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Erstellen Sie den Schemaregistrierungsclient, indem Sie die Umgebungsvariable wie folgt mit den Clientkonfigurationen festlegen:
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}