SDK de inicio rápido de Streaming para .NET
Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para .NET.
Este inicio rápido le muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para .NET y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes. Estos ejemplos utilizan el lenguaje C#.
Para obtener más información sobre conceptos clave y Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de el SDK de OCI, consulte las Guías sobre el SDK.
Requisitos
En este inicio rápido, creamos y ejecutamos una aplicación de consola de .NET sencilla mediante Visual Studio Code y la CLI de .NET. Las tareas de proyecto, como crear, compilar y ejecutar un proyecto, se realizan mediante el uso de la CLI de .NET. Si lo prefiere, puede seguir este tutorial con un IDE diferente y ejecutar comandos en un terminal.
-
Para utilizar el SDK para .NET, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Este usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
- Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada. Para obtener más información, consulte la sección sobre el archivo de configuración de SDK.
- Recopile el punto final de mensajes y el OCID de un flujo. Para obtener detalles sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un Flujo y Creación de un Pool de Flujos si no tiene un flujo existente.
- Instale el SDK de .NET 5.0 o posterior. Asegúrese de que
dotnet
está definido en su variable de entornoPATH
. -
Visual Studio Code (recomendado) con la extensión C# instalado. Para obtener información sobre cómo instalar extensiones en Visual Studio Code, consulte Extension Marketplace de VS Code.
- Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío
wd
. - Abra el terminal y utilice
cd
para acceder al directoriowd
. -
Cree una aplicación de consola .NET C# ejecutando el siguiente comando en el terminal:
dotnet new console
Debe mostrarse un mensaje que indica que se ha creado la aplicación:
The template "Console Application" was created successfully.
Esto crea un archivo
Program.cs
con código C# para una aplicación "HelloWorld" simple. -
Agregue paquetes de SDK de OCI para la autenticación básica de IAM y Streaming en el proyecto C# de la siguiente forma:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Sustituya el código en
Program.cs
del directoriowd
por el siguiente código. Sustituya los valores de las variablesconfigurationFilePath
,profile
,ociStreamOcid
yociMessageEndpoint
en el siguiente fragmento de código por los valores aplicables a su arrendamiento.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssProducer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Producer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); await PublishExampleMessages(streamClient, ociStreamOcid); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task PublishExampleMessages(StreamClient streamClient, string streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>(); for (int i = 0; i < 100; i++) { PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry { Key = Encoding.UTF8.GetBytes($"messagekey-{i}"), Value = Encoding.UTF8.GetBytes($"messageValue-{i}") }; messages.Add(detailsEntry); } Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}"); PutMessagesDetails messagesDetails = new PutMessagesDetails { Messages = messages }; PutMessagesRequest putRequest = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messagesDetails }; PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest); // the putResponse can contain some useful metadata for handling failures foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries) { if (entry.Error != null) { Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}"); } else { Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}"); } } } } }
-
Desde el directorio
wd
, ejecute el siguiente comando:dotnet run
- Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío
wd
. -
Cree una aplicación de consola de .NET C# ejecutando el siguiente comando en el terminal:
dotnet new console
Debe mostrarse un mensaje que indica que se ha creado la aplicación:
The template "Console Application" was created successfully.
Esto crea un archivo
Program.cs
con código C# para una aplicación "HelloWorld" simple. -
Agregue paquetes de SDK de OCI para la autenticación básica de IAM y Streaming en el proyecto C# de la siguiente forma:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Sustituya el código en
Program.cs
del directoriowd
por el siguiente código. Sustituya los valores de las variablesconfigurationFilePath
,profile
,ociStreamOcid
yociMessageEndpoint
en el siguiente fragmento de código por los valores aplicables a su arrendamiento.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssConsumer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Consumer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. Console.WriteLine("Starting a simple message loop with a group cursor"); string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName) { Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}"); CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails { GroupName = groupName, InstanceName = instanceName, Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon, CommitOnGet = true }; CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest { StreamId = streamId, CreateGroupCursorDetails = createGroupCursorDetails }; CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest); return groupCursorResponse.Cursor.Value; } private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor) { string cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getMessagesRequest = new GetMessagesRequest { StreamId = streamId, Cursor = cursor, Limit = 10 }; GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest); // process the messages Console.WriteLine($"Read {getResponse.Items.Count}"); foreach (Message message in getResponse.Items) { string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null"; Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}"); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. await Task.Delay(1000); // use the next-cursor for iteration cursor = getResponse.OpcNextCursor; } } } }
-
Desde el directorio
wd
, ejecute el siguiente comando:dotnet run
-
Se mostrarán mensajes similares a los siguientes:
Starting example for OSS Consumer Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1 Read 10 messagekey-0 : messageValue-0 messagekey-1 : messageValue-1 messagekey-2 : messageValue-2 messagekey-3 : messageValue-3 messagekey-4 : messageValue-4 messagekey-5 : messageValue-5 messagekey-6 : messageValue-6 messagekey-7 : messageValue-7 messagekey-8 : messageValue-8 messagekey-9 : messageValue-9 Read 10
Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: