Création d'un client Kafka

Le client Kafka est une application qui vous permet d'interagir avec un cluster Kafka. Vous créez des applications de fournisseur de portlets pour envoyer des données aux rubriques Kafka et aux applications de destinataire pour lire les messages des rubriques Kafka.

Installation de Kafka

Sur chaque instance de calcul que vous créez, vous installez les bibliothèques et les outils client Apache Kafka.

  1. Connectez-vous à l'instance de calcul créée.
    ssh -i <private-key-file> <username>@<public-ip-address>
  2. Apache Kafka requiert Java. Installez Java dans l'instance de calcul, si elle n'est pas déjà installée. Vous pouvez vérifier si Java est installé en exécutant la commande de version.
    java -version
    sudo yum install java-11-openjdk -y
  3. Téléchargez la version d'Apache Kafka à installer à partir du serveur Apache Kafka officiel.
    wget https://downloads.apache.org/kafka/<version>/kafka_<version>.tgz
  4. Extrayez le package téléchargé.
    tar -xzf kafka_<version>.tgz

Configuration du client

Sur chaque instance de calcul que vous créez, vous configurez le fichier de propriétés client.

  1. Connectez-vous à l'instance de calcul créée.
    ssh -i <private-key-file> <username>@<public-ip-address>
  2. Accédez au répertoire config dans l'emplacement de la bibliothèque client Apache Kafka installée.
    cd kafka_<version>/config
  3. Créez un fichier nommé client.properties.
    nano client.properties
  4. Selon l'authentification que vous avez configurée pour le cluster Kafka, créez les paramètres de propriété de sécurité dans le fichier client.properties.
    • Pour l'authentification SASL/SCRAM
      security.protocol=SASL_SSL 
      sasl.mechanism=SCRAM-SHA-512 
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<vault-username>" password="<vault-secret>"
    • Pour l'authentification mTLS
      security.protocol=SSL
      ssl.certificate.location=/leaf.cert
      ssl.key.location=/leaf.key
      ssl.keystore.password=<password>
      ssl.keystore.location=/kafka-keystore.p12

Utilisation de l'interface de ligne de commande Kafka

Après avoir installé et configuré l'application client, utilisez les outils de la CLI Apache Kafka intégrés pour gérer et interagir avec le cluster Kafka dans Streaming avec Apache Kafka.

Apache Kafka inclut des clients intégrés. Par exemple, clients Consommateur Java et Producteur Java.

En outre, la communauté Kafka fournit de nombreux autres clients que vous pouvez utiliser.

Voici quelques références rapides utiles :

Gestion de la configuration client

Les clients Kafka ont besoin de configurations spécifiques pour indiquer comment le client doit envoyer et recevoir des messages, gérer les erreurs et gérer sa connexion au cluster Kafka.

Apache Kafka fournit des outils d'interface de ligne de commande (CLI) dans le répertoire /bin. Par exemple, l'outil kafka-configs.sh disponible dans les bibliothèques client Kafka installées peut être utilisé pour gérer les configurations client.

Voici quelques exemples de commandes CLI courantes. Obtenir les détails du cluster pour l'URL d'initialisation dans les commandes. Spécifiez le chemin du fichier client.properties que vous avez créé lors de la configuration du client.

Affichage de la configuration des rubriques

Pour afficher la configuration d'une rubrique, exécutez l'outil kafka-configs.sh en indiquant le nom de la rubrique pour laquelle vous voulez visualiser la configuration.

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--describe
Affichage des informations de partition et de réplication pour tous les sujets

Pour afficher les informations de partition et de réplication de toutes les rubriques, exécutez l'outil kafka-configs.sh.

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | grep Factor | sort 
Modification de la configuration d'un sujet

Pour modifier une configuration de sujet, telle que la conservation et le segment, exécutez l'outil kafka-configs.sh en indiquant le nom de la rubrique pour laquelle vous souhaitez modifier la configuration.

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--add-config retention.ms=1, segment.ms=60000
Suppression de la configuration de rubrique

Pour enlever une configuration de rubrique, telle que la conservation et le segment, exécutez l'outil kafka-configs.sh en indiquant le nom de la rubrique pour laquelle enlever la configuration.

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--delete-config retention.ms,segment.ms
Affichage de l'utilisation du disque pour chaque partition

Pour afficher l'utilisation du disque pour chaque partition, exécutez l'outil kafka-log-dirs.sh et indiquez le chemin d'accès au fichier journal de sortie.

bin/kafka-log-dirs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | tail -1 > /tmp/logdirs.output.txt

Exécutez ensuite la commande suivante pour filtrer le fichier journal et afficher son contenu. Ajustez brokers[0] pour cibler un broker spécifique.

cat /tmp/logdirs.output.txt
  | jq -r '.brokers[0] | .logDirs[0].partitions[] | .partition + " " + (.size|tostring)' 
  | sort 
  | awk 'BEGIN {sum=0} {sum+=$2} END {print sum}'
Activation du journal de trace

Pour activer le journal de trace pour un broker, créez un fichier log4j.properties avec les propriétés suivantes.

log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n 

Ensuite, exécutez la commande suivante.

export KAFKA_OPTS="-Dlog4j.configuration=file:/path/to/log4j.properties"

Surveillance des mesures client

Vous devez surveiller les applications client et le cluster Kafka.

Utilisez le service OCI Monitoring pour surveiller les mesures émises par les brokers Kafka.

Pour les mesures côté client, vous devez créer votre propre tableau de bord personnalisé pour surveiller les applications client. Surveillez au minimum les mesures client suivantes :

  • record-send-rate
  • request-latency-avg
  • error-rate
  • records-consumed-rate
  • lag
  • fetch-latency-avg
  • Retries
  • disconnects