SDK de inicio rápido de Streaming para .NET

Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para .NET.

Este inicio rápido le muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para .NET y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes. Estos ejemplos utilizan el lenguaje C#.

Para conocer los conceptos clave y más detalles de Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de los SDK de OCI, consulte las Guías sobre SDK.

Requisitos

Nota

En este inicio rápido, creamos y ejecutamos una aplicación de consola de .NET sencilla mediante Visual Studio Code y la CLI de .NET. Las tareas de proyecto, como crear, compilar y ejecutar un proyecto, se realizan mediante el uso de la CLI de .NET. Si lo prefiere, puede seguir este tutorial con un IDE diferente y ejecutar comandos en un terminal.
  1. Para utilizar el SDK para .NET, debe tener lo siguiente:

    • Una cuenta de Oracle Cloud Infrastructure.
    • Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Este usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
    • Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada. Para obtener más información, consulte la sección sobre el archivo de configuración de SDK.
  2. Recopile el punto final de mensajes y el OCID de un flujo. Para obtener más información sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un flujo y Creación de un pool de flujos si no tiene un flujo existente.
  3. Instale el SDK de .NET 5.0 o posterior. Asegúrese de que dotnet está definido en su variable de entorno PATH.
  4. Visual Studio Code (recomendado) con la extensión C# instalado. Para obtener información sobre cómo instalar extensiones en Visual Studio Code, consulte Extension Marketplace de VS Code.

  5. Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.

Producción de mensajes

  1. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd.
  2. Abra el terminal y utilice cd para acceder al directorio wd.
  3. Cree una aplicación de consola .NET C# ejecutando el siguiente comando en el terminal:

    dotnet new console

    Debe mostrarse un mensaje que indica que se ha creado la aplicación:

    The template "Console Application" was created successfully.

    Esto crea un archivo Program.cs con código C# para una aplicación "HelloWorld" simple.

  4. Agregue paquetes de SDK de OCI para la autenticación básica de IAM y Streaming en el proyecto C# de la siguiente forma:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Sustituya el código en Program.cs del directorio wd por el siguiente código. Sustituya los valores de las variables configurationFilePath, profile,ociStreamOcid y ociMessageEndpoint en el siguiente fragmento de código por los valores aplicables a su arrendamiento.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssProducer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Producer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    await PublishExampleMessages(streamClient, ociStreamOcid);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
            {
                // build up a putRequest and publish some messages to the stream
                List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
                for (int i = 0; i < 100; i++)
                {
                    PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                    {
                        Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                        Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                    };
                    messages.Add(detailsEntry);
                }
    
                Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
                PutMessagesDetails messagesDetails = new PutMessagesDetails
                {
                    Messages = messages
                };
                PutMessagesRequest putRequest = new PutMessagesRequest
                {
                    StreamId = streamId,
                    PutMessagesDetails = messagesDetails
                };
                PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);
    
                // the putResponse can contain some useful metadata for handling failures
                foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
                {
                    if (entry.Error != null)
                    {
                        Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                    }
                    else
                    {
                        Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                    }
                }
            }
        }
    }
  6. Desde el directorio wd, ejecute el siguiente comando:

    dotnet run
  7. Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.

Consumo de mensajes

  1. En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
  2. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd.
  3. Cree una aplicación de consola de .NET C# ejecutando el siguiente comando en el terminal:

    dotnet new console

    Debe mostrarse un mensaje que indica que se ha creado la aplicación:

    The template "Console Application" was created successfully.

    Esto crea un archivo Program.cs con código C# para una aplicación "HelloWorld" simple.

  4. Agregue paquetes de SDK de OCI para la autenticación básica de IAM y Streaming en el proyecto C# de la siguiente forma:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Sustituya el código en Program.cs del directorio wd por el siguiente código. Sustituya los valores de las variables configurationFilePath, profile,ociStreamOcid y ociMessageEndpoint en el siguiente fragmento de código por los valores aplicables a su arrendamiento.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssConsumer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Consumer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    // A cursor can be created as part of a consumer group.
                    // Committed offsets are managed for the group, and partitions
                    // are dynamically balanced amongst consumers in the group.
                    Console.WriteLine("Starting a simple message loop with a group cursor");
                    string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                    await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
            {
                Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
    
                CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
                {
                    GroupName = groupName,
                    InstanceName = instanceName,
                    Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                    CommitOnGet = true
                };
                CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
                {
                    StreamId = streamId,
                    CreateGroupCursorDetails = createGroupCursorDetails
                };
                CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
    
                return groupCursorResponse.Cursor.Value;
            }
            private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
            {
                string cursor = initialCursor;
                for (int i = 0; i < 10; i++)
                {
    
                    GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                    {
                        StreamId = streamId,
                        Cursor = cursor,
                        Limit = 10
                    };
                    GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
    
                    // process the messages
                    Console.WriteLine($"Read {getResponse.Items.Count}");
                    foreach (Message message in getResponse.Items)
                    {
                        string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                        Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                    }
    
                    // getMessages is a throttled method; clients should retrieve sufficiently large message
                    // batches, as to avoid too many http requests.
                    await Task.Delay(1000);
    
                    // use the next-cursor for iteration
                    cursor = getResponse.OpcNextCursor;
                }
            }
        }
    }
  6. Desde el directorio wd, ejecute el siguiente comando:

    dotnet run
  7. Se mostrarán mensajes similares a los siguientes:

    Starting example for OSS Consumer
    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1
    Read 10
    messagekey-0 : messageValue-0
    messagekey-1 : messageValue-1
    messagekey-2 : messageValue-2
    messagekey-3 : messageValue-3
    messagekey-4 : messageValue-4
    messagekey-5 : messageValue-5
    messagekey-6 : messageValue-6
    messagekey-7 : messageValue-7
    messagekey-8 : messageValue-8
    messagekey-9 : messageValue-9
    Read 10
    Nota

    Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje es Null