SDK for Pythonストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for PythonおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。

主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。

前提条件

  1. SDK for Pythonを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
    • APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。
    ノート

    詳細は、SDKの構成を参照してください。
  2. ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。
  3. Python 3.6以降(PIPがインストールおよび更新済)。
  4. Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
  5. 次のコマンドを使用して、Python用のoci-sdkパッケージをインストールします:

    pip install oci
    ノート

    ociをインストールする場合は、Python仮想環境を使用することをお薦めします。詳細は、SDKのダウンロードとインストールを参照してください。
  6. 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。

メッセージの生成

  1. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のoci-sdkパッケージがすでにインストールされている必要があります。
  2. 次のコードを使用して、wdディレクトリにProducer.pyという名前のファイルを作成します。次のコード・スニペットの変数ociConfigFilePathociProfileNameociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    import oci  
      
    from base64 import b64encode  
      
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
      
    def produce_messages(client, stream_id):
      # Build up a PutMessagesDetails and publish some messages to the stream
      message_list = []
      for i in range(100):
          key = "messageKey" + str(i)
          value = "messageValue " + str(i)
          encoded_key = b64encode(key.encode()).decode()
          encoded_value = b64encode(value.encode()).decode()
          message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))  
      
      print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
      messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
      put_message_result = client.put_messages(stream_id, messages)
      
      # The put_message_result can contain some useful metadata for handling failures
      for entry in put_message_result.data.entries:
          if entry.error:
              print("Error ({}) : {}".format(entry.error, entry.error_message))
          else:
              print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # Publish some messages to the stream
    produce_messages(stream_client, ociStreamOcid)
  3. wdディレクトリから、次のコマンドを実行します:

    python Producer.py
  4. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たしていることを確認した後、現在のPython環境にPython用のoci-sdkパッケージがすでにインストールされている必要があります。
  3. 次のコードを使用して、ディレクトリwdConsumer.pyという名前のファイルを作成します。次のコード・スニペットの変数ociConfigFilePathociProfileNameociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    import oci
    import time
    
    from base64 import b64decode
    
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
    
    
    def get_cursor_by_group(sc, sid, group_name, instance_name):
        print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
        cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
                                                                       type=oci.streaming.models.
                                                                       CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
                                                                       commit_on_get=True)
        response = sc.create_group_cursor(sid, cursor_details)
        return response.data.value
    
    def simple_message_loop(client, stream_id, initial_cursor):
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. return.
            if not get_response.data:
                return
    
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                if message.key is None:
                    key = "Null"
                else:
                    key = b64decode(message.key.encode()).decode()
                print("{}: {}".format(key,
                                      b64decode(message.value.encode()).decode()))
    
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, as to avoid too many http requests.
            time.sleep(1)
            # use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # A cursor can be created as part of a consumer group.
    # Committed offsets are managed for the group, and partitions
    # are dynamically balanced amongst consumers in the group.
    group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
    simple_message_loop(stream_client, ociStreamOcid, group_cursor)
  4. wdディレクトリから、次のコマンドを実行します:

    python Consumer.py
  5. 次のようなメッセージが表示されます:

    Starting a simple message loop with a group cursor
    Creating a cursor for group example-group, instance example-instance-1
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです