Clients Kafka

Les clients Kafka sont principalement des composants de l'écosystème Kafka qui lisent et écrivent des données dans des sujets Kafka.

Remarque

Lors de la configuration des clients Kafka, le module com.sun.security.auth.module.JndiLoginModule ne peut pas être utilisé pour la configuration JAAS des serveurs et des clients. Nous vous recommandons d'utiliser le module par défaut com.sun.security.auth.module.Krb5LoginModule pour l'authentification à l'aide de Kerberos.

Commandes courantes

Remarque

  • Le service Kafka doit être installé et démarré sur l'un des noeuds.
  • Pour exécuter les commandes courantes, connectez-vous via SSH au noeud sur lequel le service Kafka est installé.
  • Pour créer une rubrique afin de publier des événements :
    1. Créez un fichier client.properties avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafka
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true
      keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
                              
    2. Exécution :
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
  • Pour publier des événements sur des sujets :
    1. Créez un fichier producer.properties avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafka.
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Exécution :
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
  • Pour lire des données publiées à partir de rubriques :
    1. Créez un fichier consumer.properties avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafka.
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group   
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
      required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. Exécution :
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Fournisseurs Kafka

Les producteurs de Kafka sont les éditeurs chargés d'écrire des enregistrements sur des sujets. En général, cela signifie écrire un programme à l'aide de l'API KafkaProducer.

Pour instancier un fournisseur, exécutez la commande suivante :

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

Pour plus d'informations sur les paramètres de producteur utilisés dans ce constructeur, reportez-vous aux sections suivantes.

Sérialisation des clés et des valeurs

Pour chaque producteur, deux propriétés de sérialisation doivent être définies, key.serializer (pour la clé) et value.serializer (pour la valeur). Ecrivez du code personnalisé pour la sérialisation ou utilisez l'une des options fournies par Kafka. Par exemple :

  • ByteArraySerializer : données binaires
  • StringSerializer : représentations de chaîne

Accusés de réception

Le chemin d'écriture complet pour les enregistrements d'un producteur est vers la partition leader, puis vers toutes les répliques suiveuses. L'émetteur peut contrôler quel point du chemin déclenche un accusé de réception. Selon le paramètre acks, le fournisseur de portlets peut attendre que l'écriture se propage tout au long du système ou attendre uniquement le point de succès le plus proche.

Les valeurs acks valides sont les suivantes :

  • 0 : n'attendez aucun accusé de réception de la partition (débit le plus rapide).
  • 1 : attendez uniquement la réponse de la partition leader.
  • all : attendez que les réponses des partitions suiveuses atteignent le débit minimal (le plus lent).

Consommateurs Kafka

Les destinataires Kafka sont les abonnés chargés de lire les enregistrements d'un ou de plusieurs sujets et d'une ou de plusieurs partitions d'un sujet. Les destinataires abonnés à un sujet se produisent manuellement ou automatiquement. En général, cela signifie écrire un programme à l'aide de l'API KafkaConsumer.

Pour créer un consommateur, exécutez la commande suivante :

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

La classe KafkaConsumer comporte deux paramètres de type générique. Comme les émetteurs peuvent envoyer des données (les valeurs) avec des clés, le destinataire peut lire les données par clés. Dans cet exemple, les clés et les valeurs sont des chaînes. Si vous définissez différents types, vous devez définir un désérialiseur pour prendre en charge les autres types. Pour les désérialiseurs, vous devez implémenter l'interface org.apache.kafka.common.serialization.Deserializer.

Les paramètres de configuration les plus importants à spécifier sont les suivants :

  • bootstrap.servers : liste des courtiers auxquels se connecter initialement. Listez deux à trois courtiers. Vous n'avez pas besoin de répertorier l'ensemble du cluster.
  • group.id : chaque destinataire appartient à un groupe pour partager les partitions d'un sujet.
  • key.deserializer/value.deserializer : indiquez comment convertir la représentation Java en une séquence d'octets pour envoyer des données via le protocole Kafka.

S'abonner à un sujet

S'abonner à un sujet à l'aide de l'appel de méthode subscribe() :

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)

Indiquez la liste des rubriques à utiliser et un processus d'écoute de rééquilibrage. Le rééquilibrage est une partie importante de la vie du consommateur. Chaque fois que le cluster ou l'état du consommateur change, un rééquilibrage est émis. Cela garantit que toutes les partitions sont affectées à un destinataire.

Après s'être abonné à un sujet, le consommateur interroge pour trouver de nouveaux enregistrements :

while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}

L'interrogation renvoie les enregistrements qui peuvent être traités par le client. Après le traitement des enregistrements, le client valide les décalages de manière synchrone, attendant la fin du traitement avant de poursuivre l'interrogation.

Veillez à enregistrer la progression. Exécutez les méthodes commitSync() et commitAsync().

Nous ne recommandons pas la validation automatique. La validation manuelle est appropriée dans la plupart des cas d'utilisation.

Accès à Big Data Service Kafka à partir de l'extérieur du cluster

Etapes pour lier un utilisateur à un utilisateur local :

  1. Assurez-vous que le client Kerberos est installé dans votre environnement.
  2. Copiez le fichier keytab Kafka du cluster à partir de l'emplacement /etc/security/keytabs/kafka.service.keytab vers votre environnement.
  3. Copiez le contenu de /etc/krb5.conf de l'hôte du broker de cluster vers le fichier /etc/krb5.conf file de votre environnement (emplacement par défaut du fichier de configuration krb5).
  4. Créez des entrées d'hôte pour tous les noeuds de cluster dans le fichier /etc/hosts.
  5. Effectuez un kinit à partir de votre environnement à l'aide de keytab et du principal à l'aide de la commande sudo kinit -kt <keytab path in local> <principal name>.
  6. Assurez-vous que le ticket est émis à l'aide de la commande klist.

Une fois le kinit terminé, procédez comme suit pour exécuter le producteur/consommateur :

  1. Copiez /usr/odh/current/kafka-broker/config/kafka_jaas.conf en local.
  2. Copiez /usr/odh/current/kafka-broker/config/ dans localserver.properties.
  3. Définissez l'export de variable d'environnement KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf".
  4. Assurez-vous que le kinit est complet et que le ticket valide est disponible (non expiré) kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>.

Avec la configuration précédente, vous pouvez suivre les commandes courantes pour exécuter les clients Kafka à partir d'un cluster Big Data Service externe ou pour exécuter n'importe quel client Java/python.