SDK para Início Rápido do Streaming do .NET

Publique e consuma mensagens no serviço Streaming usando o OCI SDK para .NET.

Este início rápido mostra como usar o OCI (Oracle Cloud Infrastructure) SDK for .NET e o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens. Esses exemplos usam a linguagem C#.

Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming. Para obter mais informações sobre como usar os SDKs do OCI, consulte os Guias SDK.

Pré-requisitos

Observação

Neste início rápido, criamos e executamos um simples aplicativo de console .NET usando o Visual Studio Code e a CLI do .NET. As tarefas do projeto, como criar, compilar e executar um projeto, são executadas com o uso da CLI do .NET. Se preferir, siga este tutorial com outro IDE e execute comandos em um terminal.
  1. Para usar o SDK para .NET, você precisará ter o seguinte:

    • Uma conta do Oracle Cloud Infrastructure.
    • Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Esse usuário pode ser você mesmo ou outra pessoa/sistema que precise chamar a API. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
    • Um par de chaves usado para assinar solicitações de API, com a chave pública carregada por upload no sistema Oracle. Somente o usuário que chama a API deve possuir a chave privada. Para obter mais informações, consulte o arquivo de configuração do SDK.
  2. Colete o ponto final e o OCID das Mensagens de um stream. Para obter etapas para obter detalhes de um stream, consulte Obtendo Detalhes de um Stream. Para os fins deste início rápido, o stream deve usar um ponto final público e permitir que a Oracle gerencie a criptografia. Consulte Criando um Stream e Criando um Pool de Streams se você não tiver um stream existente.
  3. Instale o .NET 5.0 SDK ou mais recente. Certifique-se de que dotnet esteja definido na variável de ambiente PATH.
  4. Visual Studio Code (recomendado) com a extensão C# instalada. Para obter informações sobre como instalar extensões no Visual Studio Code, consulte Marketplace de Extensões do VS Code.

  5. Verifique se você tem um arquivo de configuração do SDK válido. Para ambientes de produção, você deve usar a autorização do controlador de instâncias.

Produzindo Mensagens

  1. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd.
  2. Abra o terminal e cd no diretório wd.
  3. Crie um aplicativo de console C# .NET executando o seguinte comando no terminal:

    dotnet new console

    Você deverá ver uma mensagem indicando que o aplicativo foi criado:

    The template "Console Application" was created successfully.

    Essa ação cria um arquivo Program.cs com código C# para um aplicativo simples "HelloWorld".

  4. Adicione pacotes SDK do OCI para autenticação básica do IAM e o Streaming ao seu projeto C# da seguinte forma:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Substitua o código em Program.cs no diretório wd pelo código a seguir. Substitua os valores das variáveis configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssProducer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Producer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    await PublishExampleMessages(streamClient, ociStreamOcid);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
            {
                // build up a putRequest and publish some messages to the stream
                List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
                for (int i = 0; i < 100; i++)
                {
                    PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                    {
                        Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                        Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                    };
                    messages.Add(detailsEntry);
                }
    
                Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
                PutMessagesDetails messagesDetails = new PutMessagesDetails
                {
                    Messages = messages
                };
                PutMessagesRequest putRequest = new PutMessagesRequest
                {
                    StreamId = streamId,
                    PutMessagesDetails = messagesDetails
                };
                PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);
    
                // the putResponse can contain some useful metadata for handling failures
                foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
                {
                    if (entry.Error != null)
                    {
                        Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                    }
                    else
                    {
                        Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                    }
                }
            }
        }
    }
  6. No diretório wd, execute o seguinte comando:

    dotnet run
  7. Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.

Consumindo Mensagens

  1. Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
  2. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd.
  3. Crie um aplicativo da console .NET executando o seguinte comando no terminal:

    dotnet new console

    Você deverá ver uma mensagem indicando que o aplicativo foi criado:

    The template "Console Application" was created successfully.

    Essa ação cria um arquivo Program.cs com código C# para um aplicativo simples "HelloWorld".

  4. Adicione pacotes SDK do OCI para autenticação básica do IAM e o Streaming ao seu projeto C# da seguinte forma:

    dotnet add package OCI.DotNetSDK.Common

    dotnet add package OCI.DotNetSDK.Streaming
  5. Substitua o código em Program.cs no diretório wd pelo código a seguir. Substitua os valores das variáveis configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Oci.Common.Auth;
    using Oci.Common.Waiters;
    using Oci.StreamingService;
    using Oci.StreamingService.Models;
    using Oci.StreamingService.Requests;
    using Oci.StreamingService.Responses;
    
    namespace OssConsumer
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                Console.WriteLine("Starting example for OSS Consumer");
                string configurationFilePath = "<config_file_path>";
                string profile = "<config_file_profile_name>";
                string ociStreamOcid = "<stream_OCID>";
                string ociMessageEndpoint = "<stream_message_endpoint>";
    
                try
                {
                    var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
    
                    StreamClient streamClient = new StreamClient(provider);
                    streamClient.SetEndpoint(ociMessageEndpoint);
    
                    // A cursor can be created as part of a consumer group.
                    // Committed offsets are managed for the group, and partitions
                    // are dynamically balanced amongst consumers in the group.
                    Console.WriteLine("Starting a simple message loop with a group cursor");
                    string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                    await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Streaming example failed: {e}");
                }
            }
    
            private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
            {
                Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
    
                CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
                {
                    GroupName = groupName,
                    InstanceName = instanceName,
                    Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                    CommitOnGet = true
                };
                CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
                {
                    StreamId = streamId,
                    CreateGroupCursorDetails = createGroupCursorDetails
                };
                CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
    
                return groupCursorResponse.Cursor.Value;
            }
            private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
            {
                string cursor = initialCursor;
                for (int i = 0; i < 10; i++)
                {
    
                    GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                    {
                        StreamId = streamId,
                        Cursor = cursor,
                        Limit = 10
                    };
                    GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
    
                    // process the messages
                    Console.WriteLine($"Read {getResponse.Items.Count}");
                    foreach (Message message in getResponse.Items)
                    {
                        string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                        Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                    }
    
                    // getMessages is a throttled method; clients should retrieve sufficiently large message
                    // batches, as to avoid too many http requests.
                    await Task.Delay(1000);
    
                    // use the next-cursor for iteration
                    cursor = getResponse.OpcNextCursor;
                }
            }
        }
    }
  6. No diretório wd, execute o seguinte comando:

    dotnet run
  7. Você deverá ver mensagens semelhantes a esta:

    Starting example for OSS Consumer
    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1
    Read 10
    messagekey-0 : messageValue-0
    messagekey-1 : messageValue-1
    messagekey-2 : messageValue-2
    messagekey-3 : messageValue-3
    messagekey-4 : messageValue-4
    messagekey-5 : messageValue-5
    messagekey-6 : messageValue-6
    messagekey-7 : messageValue-7
    messagekey-8 : messageValue-8
    messagekey-9 : messageValue-9
    Read 10
    Observação

    Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem será Null