Oracle Cloud Infrastructureドキュメント

ストリーミングSDKの使用

このトピックでは、ストリーミング SDKの使用方法について説明します。

開始

Oracle Cloud Infrastructure SDKのインストールおよび構成の詳細は、「開発者ツール」を参照してください。

アーキテクチャの概要

ストリーミングには、次の主要な構成要素があります:

  • ストリーム: パーティション化され、メッセージのログのみが追加されます。
  • パーティション: ストリームのセクション。 パーティションを使用すると、複数のノード間でメッセージを分割することによって、ストリームを配布できます。 各パーティションを別々のマシンに配置して、複数のコンシューマが1つのトピックからパラレルで読み取ることができるようにすることができます。
  • プロデューサ: ストリームにメッセージを公開するエンティティ。
  • コンシューマ: ストリームからメッセージを読み取るエンティティです。
  • コンシューマ・グループ: ストリームの個別のパーティションから独立してメッセージを読み取ることができるコンシューマのグループ。

ストリーミング・クライアント

ストリーミングSDKは2つのクライアントにカプセル化されています: StreamAdminClient StreamClient

StreamAdminClient は、ストリーミング・サービスのコントロール・プレーン操作を組み込みます。 これを使用して、ストリームの作成、削除、更新、変更およびリストを行うことができます。

StreamAdminClient オブジェクトをインスタンス化するには:

StreamAdminClient adminClient = new StreamAdminClient([authProvider]);
adminClient.setEndpoint("https://streaming.r2.oracleiaas.com"); // You cannot use the setRegion method

 

StreamClient は、メッセージの公開および消費に使用します。

StreamClient オブジェクトをインスタンス化するには:

// First you have to get the stream you want to consume/publish.
// You can either make a CreateStream, GetStream, or ListStream call. They all return a "messagesEndpoint" as part of a Stream object.
// That endpoint NEEDS to be used when creating the StreamClient object.
GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(streamId).build();
Stream stream = adminClient.getStream(getStreamRequest).getStream();
 
StreamClient streamClient = new StreamClient([authProvider]);
streamClient.setEndpoint(stream.getMessagesEndpoint());

ストリームの作成

ストリームを作成するには、StreamAdminClientcreateStream メソッドを使用します。 ストリームの作成は非同期操作です。 作成操作の完了を確認するには、新しいストリームのlifecycleStateDetails プロパティがActive またはFailedのいずれかであることをチェックします。

ストリームの作成方法を示す例を次に示します:

// No error handling
CreateStreamDetails createStreamDetails =  CreateStreamDetails.builder()
   .partitions(5) // number of partitions you want in your stream

.name("myStream") // the name of the stream - only used in the console .compartmentId(tenancy) // the compartment id you want your stream to live in .build();

// You can also add tags to the createStreamDetails object. CreateStreamRequest createStreamRequest = CreateStreamRequest.builder() .createStreamDetails(createStreamDetails) .build(); Stream stream = adminClient.createStream(createStreamRequest).getStream();

while (stream.getLifecycleState() != Stream.LifecycleState.Active && stream.getLifecycleState() != Stream.LifecycleState.Failed) {

GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(stream.getId()).build(); stream = adminClient.getStream(getStreamRequest).getStream(); } // Handle stream Failure

ストリームの削除

ストリームを削除するには、StreamAdminClientdeleteStream メソッドAPIを使用します。 ストリームの削除は非同期操作であり、削除操作が終了すると、ストリーム状態はDeleted に変わります。 削除プロセス中は、メッセージの消費または生成にストリームを使用できません。

次の例は、deleteStream メソッドを使用してストリームを削除する方法を示しています:

// No error handling DeleteStreamRequest deleteStreamRequest =

DeleteStreamRequest.builder()

.streamId(stream.getId()) .build(); adminClient.deleteStream(deleteStreamRequest);

ストリームのリスト

指定されたコンパートメントのストリームのリストを戻すには、listStreams メソッドを使用します。

OCID、ライフ・サイクル状態および名前により、戻りリストをフィルタ処理できます。

結果は、名前または作成時間による昇順または降順でソートできます。

結果はページ区切りのリストに再度渡されます。 結果の各ページにトークンが戻されます。このトークンをgetOpcNextPage メソッドに渡して、結果の次のページを取得してください。 getOpcNextPage から返されたnullトークンは、使用可能な結果がこれ以上ないことを示します。

例えば:

