Démarrage rapide du client Python Kafka et de Streaming

Publiez et utilisez des messages dans le service Streaming à l'aide du client Python Kafka.

Ce démarrage rapide vous explique comment utiliser le client Python Kafka avec Oracle Cloud Infrastructure Streaming pour publier et utiliser des messages.

Pour plus d'informations, reportez-vous à Utilisation de Streaming avec Apache Kafka. Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming.

Prérequis

  1. Afin d'utiliser le client Python Kafka avec Streaming, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure.
    • Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
  2. Collectez les informations suivantes :

    • OCID de flux de données
    • Adresse des messages
    • OCID de pool de flux de données
    • Nom de domaine qualifié complet de pool de flux de données
    • Paramètres de la connexion Kafka :
      • Serveurs de démarrage
      • Chaînes de connexion SASL
      • Protocole de sécurité

    Pour connaître les étapes de création et de gestion des flux de données et des pools de flux de données, reportez-vous à Gestion des flux de données et à Gestion des pools de flux de données. Les flux de données correspondent à une rubrique Kafka.

  3. Python version 3.6 ou ultérieure, avec PIP installé et mis à jour.
  4. Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
  5. Installez des packages Confluent-Kafka pour Python à l'aide de la commande suivante :

    pip install confluent-kafka
    Remarque

    Vous pouvez installer ces packages globalement ou dans une installation virtualenv. Le package librdkafka est utilisé par le package confluent-kafka et intégré à la dernière version de confluent-kafka. Pour plus d'informations, reportez-vous à la documentation du client Python de Confluent.
  6. Installez les certificats racine d'autorité de certification SSL sur l'hôte où vous développez et exécutez ce démarrage rapide. Le client utilise des certificats d'autorité de certification pour vérifier le certificat du broker.

    Pour Windows, téléchargez le fichier cacert.pem distribué avec curl (télécharger cacert.pm). Pour les autres plates-formes, reportez-vous à Configuration du truststore SSL.

  7. L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Reportez-vous à Utilisation des jetons d'authentification pour plus d'informations sur la génération de jetons d'authentification. Si vous avez créé le flux de données et le pool de flux de données dans OCI, vous êtes déjà autorisé à utiliser ce flux de données conformément à OCI IAM. Vous devez donc créer des jetons d'authentification pour l'utilisateur OCI.

    Remarque

    Les jetons d'authentification de l'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-les et conservez-les en lieu sûr pour une utilisation ultérieure.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide wd. Vous devez déjà disposer de packages confluent-kafka pour Python installés pour votre environnement Python actuel si les prérequis ont été respectés.
  2. Créez un fichier nommé Producer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs de configuration de la correspondance conf. Le nom de la rubrique est le nom du flux de données que vous avez créé.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. A partir du répertoire wd, exécutez la commande suivante :

    python Producer.py
  4. Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.

Utilisation des messages

  1. Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide wd. Vous devez déjà disposer de packages confluent-kafka pour Python installés pour votre environnement Python actuel si les prérequis ont été respectés.
  3. Créez un fichier nommé Consumer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs de configuration de la correspondance conf. Le nom de la rubrique est le nom du flux de données que vous avez créé.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. A partir du répertoire wd, exécutez la commande suivante :

    python Consumer.py
  5. Des messages semblables à celui qui suit s'affichent :

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Remarque

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.