SDK für .NET mit Streaming verwenden - Schnellstart

Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit OCI SDK für .NET.

In diesem Schnellstart wird gezeigt, wie Sie das Oracle Cloud Infrastructure-(OCI-)SDK für .NET und Oracle Cloud Infrastructure Streaming verwenden, um Nachrichten zu veröffentlichen und zu konsumieren. In diesen Beispielen wird die Sprache C# verwendet.

Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming. Weitere Informationen zur Verwendung der OCI-SDKs finden Sie in den SDK-Dokumentationen.

Voraussetzungen

Hinweis

In diesem Schnellstart wird eine einfache .NET-Konsolenanwendung mit Visual Studio Code und der .NET-CLI erstellt und ausgeführt. Projektaufgaben wie das Erstellen, Kompilieren und Ausführen eines Projekts werden mit der .NET-CLI durchgeführt. Falls gewünscht, können Sie dieses Tutorial mit einer anderen IDE befolgen und Befehle in einem Terminal ausführen.
  1. Zur Verwendung des SDK für .NET ist Folgendes erforderlich:

    • Ein Oracle Cloud Infrastructure-Account.
    • Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Dieser Benutzer kann Ihr eigener Benutzer oder eine andere Person/ein anderes System sein, die/das die API aufrufen muss. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
    • Ein Schlüsselpaar zum Signieren von API-Anforderungen, wobei der Public Key bei Oracle hochgeladen wird. Nur der Benutzer, der die API aufruft, sollte im Besitz des Private Keys sein. Weitere Informationen finden Sie unter SDK-Konfigurationsdatei.
  2. Erfassen Sie den Nachrichtenendpunkt und die OCID eines Streams. Die Schritte zum Abrufen von Details für einen Stream finden Sie unter Details für einen Stream abrufen. Im Rahmen dieses Schnellstarts sollte der Stream einen öffentlichen Endpunkt und die von Oracle verwaltete Verschlüsselung verwenden. Informationen hierzu finden Sie unter Streams erstellen und Streampool erstellen, wenn kein Stream vorhanden ist.
  3. Installieren Sie das .NET 5.0-SDK oder höher. Stellen Sie sicher, dass dotnet in der Umgebungsvariablen PATH festgelegt ist.
  4. Visual Studio Code (empfohlen) mit installierter C#-Erweiterung. Informationen zur Installation von Erweiterungen in Visual Studio Code finden Sie unter VS Code Extension Marketplace.

  5. Stellen Sie sicher, dass Sie über eine gültige SDK-Konfigurationsdatei verfügen. Verwenden Sie für Produktionsumgebungen die Instanz-Principal-Autorisierung.

Nachrichten erzeugen

  1. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd.
  2. Öffnen Sie das Terminal, und wechseln Sie mit cd zum Verzeichnis wd.
  3. Erstellen Sie eine C#-.NET-Konsolenanwendung, indem Sie den folgenden Befehl im Terminal ausführen:

    dotnet new console

    In einer Meldung wird angegeben, dass die Anwendung erstellt wurde:

    The template "Console Application" was created successfully.

    Dadurch wird eine Program.cs-Datei mit C#-Code für eine einfache "HelloWorld"-Anwendung erstellt.

  4. Fügen Sie OCI-SDK-Packages für die einfache IAM-Authentifizierung und Streaming wie folgt zum C#-Projekt hinzu:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Ersetzen Sie den Code in Program.cs im Verzeichnis wd durch den folgenden Code. Ersetzen Sie die Werte der Variablen configurationFilePath, profile,ociStreamOcid und ociMessageEndpoint im folgenden Code-Snippet durch die Werte für Ihren Mandanten.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssProducer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Producer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    await PublishExampleMessages(streamClient, ociStreamOcid);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
            {
                // build up a putRequest and publish some messages to the stream
                List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
                for (int i = 0; i < 100; i++)
                {
                    PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                    {
                        Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                        Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                    };
                    messages.Add(detailsEntry);
                }
    
                Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
                PutMessagesDetails messagesDetails = new PutMessagesDetails
                {
                    Messages = messages
                };
                PutMessagesRequest putRequest = new PutMessagesRequest
                {
                    StreamId = streamId,
                    PutMessagesDetails = messagesDetails
                };
                PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);
    
                // the putResponse can contain some useful metadata for handling failures
                foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
                {
                    if (entry.Error != null)
                    {
                        Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                    }
                    else
                    {
                        Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                    }
                }
            }
        }
    }
  6. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    dotnet run
  7. Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um die neuesten Nachrichten anzuzeigen, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.

Nachrichten konsumieren

  1. Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
  2. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd.
  3. Erstellen Sie eine C#-.NET-Konsolenanwendung, indem Sie den folgenden Befehl im Terminal ausführen:

    dotnet new console

    In einer Meldung wird angegeben, dass die Anwendung erstellt wurde:

    The template "Console Application" was created successfully.

    Dadurch wird eine Program.cs-Datei mit C#-Code für eine einfache "HelloWorld"-Anwendung erstellt.

  4. Fügen Sie OCI-SDK-Packages für die einfache IAM-Authentifizierung und Streaming wie folgt zum C#-Projekt hinzu:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Ersetzen Sie den Code in Program.cs im Verzeichnis wd durch den folgenden Code. Ersetzen Sie die Werte der Variablen configurationFilePath, profile,ociStreamOcid und ociMessageEndpoint im folgenden Code-Snippet durch die Werte für Ihren Mandanten.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssConsumer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Consumer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    // A cursor can be created as part of a consumer group.
                    // Committed offsets are managed for the group, and partitions
                    // are dynamically balanced amongst consumers in the group.
                    Console.WriteLine("Starting a simple message loop with a group cursor");
                    string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                    await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
            {
                Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
    
                CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
                {
                    GroupName = groupName,
                    InstanceName = instanceName,
                    Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                    CommitOnGet = true
                };
                CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
                {
                    StreamId = streamId,
                    CreateGroupCursorDetails = createGroupCursorDetails
                };
                CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
    
                return groupCursorResponse.Cursor.Value;
            }
            private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
            {
                string cursor = initialCursor;
                for (int i = 0; i < 10; i++)
                {
    
                    GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                    {
                        StreamId = streamId,
                        Cursor = cursor,
                        Limit = 10
                    };
                    GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
    
                    // process the messages
                    Console.WriteLine($"Read {getResponse.Items.Count}");
                    foreach (Message message in getResponse.Items)
                    {
                        string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                        Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                    }
    
                    // getMessages is a throttled method; clients should retrieve sufficiently large message
                    // batches, as to avoid too many http requests.
                    await Task.Delay(1000);
    
                    // use the next-cursor for iteration
                    cursor = getResponse.OpcNextCursor;
                }
            }
        }
    }
  6. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    dotnet run
  7. Meldungen wie die Folgenden sollten angezeigt werden:

    Starting example for OSS Consumer
    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1
    Read 10
    messagekey-0 : messageValue-0
    messagekey-1 : messageValue-1
    messagekey-2 : messageValue-2
    messagekey-3 : messageValue-3
    messagekey-4 : messageValue-4
    messagekey-5 : messageValue-5
    messagekey-6 : messageValue-6
    messagekey-7 : messageValue-7
    messagekey-8 : messageValue-8
    messagekey-9 : messageValue-9
    Read 10
    Hinweis

    Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede Nachricht Null