Oracle Cloud Infrastructureドキュメント

メッセージの消費

このトピックでは、ストリームからのメッセージを消費する方法について説明します。

概要

メッセージを消費するには、次のことが必要です:

  • カーソルの作成
  • カーソルを使用してメッセージを読み取ります

カーソルの使用

カーソルはストリーム内のロケーションへのポインタです。 このロケーションは、パーティション内の特定のオフセットまたは時間へのポインタか、または現在のグループへのポインタです。

メッセージを消費するには、CreateCursorまたはCreateGroupCursor APIを使用してパーティションにカーソルを作成し、メッセージの消費を開始する場所を示します。

サポートされているカーソルのタイプは5つあります:

  • TRIM_HORIZON - ストリーム内で使用可能な最も古いメッセージから使用を開始します。 ストリーム内のすべてのメッセージを消費するカーソルをTRIM_HORIZONで作成します。
  • AT_OFFSET - 指定されたオフセットで消費を開始します。 オフセットは、最も古いメッセージのオフセット以上で、最新の公開オフセット以下である必要があります。
  • AFTER_OFFSET - 指定されたオフセットのあとに消費を開始します。 このカーソルには、AT_OFFSETカーソルと同じ制限があります。
  • AT_TIME - 特定の時間からの消費を開始します。 戻されるメッセージのタイムスタンプは、指定された時間以降になります。
  • LATEST - カーソルを作成した後に公開されたメッセージの消費を開始します。

メッセージの消費

作成したカーソルは、GetMessagesを使用してメッセージの消費を開始できます。 GetMessagesを呼び出すたびに、次のGetMessagesコールで使用するカーソルが返されます。 返されたカーソルはnullにはできず、5分で有効期限が切れます。 消費を続けるかぎり、カーソルを再作成する必要はありません。

コンシューマ・グループを使用したメッセージの消費

コンシューマは、グループの一部としてメッセージを消費するように構成できます。 ストリーム・パーティションはグループのメンバー間で分散されるため、単一パーティションのメッセージは1つのコンシューマにのみ送信されます。

パーティション割当てがコンシューマへの参加またはグループからの参加としてリバランスされます。 グループの消費は、単一のコンシューマと同じカーソル・メカニズムを使用して実現されますが、別の種類のカーソルが使用されます。

コンシューマを作成するには、グループ・カーソルを作成し、グループ、インスタンス名およびカーソル・タイプを指定します。 グループは、カーソルを作成する最初のリクエストで作成されます。その保存期間は、割り当てられたストリームと同じです:

CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build())
                .build());
 
CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// this groupCursor can be used in the same message loop a described above; subsequent getMessages calls return an updated groupCursor.

 

作成したカーソルは、GetMessagesを使用してメッセージの消費を開始できます。 GetMessagesを呼び出すたびに、次のGetMessagesコールで使用するカーソルが返されます。 返されたカーソルはnullにはできず、5分で有効期限が切れます。 消費を続けるかぎり、カーソルを再作成する必要はありません。

必要なIAMポリシー

Oracle Cloud Infrastructureを使用するには、管理者が作成するポリシーで、コンソールまたはSDK、CLIまたはその他のツールを使用したREST APIのどちらを使用しているかにかかわらず、必要なタイプのアクセスを付与する必要があります。 アクションを実行しようとしたときに、権限のないメッセージや権限のないメッセージを取得する場合は、管理者に付与されているアクセスのタイプと作業するコンパートメントを確認してください。

管理者向け: 「ストリーミング・ユーザーがストリームを管理」のポリシーによって、指定されたグループにストリーミングおよび関連するStreamingサービス・リソースを含むすべての操作が許可されます。

新しいポリシーの場合は、「ポリシーの開始」「共通ポリシー」を参照してください。 データベースのポリシーの作成をさらに深く検証する場合は、IAMポリシー参照「ストリーミング・サービスの詳細」を参照してください。

コンソールの使用

コンソールを使用してメッセージを消費することはできません。

APIの使用

APIおよび署名リクエストの使用については、REST APIおよび「セキュリティ資格証明」を参照してください。 SDKの詳細は、「ソフトウェア開発キットとコマンドライン・インタフェース」を参照してください。

次のAPI操作を使用して、メッセージを消費します: