Client Python Kafka et service de diffusion en continu - Démarrage rapide

Publier et consommer des messages dans le service de flux à l'aide du client Kafka Python.

Ce démarrage rapide vous montre comment utiliser le client Python Kafka avec le service de diffusion en continu pour Oracle Cloud Infrastructure Streaming pour publier et consommer des messages.

Pour plus d'informations, voir Utilisation du service de diffusion en continu avec Apache Kafka. Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu

Préalables

  1. Pour utiliser le client Kafka Python avec le service de diffusion en continu, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure
    • Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
  2. Collectez les détails suivants :

    • OCID du flux
    • Point d'extrémité pour les messages
    • OCID du groupe de flux
    • Nom de domaine complet du groupe de flux
    • Paramètres de connexion Kafka :
      • Serveurs d'amorçage
      • Chaînes de connexion SASL
      • Protocole de sécurité

    Pour les étapes de création et de gestion des flux et des groupes de flux, voir Gestion des flux et Gestion des groupes de flux. Les flux correspondent à une rubrique Kafka.

  3. Python 3.6 ou version ultérieure, avec PIP installé et mis à jour.
  4. Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
  5. Installez les ensembles Confluent-Kafka pour Python à l'aide de la commande suivante :

    pip install confluent-kafka
    Note

    Vous pouvez installer ces ensembles globalement ou dans un environnement virtuel. L'ensemble librdkafka est utilisé par l'ensemble confluent-kafka et intégré dans les fichiers wheel pour la dernière version de confluent-kafka. Pour plus de détails, consultez la documentation sur le client Python Confluent.
  6. Installez les certificats racines de l'autorité de certification SSL sur l'hôte sur lequel vous développez et exécutez ce démarrage rapide. Le client utilise des certificats AC pour vérifier le certificat du courtier.

    Pour Windows, téléchargez le fichier cacert.pem distribué avec curl (téléchargez cacert.pm). Pour les autres plates-formes, voir Configurer le magasin de certificats SSL.

  7. L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Voir Utilisation des jetons d'authentification pour la génération du jeton d'authentification. Si vous avez créé le flux et le groupe de flux dans OCI, vous êtes déjà autorisé à utiliser ce flux selon le service GIA pour OCI, de sorte que vous devez créer des jetons d'authentification pour votre utilisateur OCI.

    Note

    Les jetons d'authentification d'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-le et conservez-le dans un endroit sûr pour une utilisation future.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide wd. Une fois les préalables satisfaits, les ensembles confluent-kafka doivent déjà être installés pour Python pour votre environnement Python courant.
  2. Créez un fichier nommé Producer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs de configuration dans le mappage conf et le nom de la rubrique est le nom du flux que vous avez créé.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. Depuis le répertoire wd, exécutez la commande suivante :

    python Producer.py
  4. Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.

Consommation de messages

  1. Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide wd. Une fois les préalables satisfaits, les ensembles confluent-kafka doivent déjà être installés pour Python pour votre environnement Python courant.
  3. Créez un fichier nommé Consumer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs de configuration dans le mappage conf et le nom de la rubrique est le nom du flux que vous avez créé.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. Depuis le répertoire wd, exécutez la commande suivante :

    python Consumer.py
  5. Des messages similaires aux suivants doivent s'afficher :

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Note

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.