Kafka-Clients

Kafka-Clients sind hauptsächlich Komponenten im Kafka-Ökosystem, die Daten in Kafka-Themen lesen und schreiben.

Hinweis

Beim Konfigurieren von Kafka-Clients kann das Modul com.sun.security.auth.module.JndiLoginModule nicht für die JAAS-Konfiguration von Servern und Clients verwendet werden. Es wird empfohlen, das Standardmodul com.sun.security.auth.module.Krb5LoginModule für die Authentifizierung mit Kerberos zu verwenden.

Häufige Befehle

Hinweis

  • Der Kafka-Service muss auf einem der Knoten installiert und gestartet werden.
  • Führen Sie zum Ausführen der allgemeinen Befehle eine SSH-Verbindung zum Knoten aus, auf dem der Kafka-Service installiert ist.
  • So erstellen Sie ein Thema zum Veröffentlichen von Ereignissen:
    1. Erstellen Sie eine Datei client.properties mit den folgenden Details, und erteilen Sie dem Kafka-Benutzer den entsprechenden Zugriff.
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true
      keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
                              
    2. Ausführen:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
                                      
  • So veröffentlichen Sie Ereignisse in Themen:
    1. Erstellen Sie eine Datei producer.properties mit den folgenden Details, und erteilen Sie dem Kafka-Benutzer den entsprechenden Zugriff.
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Ausführen:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
                                      
  • So lesen Sie veröffentlichte Daten aus Themen:
    1. Erstellen Sie eine Datei consumer.properties mit den folgenden Details, und erteilen Sie dem Kafka-Benutzer den entsprechenden Zugriff.
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group   
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
      required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. Ausführen:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Kafka-Produzenten

Kafka-Produzenten sind die Herausgeber, die für das Schreiben von Aufzeichnungen zu Themen verantwortlich sind. In der Regel bedeutet dies, ein Programm mit der API KafkaProducer zu schreiben.

Um einen Producer zu instanziieren, führen Sie Folgendes aus:

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

Weitere Informationen zu den Producer-Einstellungen, die in diesem Konstruktor verwendet werden, finden Sie im Folgenden.

Serialisierung von Schlüsseln und Werten

Für jeden Producer müssen zwei Serialisierungseigenschaften festgelegt werden: key.serializer (für den Schlüssel) und value.serializer (für den Wert). Schreiben Sie benutzerdefinierten Code für die Serialisierung, oder verwenden Sie eine der von Kafka bereitgestellten Optionen. Beispiel:

  • ByteArraySerializer: Binärdaten
  • StringSerializer: Zeichenfolgendarstellungen

Bestätigungen

Der vollständige Schreibpfad für Datensätze von einem Producer ist für die Leader-Partition und dann für alle Follower-Replikate. Der Producer kann steuern, welcher Punkt im Pfad eine Bestätigung auslöst. Je nach Einstellung acks kann der Producer warten, bis der Schreibvorgang vollständig im System propagiert wird, oder nur auf den frühesten Erfolgspunkt warten.

Gültige acks-Werte sind:

  • 0: Warten Sie nicht auf eine Bestätigung von der Partition (schnellster Durchsatz).
  • 1: Warten Sie nur auf die Antwort der Leader-Partition.
  • all: Warten Sie, bis die Antworten der Followerpartitionen den Mindestdurchsatz (langsamster Durchsatz) erreichen.

Kafka-Verbraucher

Kafka-Consumer sind die Abonnenten, die für das Lesen von Datensätzen aus einem oder mehreren Themen und einer oder mehreren Partitionen eines Themas verantwortlich sind. Das Abonnieren eines Themas erfolgt manuell oder automatisch. In der Regel bedeutet dies, ein Programm mit der API KafkaConsumer zu schreiben.

Führen Sie folgenden Befehl aus, um einen Consumer zu erstellen:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

Die Klasse KafkaConsumer verfügt über zwei generische Typparameter. Da Producer Daten (die Werte) mit Schlüsseln senden können, kann der Consumer Daten nach Schlüsseln lesen. In diesem Beispiel sind sowohl die Schlüssel als auch die Werte Zeichenfolgen. Wenn Sie verschiedene Arten definieren, müssen Sie einen Deserialisierer für die alternativen Typen definieren. Für Deserialisierer müssen Sie die Schnittstelle org.apache.kafka.common.serialization.Deserializer implementieren.

Zu den wichtigsten anzugebenden Konfigurationsparametern gehören:

  • bootstrap.servers: Eine Liste der Broker, mit denen die erste Verbindung hergestellt werden soll. Führen Sie zwei bis drei Broker auf. Sie müssen das vollständige Cluster nicht auflisten.
  • group.id: Jeder Consumer gehört zu einer Gruppe, um die Partitionen eines Themas freizugeben.
  • key.deserializer/value.deserializer: Geben Sie an, wie die Java-Darstellung in eine Bytefolge konvertiert wird, um Daten über das Kafka-Protokoll zu senden.

Themen abonnieren

Themen mit dem Methodenaufruf subscribe() abonnieren:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)

Geben Sie eine Liste der Themen an, aus denen konsumiert werden soll, und einen Rebalancing Listener. Rebalancing ist ein wichtiger Teil des Lebens der Verbraucher. Wenn sich das Cluster oder der Zustand des Consumers ändert, wird ein Rebalancing ausgegeben. Dadurch wird sichergestellt, dass alle Partitionen einem Consumer zugewiesen werden.

Nach dem Abonnieren eines Themas fragt der Consumer nach neuen Datensätzen:

while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}

Die Abfrage gibt Datensätze zurück, die vom Client verarbeitet werden können. Nach der Verarbeitung der Datensätze schreibt der Client die Offsets synchron fest und wartet so, bis die Verarbeitung abgeschlossen ist, bevor mit dem Polling fortgefahren wird.

Speichern Sie den Fortschritt. Führen Sie die Methoden commitSync() und commitAsync() aus.

Wir empfehlen kein automatisches Commit. Manuelles Commit ist in den meisten Anwendungsfällen sinnvoll.

Von außerhalb des Clusters auf Big Data Service Kafka zugreifen

Schritte, um einen Benutzer von lokal zu kinitieren:

  1. Stellen Sie sicher, dass der Kerberos-Client in Ihrer Umgebung installiert ist.
  2. Kopieren Sie die Kafka-Schlüsseltabelle aus dem Cluster aus dem Speicherort /etc/security/keytabs/kafka.service.keytab in Ihre Umgebung.
  3. Kopieren Sie den Inhalt von /etc/krb5.conf vom Cluster Broker-Host in /etc/krb5.conf file Ihrer Umgebung (dies ist der Standardspeicherort der Konfigurationsdatei krb5).
  4. Erstellen Sie Hosteinträge für alle Clusterknoten in der Datei /etc/hosts.
  5. Verwenden Sie die Schlüsseltabelle und den Principal mit dem Befehl sudo kinit -kt <keytab path in local> <principal name> , um Kinit aus Ihrer Umgebung auszuführen.
  6. Stellen Sie sicher, dass das Ticket mit dem Befehl klist ausgegeben wird.

Gehen Sie nach Abschluss von kinit folgendermaßen vor, um Producer/Consumer auszuführen:

  1. Kopieren Sie /usr/odh/current/kafka-broker/config/kafka_jaas.conf in ein lokales Verzeichnis.
  2. Kopieren Sie /usr/odh/current/kafka-broker/config/ in localserver.properties.
  3. Stellen Sie den Umgebungsvariablenexport KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf" ein.
  4. Stellen Sie sicher, dass kinit vollständig ist und ein gültiges Ticket verfügbar ist (nicht abgelaufen) kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>.

Bei der vorherigen Einrichtung können Sie allgemeine Befehle ausführen, um die Kafka-Clients von außerhalb des Big Data Service-Clusters auszuführen oder Java/python-Clients auszuführen.