// No error handling
ListStreamsRequest listStreamsRequest =
        ListStreamsRequest.builder()
                .compartmentId(tenancy)
                .build();
// You can filter by OCID (exact match only) [builder].id(streamId) -> This will return 0..1 item
// You can filter by name (exact match only) [builder].name(name) -> This will return 0..n items
// You can order the result per TimeCreated or Name [builder].sortBy(SortBy.[TimeCreated|Name])
// You can change the ordering [builder].sortOrder(SortOrder.[Asc|Desc])
// You can filter by lifecycleState [builder].lifecycleState(lifecycleState)
 
String page;
do {
    ListStreamsResponse listStreamsResponse = adminClient.listStreams(listStreamsRequest);
    List<StreamSummary> streams = listStreamsResponse.getItems();
    // Do something with the streams
    page = listStreamsResponse.getOpcNextPage();
} while (page != null);

ストリーム詳細の取得

ストリームの詳細を取得するには、getStream メソッドを使用し、ストリームのプロパティを調べます。 例えば:

// No error handling
GetStreamRequest getStreamRequest =
        GetStreamRequest.builder()
                .streamId(streamId)
                .build();
 
Stream stream = adminClient.getStream(getStreamRequest).getStream();

メッセージの公開

ストリームが作成され、アクティブになると、streamClient.putMessagesメソッドを使用してメッセージを公開できます。

メッセージは、キー(null)と値で構成されます。 キーと値は両方ともバイト配列です。

メッセージがストリームのパーティションに公開されます。 複数のパーティションがある場合、メッセージが公開されるパーティションはメッセージ・キーを使用して計算されます。 キーがNullの場合、パーティションは値のサブセットを使用して計算されます。 Nullキーのメッセージの場合、パーティション・スキームが変更される可能性があるため、同じ値のメッセージが同じパーティションに表示されることはありません。 Nullキーを送信すると、そのメッセージはランダム・パーティションに格納されます。 同じ値のメッセージを同じパーティションに移動する場合は、これらのメッセージに同じキーを使用する必要があります。

次のコードは、メッセージの公開方法を示しています:

// No error handling
List<PutMessagesDetailsEntry> messages = new ArrayList<>();
 
for (int i = 0; i < 40; i++) {
    byte[] key = "myKey".getBytes(Charsets.UTF_8); // In that case, all messages will go on the same partition since the key is the same.
    byte[] value = UUID.randomUUID().toString().getBytes(Charsets.UTF_8);
    messages.add(new PutMessagesDetailsEntry(key, value));
}
 
PutMessagesDetails putMessagesDetails =
        PutMessagesDetails.builder()
                .messages(messages)
                .build();
 
PutMessagesRequest putMessagesRequest =
        PutMessagesRequest.builder()
                .putMessagesDetails(putMessagesDetails)
                .build();
 
PutMessagesResult putMessagesResult = streamClient.putMessages(putMessagesRequest).getPutMessagesResult();
// It's not because the call didn't fail that the messages were successfully published!
int failures = putMessagesResult.getFailures();
// If failures is > 0, it means we have a partial-success call.
List<PutMessagesResultEntry> entries = putMessagesResult.getEntries();
// entries is a list of the same size as the list of messages you sent.
// It is guaranteed that the order of the messages is the same as when you sent them.
// Each entry contains either "offset/partition/timestamp" if the message was successfully published
// or "error/errorMessage" if it failed.
if (failures != 0) {
    entries.forEach(entry -> {
        if (StringUtils.isNotEmpty(entry.getError())) {
            // That particular message failed to get published.
            // It could be a throttle error and in that case error would be "429" and errorMessage would contain a meaningful message.
            // Or it could be an internal error on our side and error would be "500".
             
            // Possible solution would be to republish only failed messages.
        }
    });
}

メッセージの消費

メッセージを消費するには、カーソルを使用する必要があります。カーソルとは、パーティションに対するオフセットのことです。

サポートされているカーソルのタイプは5つあります:

  • TRIM_HORIZON - ストリーム内で使用可能な最も古いメッセージから使用を開始します。 ストリーム内のすべてのメッセージを消費するカーソルをTRIM_HORIZONで作成します。
  • AT_OFFSET - 指定されたオフセットで消費を開始します。 オフセットは、最も古いメッセージのオフセット以上で、最新の公開オフセット以下である必要があります。
  • AFTER_OFFSET - 指定されたオフセットのあとに消費を開始します。 このカーソルには、AT_OFFSETカーソルと同じ制限があります。
  • AT_TIME - 特定の時間からの消費を開始します。 戻されるメッセージのタイムスタンプは、指定された時間以降になります。
  • LATEST - カーソルを作成した後に公開されたメッセージの消費を開始します。

使用可能な最も古いメッセージから消費を開始するTRIM_HORIZONカーソルを作成するには:

// No error handling
CreateCursorDetails createCursorDetails =
        CreateCursorDetails.builder()
                .type(Type.TrimHorizon)
                .partition("0")
                .build();
// If using AT_OFFSET or AFTER_OFFSET you need to specify the offset [builder].offset(offset)
// If using AT_TIME you need to specify the time [builder].time(new Date(xxx))
 
CreateCursorRequest createCursorRequest =
        CreateCursorRequest.builder()
                .createCursorDetails(createCursorDetails)
                .build();
 
String cursor = streamClient.createCursor(createCursorRequest).getCursor().getValue();
// Cursor will then be used to get messages from the stream.

作成したカーソルは、GetMessagesメソッドを使用してメッセージの消費を開始できます。 GetMessagesの各呼び出しは、次のGetMessages呼び出しで使用するためのカーソルを返します。 返されたカーソルはnullではなく、5分で期限切れになります。 消費を続けるかぎり、カーソルを再作成する必要はありません。

カーソルを使用してメッセージを取得する例を次に示します:

// No error handling (there is a high chance of getting a throttling error using a tight loop)
while (true) { // or your own exit condition
    GetMessagesRequest getMessagesRequest =
            GetMessagesRequest.builder()
                    .cursor(cursor)
                    .build();
 
    GetMessagesResponse getMessagesResponse = streamClient.getMessages(getMessagesRequest);
 
    // This could be empty, but we will always return an updated cursor
    getMessagesResponse.getItems().forEach(message -> {
        // Process the message
    });
 
    cursor = getMessagesResponse.getOpcNextCursor();Consuming Messages

コンシューマ・グループの使用

コンシューマは、グループの一部としてメッセージを消費するように構成できます。 ストリーム・パーティションはグループのメンバー間で分散されるため、単一パーティションのメッセージは1つのコンシューマにのみ送信されます。

パーティション割当てがコンシューマへの参加またはグループからの参加としてリバランスされます。 グループの消費は、単一のコンシューマと同じカーソル・メカニズムを使用して実現されますが、別の種類のカーソルが使用されます。

コンシューマ・グループの仕組み

コンシューマ・グループは、ストリーム内のすべてのパーティションからのメッセージを消費するように調整する一連のインスタンスです。 インスタンスは相互作用を通じてグループのメンバーシップを維持します。一定期間に相互作用がないと、タイムアウトになり、グループからインスタンスが削除されます。 パーティションは、グループ内の特定のインスタンス用に予約されます。グループに参加するインスタンスやインスタンス・タイムアウトなど、特定のグループ・イベントに対する予約がリバランスされます。

インスタンスは、グループへのメンバーシップを保持しますが、メッセージを取得するためのコールは、そのインスタンスに予約されているパーティションからのみメッセージを返します。 パーティションの予約は、メッセージが1つのインスタンスによってのみ処理されるようにします。

グループ内のインスタンスのセットは時間の経過とともに変化することが想定されています。トランスリエンスは、一般にサーバー障害、スケーリング・パターンまたは操作管理のために、様々な理由で発生する可能性があります。

インスタンスの存続期間を超えてコンシューマ処理状態を維持することで、インスタンスの受信と移動が可能になり、グループのコンテキスト内で適切な場所に存在することができます。

インスタンスは、同じグループ内の他のインスタンスによって処理されないように、処理済メッセージのオフセットをコミットします。

コンシューマ・グループでのメッセージの使用

コンシューマ・グループを作成するには、グループ・カーソルを作成し、グループ、インスタンス名およびカーソル・タイプを指定します。 グループは、カーソルを作成する最初のリクエストで作成されます。その保存期間は、割り当てられたストリームと同じです:

CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build())
                .build());
 
CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// this groupCursor can be used in the same message loop a described above; subsequent getMessages calls return an updated groupCursor.

 

作成したカーソルは、GetMessagesを使用してメッセージの消費を開始できます。 GetMessagesを呼び出すたびに、次のGetMessagesコールで使用するカーソルが返されます。 返されたカーソルはnullにはなりません。5分で有効期限が切れます。 消費を続けるかぎり、カーソルを再作成する必要はありません。