Clients Kafka
Les clients Kafka sont principalement des composants de l'écosystème Kafka qui lisent et écrivent des données dans des sujets Kafka.
Lors de la configuration des clients Kafka, le module
com.sun.security.auth.module.JndiLoginModule
ne peut pas être utilisé pour la configuration JAAS des serveurs et des clients. Nous vous recommandons d'utiliser le module par défaut com.sun.security.auth.module.Krb5LoginModule
pour l'authentification à l'aide de Kerberos.Commandes courantes
- Le service Kafka doit être installé et démarré sur l'un des noeuds.
- Pour exécuter les commandes courantes, connectez-vous via SSH au noeud sur lequel le service Kafka est installé.
- Pour créer une rubrique afin de publier des événements :
- Créez un fichier
client.properties
avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafkasasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Exécution :
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
- Créez un fichier
- Pour publier des événements sur des sujets :
- Créez un fichier
producer.properties
avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Exécution :
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
- Créez un fichier
- Pour lire des données publiées à partir de rubriques :
- Créez un fichier
consumer.properties
avec les détails suivants et fournissez l'accès approprié à l'utilisateur Kafka.security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>>:6667 compression.type=none sasl.mechanism=GSSAPI group.id=test-consumer-group sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
- Exécution :
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning
- Créez un fichier
Fournisseurs Kafka
Les producteurs de Kafka sont les éditeurs chargés d'écrire des enregistrements sur des sujets. En général, cela signifie écrire un programme à l'aide de l'API KafkaProducer.
Pour instancier un fournisseur, exécutez la commande suivante :
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
Pour plus d'informations sur les paramètres de producteur utilisés dans ce constructeur, reportez-vous aux sections suivantes.
Sérialisation des clés et des valeurs
Pour chaque producteur, deux propriétés de sérialisation doivent être définies, key.serializer
(pour la clé) et value.serializer
(pour la valeur). Ecrivez du code personnalisé pour la sérialisation ou utilisez l'une des options fournies par Kafka. Par exemple :
ByteArraySerializer
: données binairesStringSerializer
: représentations de chaîne
Accusés de réception
Le chemin d'écriture complet pour les enregistrements d'un producteur est vers la partition leader, puis vers toutes les répliques suiveuses. L'émetteur peut contrôler quel point du chemin déclenche un accusé de réception. Selon le paramètre acks
, le fournisseur de portlets peut attendre que l'écriture se propage tout au long du système ou attendre uniquement le point de succès le plus proche.
Les valeurs acks
valides sont les suivantes :
0
: n'attendez aucun accusé de réception de la partition (débit le plus rapide).1
: attendez uniquement la réponse de la partition leader.all
: attendez que les réponses des partitions suiveuses atteignent le débit minimal (le plus lent).
Consommateurs Kafka
Les destinataires Kafka sont les abonnés chargés de lire les enregistrements d'un ou de plusieurs sujets et d'une ou de plusieurs partitions d'un sujet. Les destinataires abonnés à un sujet se produisent manuellement ou automatiquement. En général, cela signifie écrire un programme à l'aide de l'API KafkaConsumer.
Pour créer un consommateur, exécutez la commande suivante :
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
La classe KafkaConsumer comporte deux paramètres de type générique. Comme les émetteurs peuvent envoyer des données (les valeurs) avec des clés, le destinataire peut lire les données par clés. Dans cet exemple, les clés et les valeurs sont des chaînes. Si vous définissez différents types, vous devez définir un désérialiseur pour prendre en charge les autres types. Pour les désérialiseurs, vous devez implémenter l'interface org.apache.kafka.common.serialization.Deserializer
.
Les paramètres de configuration les plus importants à spécifier sont les suivants :
bootstrap.servers
: liste des courtiers auxquels se connecter initialement. Listez deux à trois courtiers. Vous n'avez pas besoin de répertorier l'ensemble du cluster.group.id
: chaque destinataire appartient à un groupe pour partager les partitions d'un sujet.key.deserializer/value.deserializer
: indiquez comment convertir la représentation Java en une séquence d'octets pour envoyer des données via le protocole Kafka.
S'abonner à un sujet
S'abonner à un sujet à l'aide de l'appel de méthode subscribe()
:
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)
Indiquez la liste des rubriques à utiliser et un processus d'écoute de rééquilibrage. Le rééquilibrage est une partie importante de la vie du consommateur. Chaque fois que le cluster ou l'état du consommateur change, un rééquilibrage est émis. Cela garantit que toutes les partitions sont affectées à un destinataire.
Après s'être abonné à un sujet, le consommateur interroge pour trouver de nouveaux enregistrements :
while (true) {
data = kafkaConsumer.poll();
// do something with 'data'
}
L'interrogation renvoie les enregistrements qui peuvent être traités par le client. Après le traitement des enregistrements, le client valide les décalages de manière synchrone, attendant la fin du traitement avant de poursuivre l'interrogation.
Veillez à enregistrer la progression. Exécutez les méthodes commitSync()
et commitAsync()
.
Nous ne recommandons pas la validation automatique. La validation manuelle est appropriée dans la plupart des cas d'utilisation.
Accès à Big Data Service Kafka à partir de l'extérieur du cluster
Etapes pour lier un utilisateur à un utilisateur local :
- Assurez-vous que le client Kerberos est installé dans votre environnement.
- Copiez le fichier keytab Kafka du cluster à partir de l'emplacement
/etc/security/keytabs/kafka.service.keytab
vers votre environnement. - Copiez le contenu de
/etc/krb5.conf
de l'hôte du broker de cluster vers le fichier/etc/krb5.conf file
de votre environnement (emplacement par défaut du fichier de configuration krb5). - Créez des entrées d'hôte pour tous les noeuds de cluster dans le fichier
/etc/hosts
. - Effectuez un kinit à partir de votre environnement à l'aide de keytab et du principal à l'aide de la commande
sudo kinit -kt <keytab path in local> <principal name>
. - Assurez-vous que le ticket est émis à l'aide de la commande
klist
.
Une fois le kinit terminé, procédez comme suit pour exécuter le producteur/consommateur :
- Copiez
/usr/odh/current/kafka-broker/config/kafka_jaas.conf
en local. - Copiez
/usr/odh/current/kafka-broker/config/
danslocalserver.properties
. - Définissez l'export de variable d'environnement
KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf"
. -
Assurez-vous que le kinit est complet et que le ticket valide est disponible (non expiré)
kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>
.
Avec la configuration précédente, vous pouvez suivre les commandes courantes pour exécuter les clients Kafka à partir d'un cluster Big Data Service externe ou pour exécuter n'importe quel client Java/python.