Kafka Python-Client und Streaming - Schnellstart

Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit dem Kafka-Python-Client.

In diesem Schnellstart wird gezeigt, wie Sie den Kafka-Python-Client mit Oracle Cloud Infrastructure Streaming verwenden, um Nachrichten zu veröffentlichen und zu verwenden.

Weitere Informationen finden Sie unter Streaming mit Apache Kafka verwenden. Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming

Voraussetzungen

  1. Um den Kafka-Python-Client mit Streaming zu verwenden, wird Folgendes benötigt:

    • Ein Oracle Cloud Infrastructure-Account.
    • Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
  2. Erfassen Sie die folgenden Details:

    • Stream-OCID
    • Nachrichtenendpunkt
    • Streampool-OCID
    • Streampool-FQDN
    • Kafka-Verbindungseinstellungen:
      • Bootstrap-Server
      • SASL-Verbindungszeichenfolgen
      • Sicherheitsprotokoll

    Die Schritte zum Erstellen und Verwalten von Streams und Streampools finden Sie unter Streams verwalten und Streampools verwalten. Streams entsprechen einem Kafka-Topic.

  3. Python 3.6 oder höher, mit installiertem und aktualisiertem PIP.
  4. Visual Code Studio (empfohlen) oder eine andere Integrated Development Environment (IDE).
  5. Installieren Sie Confluent-Kafka-Packages für Python mit dem folgenden Befehl:

    pip install confluent-kafka
    Hinweis

    Sie können diese Packages global oder in einer virtuellen Umgebung installieren. Das Package librdkafka wird vom Package confluent-kafka verwendet und in Rädern für das neueste confluent-kafka-Release eingebettet. Weitere Informationen finden Sie in der Dokumentation für den Confluent-Python-Client.
  6. Installieren Sie die SSL-CA-Root-Zertifikate auf dem Host, auf dem Sie diesen Schnellstart entwickeln und ausführen. Der Client verwendet CA-Zertifikate, um das Zertifikat des Brokers zu verifizieren.

    Laden Sie unter Windows die mit curl verteilte Datei cacert.pem herunter (cacert.pm herunterladen). Informationen zu anderen Plattformen finden Sie unter SSL-Truststore konfigurieren.

  7. Bei der Authentifizierung mit dem Kafka-Protokoll werden Authentifizierungstoken und der SASL/PLAIN-Mechanismus verwendet. Informationen zur Generierung von Authentifizierungstoken finden Sie unter Mit Authentifizierungstoken arbeiten. Wenn Sie den Stream und Streampool in OCI erstellt haben, sind Sie bereits zur Verwendung dieses Streams gemäß OCI IAM autorisiert. Erstellen Sie daher Authentifizierungstoken für den OCI-Benutzer.

    Hinweis

    OCI-Benutzerauthentifizierungstoken sind nur zum Zeitpunkt der Erstellung sichtbar. Kopieren Sie es, und bewahren Sie es für die zukünftige Verwendung an einem sicheren Ort auf.

Nachrichten erzeugen

  1. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd. confluent-kafka-Packages für Python sollten bereits für Ihre aktuelle Python-Umgebung installiert sein, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben.
  2. Erstellen Sie eine Datei namens Producer.py im Verzeichnis wd mit dem folgenden Code. Ersetzen Sie die Konfigurationswerte in der Map conf. Der Name des Topics ist der Name des erstellten Streams.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    python Producer.py
  4. Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um die neuesten Nachrichten anzuzeigen, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.

Nachrichten konsumieren

  1. Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
  2. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd. confluent-kafka-Packages für Python sollten bereits für Ihre aktuelle Python-Umgebung installiert sein, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben.
  3. Erstellen Sie eine Datei namens Consumer.py im Verzeichnis wd mit dem folgenden Code. Ersetzen Sie die Konfigurationswerte in der Map conf. Der Name des Topics ist der Name des erstellten Streams.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    python Consumer.py
  5. Meldungen wie die Folgenden sollten angezeigt werden:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Hinweis

    Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede Nachricht Null