SDK for .NETストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for .NETおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。これらの例では、C#言語を使用しています。

主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。

前提条件

ノート

このクイックスタートでは、Visual Studio Codeおよび.NET CLIを使用して、単純な.NETコンソール・アプリケーションを作成して実行します。プロジェクトの作成、コンパイル、実行などのプロジェクト・タスクは、.NET CLIを使用して行います。必要に応じて、別のIDEでこのチュートリアルに従い、ターミナルでコマンドを実行できます。
  1. SDK for .NETを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
    • APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。詳細は、SDK構成ファイルを参照してください。
  2. ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。
  3. .NET 5.0 SDK以降をインストールします。dotnetPATH環境変数に設定されていることを確認します。
  4. C#拡張機能がインストールされたVisual Studio Code (推奨)。Visual Studio Codeに拡張機能をインストールする方法の詳細は、VS Code Extension Marketplaceを参照してください。

  5. 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。

メッセージの生成

  1. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。
  2. ターミナルを開き、wdディレクトリにcdで移動します。
  3. ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:

    dotnet new console

    アプリケーションが作成されたことを示すメッセージが表示されます:

    The template "Console Application" was created successfully.

    これにより、単純な"HelloWorld"アプリケーションのC#コードを含むProgram.csファイルが作成されます。

  4. IAM基本認証およびストリーミング用のOCI SDKパッケージをC#プロジェクトに次のように追加します:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. wdディレクトリのProgram.csのコードを次のコードに置き換えます。次のコード・スニペットの変数configurationFilePathprofileociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssProducer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Producer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    await PublishExampleMessages(streamClient, ociStreamOcid);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
            {
                // build up a putRequest and publish some messages to the stream
                List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
                for (int i = 0; i < 100; i++)
                {
                    PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                    {
                        Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                        Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                    };
                    messages.Add(detailsEntry);
                }
    
                Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
                PutMessagesDetails messagesDetails = new PutMessagesDetails
                {
                    Messages = messages
                };
                PutMessagesRequest putRequest = new PutMessagesRequest
                {
                    StreamId = streamId,
                    PutMessagesDetails = messagesDetails
                };
                PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);
    
                // the putResponse can contain some useful metadata for handling failures
                foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
                {
                    if (entry.Error != null)
                    {
                        Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                    }
                    else
                    {
                        Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                    }
                }
            }
        }
    }
  6. wdディレクトリから、次のコマンドを実行します:

    dotnet run
  7. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。
  3. ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:

    dotnet new console

    アプリケーションが作成されたことを示すメッセージが表示されます:

    The template "Console Application" was created successfully.

    これにより、単純な"HelloWorld"アプリケーションのC#コードを含むProgram.csファイルが作成されます。

  4. IAM基本認証およびストリーミング用のOCI SDKパッケージをC#プロジェクトに次のように追加します:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. wdディレクトリのProgram.csのコードを次のコードに置き換えます。次のコード・スニペットの変数configurationFilePathprofileociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssConsumer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Consumer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    // A cursor can be created as part of a consumer group.
                    // Committed offsets are managed for the group, and partitions
                    // are dynamically balanced amongst consumers in the group.
                    Console.WriteLine("Starting a simple message loop with a group cursor");
                    string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                    await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
            {
                Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
    
                CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
                {
                    GroupName = groupName,
                    InstanceName = instanceName,
                    Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                    CommitOnGet = true
                };
                CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
                {
                    StreamId = streamId,
                    CreateGroupCursorDetails = createGroupCursorDetails
                };
                CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
    
                return groupCursorResponse.Cursor.Value;
            }
            private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
            {
                string cursor = initialCursor;
                for (int i = 0; i < 10; i++)
                {
    
                    GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                    {
                        StreamId = streamId,
                        Cursor = cursor,
                        Limit = 10
                    };
                    GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
    
                    // process the messages
                    Console.WriteLine($"Read {getResponse.Items.Count}");
                    foreach (Message message in getResponse.Items)
                    {
                        string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                        Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                    }
    
                    // getMessages is a throttled method; clients should retrieve sufficiently large message
                    // batches, as to avoid too many http requests.
                    await Task.Delay(1000);
    
                    // use the next-cursor for iteration
                    cursor = getResponse.OpcNextCursor;
                }
            }
        }
    }
  6. wdディレクトリから、次のコマンドを実行します:

    dotnet run
  7. 次のようなメッセージが表示されます:

    Starting example for OSS Consumer
    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1
    Read 10
    messagekey-0 : messageValue-0
    messagekey-1 : messageValue-1
    messagekey-2 : messageValue-2
    messagekey-3 : messageValue-3
    messagekey-4 : messageValue-4
    messagekey-5 : messageValue-5
    messagekey-6 : messageValue-6
    messagekey-7 : messageValue-7
    messagekey-8 : messageValue-8
    messagekey-9 : messageValue-9
    Read 10
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです