Kafka Pythonクライアントおよびストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure StreamingKafka Pythonクライアントを使用して、メッセージを公開および消費する方法を示します。

詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。

前提条件

  1. ストリーミングでKafka Pythonクライアントを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
  2. 次の詳細を収集します:

    • ストリームOCID
    • メッセージ・エンドポイント
    • ストリーム・プールOCID
    • ストリーム・プールFQDN
    • Kafka接続設定:
      • ブートストラップ・サーバー
      • SASL接続文字列
      • セキュリティ・プロトコル

    ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。ストリームは、Kafkaトピックに対応します。

  3. Python 3.6以降(PIPがインストールおよび更新済)。
  4. Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
  5. 次のコマンドを使用して、Python用のConfluent-Kafkaパッケージをインストールします:

    pip install confluent-kafka
    ノート

    これらのパッケージは、グローバルに、またはvirtualenv内にインストールできます。librdkafkaパッケージは、confluent-kafkaパッケージによって使用され、最新のconfluent-kafkaリリースのwheelに埋め込まれています。詳細は、Confluent Pythonクライアントのドキュメントを参照してください。
  6. このクイックスタートを開発および実行しているホストにSSL CAルート証明書をインストールします。クライアントは、CA証明書を使用してブローカの証明書を検証します。

    Windowsの場合、curlとともに配布されたcacert.pemファイルをダウンロードします(cacert.pmのダウンロード)。他のプラットフォームの場合、SSLトラスト・ストアの構成を参照してください。

  7. Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。

    ノート

    OCIユーザーの認証トークンは、作成時にのみ表示されます。それをコピーして、将来の使用に備えて安全な場所に保管してください。

メッセージの生成

  1. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafkaパッケージがすでにインストールされている必要があります。
  2. 次のコードを使用して、wdディレクトリにProducer.pyという名前のファイルを作成します。マップconfの構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. wdディレクトリから、次のコマンドを実行します:

    python Producer.py
  4. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafkaパッケージがすでにインストールされている必要があります。
  3. 次のコードを使用して、wdディレクトリにConsumer.pyという名前のファイルを作成します。マップconfの構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. wdディレクトリから、次のコマンドを実行します:

    python Consumer.py
  5. 次のようなメッセージが表示されます:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです