Avvio rapido SDK per lo streaming .NET

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando l'SDK OCI per .NET.

Questa rapida panoramica ti mostra come utilizzare Oracle Cloud Infrastructure (OCI) SDK for .NET e Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi. Questi esempi utilizzano il linguaggio C#.

Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming. Per ulteriori informazioni sull'uso degli SDK OCI, consulta le guide SDK.

Prerequisiti

Nota

In questo avvio rapido, viene creata ed eseguita una semplice applicazione console .NET utilizzando Visual Studio Code e l'interfaccia CLI .NET. I task di progetto, ad esempio la creazione, la compilazione e l'esecuzione di un progetto, vengono eseguiti utilizzando l'interfaccia CLI di .NET. Se preferisci, puoi seguire questa esercitazione con un IDE diverso ed eseguire i comandi in un terminale.
  1. Per utilizzare l'SDK per .NET, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Questo utente può essere te stesso o un'altra persona/sistema che deve chiamare l'API. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
    • Coppia di chiavi utilizzata per firmare le richieste API, con la chiave pubblica caricata in Oracle. Solo l'utente che chiama l'API deve possedere la chiave privata. Per maggiori informazioni, vedere file di configurazione di SDK.
  2. Raccogliere l'endpoint e l'OCID dei messaggi di un flusso. Per i passi per ottenere i dettagli per un flusso, vedere Recupero dei dettagli per un flusso. Ai fini di questo avvio rapido, il flusso deve utilizzare un endpoint pubblico e consentire a Oracle di gestire la cifratura. Se non si dispone di un flusso esistente, vedere Creazione di un flusso e Creazione di un pool di flussi.
  3. Installare .NET 5.0 SDK o versione successiva. Assicurarsi che dotnet sia impostato nella variabile di ambiente PATH.
  4. Visual Studio Code (consigliato) con l'estensione C# installata. Per informazioni su come installare le estensioni in Visual Studio Code, vedere VS Code Extension Marketplace.

  5. Assicurarsi di disporre di un file di configurazione SDK valido. Per gli ambienti di produzione, è necessario utilizzare l'autorizzazione principal dell'istanza.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd.
  2. Aprire il terminale e cd nella directory wd.
  3. Creare un'applicazione console C# .NET eseguendo il seguente comando nel terminale:

    dotnet new console

    Viene visualizzato un messaggio che indica che l'applicazione è stata creata:

    The template "Console Application" was created successfully.

    Questo crea un file Program.cs con codice C# per una semplice applicazione "HelloWorld".

  4. Aggiungere package SDK OCI per l'autenticazione IAM di base e lo streaming al progetto C# come indicato di seguito.

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Sostituire il codice in Program.cs nella directory wd con il codice seguente. Sostituire i valori delle variabili configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssProducer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Producer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    await PublishExampleMessages(streamClient, ociStreamOcid);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
            {
                // build up a putRequest and publish some messages to the stream
                List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
                for (int i = 0; i < 100; i++)
                {
                    PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                    {
                        Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                        Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                    };
                    messages.Add(detailsEntry);
                }
    
                Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
                PutMessagesDetails messagesDetails = new PutMessagesDetails
                {
                    Messages = messages
                };
                PutMessagesRequest putRequest = new PutMessagesRequest
                {
                    StreamId = streamId,
                    PutMessagesDetails = messagesDetails
                };
                PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);
    
                // the putResponse can contain some useful metadata for handling failures
                foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
                {
                    if (entry.Error != null)
                    {
                        Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                    }
                    else
                    {
                        Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                    }
                }
            }
        }
    }
  6. Dalla directory wd, eseguire il comando seguente:

    dotnet run
  7. Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd.
  3. Creare un'applicazione console C# .NET eseguendo il seguente comando sul terminale:

    dotnet new console

    Viene visualizzato un messaggio che indica che l'applicazione è stata creata:

    The template "Console Application" was created successfully.

    Questo crea un file Program.cs con codice C# per una semplice applicazione "HelloWorld".

  4. Aggiungere package SDK OCI per l'autenticazione IAM di base e lo streaming al progetto C# come indicato di seguito.

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Sostituire il codice in Program.cs nella directory wd con il codice seguente. Sostituire i valori delle variabili configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssConsumer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Consumer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    // A cursor can be created as part of a consumer group.
                    // Committed offsets are managed for the group, and partitions
                    // are dynamically balanced amongst consumers in the group.
                    Console.WriteLine("Starting a simple message loop with a group cursor");
                    string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                    await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
            {
                Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
    
                CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
                {
                    GroupName = groupName,
                    InstanceName = instanceName,
                    Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                    CommitOnGet = true
                };
                CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
                {
                    StreamId = streamId,
                    CreateGroupCursorDetails = createGroupCursorDetails
                };
                CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
    
                return groupCursorResponse.Cursor.Value;
            }
            private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
            {
                string cursor = initialCursor;
                for (int i = 0; i < 10; i++)
                {
    
                    GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                    {
                        StreamId = streamId,
                        Cursor = cursor,
                        Limit = 10
                    };
                    GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
    
                    // process the messages
                    Console.WriteLine($"Read {getResponse.Items.Count}");
                    foreach (Message message in getResponse.Items)
                    {
                        string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                        Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                    }
    
                    // getMessages is a throttled method; clients should retrieve sufficiently large message
                    // batches, as to avoid too many http requests.
                    await Task.Delay(1000);
    
                    // use the next-cursor for iteration
                    cursor = getResponse.OpcNextCursor;
                }
            }
        }
    }
  6. Dalla directory wd, eseguire il comando seguente:

    dotnet run
  7. Dovresti vedere messaggi simili ai seguenti:

    Starting example for OSS Consumer
    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1
    Read 10
    messagekey-0 : messageValue-0
    messagekey-1 : messageValue-1
    messagekey-2 : messageValue-2
    messagekey-3 : messageValue-3
    messagekey-4 : messageValue-4
    messagekey-5 : messageValue-5
    messagekey-6 : messageValue-6
    messagekey-7 : messageValue-7
    messagekey-8 : messageValue-8
    messagekey-9 : messageValue-9
    Read 10
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null