Client Python Kafka et service de diffusion en continu - Démarrage rapide
Publier et consommer des messages dans le service de flux à l'aide du client Kafka Python.
Ce démarrage rapide vous montre comment utiliser le client Python Kafka avec le service de diffusion en continu pour Oracle Cloud Infrastructure Streaming pour publier et consommer des messages.
Pour plus d'informations, voir Utilisation du service de diffusion en continu avec Apache Kafka. Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu
Préalables
-
Pour utiliser le client Kafka Python avec le service de diffusion en continu, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure
- Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
-
Collectez les détails suivants :
- OCID du flux
- Point d'extrémité pour les messages
- OCID du groupe de flux
- Nom de domaine complet du groupe de flux
- Paramètres de connexion Kafka :
- Serveurs d'amorçage
- Chaînes de connexion SASL
- Protocole de sécurité
Pour les étapes de création et de gestion des flux et des groupes de flux, voir Gestion des flux et Gestion des groupes de flux. Les flux correspondent à une rubrique Kafka.
- Python 3.6 ou version ultérieure, avec PIP installé et mis à jour.
- Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
-
Installez les ensembles
Confluent-Kafka
pour Python à l'aide de la commande suivante :pip install confluent-kafka
Note
Vous pouvez installer ces ensembles globalement ou dans un environnement virtuel. L'ensemblelibrdkafka
est utilisé par l'ensembleconfluent-kafka
et intégré dans les fichiers wheel pour la dernière version deconfluent-kafka
. Pour plus de détails, consultez la documentation sur le client Python Confluent. -
Installez les certificats racines de l'autorité de certification SSL sur l'hôte sur lequel vous développez et exécutez ce démarrage rapide. Le client utilise des certificats AC pour vérifier le certificat du courtier.
Pour Windows, téléchargez le fichier
cacert.pem
distribué avec curl (téléchargez cacert.pm). Pour les autres plates-formes, voir Configurer le magasin de certificats SSL. -
L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Voir Utilisation des jetons d'authentification pour la génération du jeton d'authentification. Si vous avez créé le flux et le groupe de flux dans OCI, vous êtes déjà autorisé à utiliser ce flux selon le service GIA pour OCI, de sorte que vous devez créer des jetons d'authentification pour votre utilisateur OCI.
Note
Les jetons d'authentification d'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-le et conservez-le dans un endroit sûr pour une utilisation future.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. Une fois les préalables satisfaits, les ensemblesconfluent-kafka
doivent déjà être installés pour Python pour votre environnement Python courant. -
Créez un fichier nommé
Producer.py
dans le répertoirewd
avec le code suivant. Remplacez les valeurs de configuration dans le mappageconf
et le nom de la rubrique est le nom du flux que vous avez créé.from confluent_kafka import Producer, KafkaError if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Producer instance producer = Producer(**conf) delivered_records = 0 # Optional per-message on_delivery handler (triggered by poll() or flush()) # when a message has been successfully delivered or permanently failed delivery after retries. def acked(err, msg): global delivered_records """Delivery report handler called on successful or failed delivery of message """ if err is not None: print("Failed to deliver message: {}".format(err)) else: delivered_records += 1 print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset())) for n in range(10): record_key = "messageKey" + str(n) record_value = "messageValue" + str(n) print("Producing record: {}\t{}".format(record_key, record_value)) producer.produce(topic, key=record_key, value=record_value, on_delivery=acked) # p.poll() serves delivery reports (on_delivery) from previous produce() calls. producer.poll(0) producer.flush() print("{} messages were produced to topic {}!".format(delivered_records, topic))
-
Depuis le répertoire
wd
, exécutez la commande suivante :python Producer.py
- Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.
Consommation de messages
- Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. Une fois les préalables satisfaits, les ensemblesconfluent-kafka
doivent déjà être installés pour Python pour votre environnement Python courant. -
Créez un fichier nommé
Consumer.py
dans le répertoirewd
avec le code suivant. Remplacez les valeurs de configuration dans le mappageconf
et le nom de la rubrique est le nom du flux que vous avez créé.from confluent_kafka import Consumer if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Consumer instance consumer = Consumer(conf) # Subscribe to topic consumer.subscribe([topic]) # Process messages try: while True: msg = consumer.poll(1.0) if msg is None: # No message available within timeout. # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming print("Waiting for message or event/error in poll()") continue elif msg.error(): print('error: {}'.format(msg.error())) else: # Check for Kafka message record_key = "Null" if msg.key() is None else msg.key().decode('utf-8') record_value = msg.value().decode('utf-8') print("Consumed record with key "+ record_key + " and value " + record_value) except KeyboardInterrupt: pass finally: print("Leave group and commit final offsets") consumer.close()
-
Depuis le répertoire
wd
, exécutez la commande suivante :python Consumer.py
-
Des messages similaires aux suivants doivent s'afficher :
Waiting for message or event/error in poll() Waiting for message or event/error in poll() Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Note
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Étapes suivantes
Pour plus d'informations, voir les ressources suivantes :