Início Rápido do Kafka .NET Client e Serviço Streaming

Publique e consuma mensagens no serviço Streaming usando o cliente Kafka .NET.

Este início rápido mostra como usar o cliente Kafka .NET com o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens. Esses exemplos usam a linguagem C#.

Para obter mais informações, consulte Usando o Streaming com o Apache Kafka. Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming

Pré-requisitos

Observação

Neste início rápido, criamos e executamos um simples aplicativo de console .NET usando o Visual Studio Code e a CLI do .NET. As tarefas do projeto, como criar, compilar e executar um projeto, são executadas com o uso da CLI do .NET. Se preferir, siga este tutorial com outro IDE e execute comandos em um terminal.
  1. Para usar o cliente Kafka .NET com o serviço Streaming, você deve ter o seguinte:

    • Uma conta do Oracle Cloud Infrastructure.
    • Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
  2. Colete os seguintes detalhes:

    • OCID do Stream
    • Ponto final de mensagens
    • OCID do pool de streams
    • FQDN do pool de streams
    • Definições de conexão do Kafka:
      • Servidores de bootstrap
      • Strings de conexão SASL
      • Protocolo de segurança

    Para obter as etapas de criação e gerenciamento de streams e pools de streams, consulte Gerenciando Streams e Gerenciando Pools de Streams. Os streams correspondem a um tópico do Kafka.

  3. Instale o .NET 5.0 SDK ou mais recente. Certifique-se de que dotnet esteja definido na variável de ambiente PATH.
  4. Visual Studio Code (recomendado) com a extensão C# instalada. Para obter informações sobre como instalar extensões no Visual Studio Code, consulte Marketplace de Extensões do VS Code.

  5. A autenticação com o protocolo Kafka usa tokens de autenticação e o mecanismo SASL/PLAIN. Consulte Como Trabalhar com Tokens de Autenticação para saber sobre a geração do token de autenticação. Se você criou o stream e o pool de streams no OCI, já estará autorizado a usar esse stream de acordo com o OCI IAM. Portanto, crie tokens de autenticação para seu usuário do OCI.

    Observação

    Os tokens de autenticação do usuário do OCI só ficam visíveis no momento da criação. Copie-o e mantenha-o em algum lugar seguro para uso futuro.
  6. Instale os certificados raiz da CA SSL no host no qual você está desenvolvendo e executando este início rápido. O cliente usa certificados da CA para verificar o certificado do broker.

    Para o Windows, faça download do arquivo cacert.pem distribuído com curl (download de cacert.pm). Para outras plataformas, consulte Configurar armazenamento confiável SSL.

Produzindo Mensagens

  1. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd.
  2. Abra o terminal e cd no diretório wd.
  3. Crie um aplicativo de console C# .NET executando o seguinte comando no terminal:

    dotnet new console

    Você deverá ver uma mensagem indicando que o aplicativo foi criado:

    The template "Console Application" was created successfully.

    Essa ação cria um arquivo Program.cs com código C# para um aplicativo simples "HelloWorld".

  4. Para fazer referência à biblioteca confluent-kafka-dotnet no seu novo projeto .NET Core, execute o seguinte comando no diretório wd do seu projeto:

    dotnet add package Confluent.Kafka
  5. Substitua o código em Program.cs no diretório wd pelo código a seguir. Substitua valores de variáveis no mapa ProducerConfig e o nome de topic pelos detalhes coletados nos pré-requisitos:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. No diretório wd, execute o seguinte comando:

    dotnet run
  7. Mostrar as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.

Consumindo Mensagens

  1. Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
  2. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd.
  3. Crie um aplicativo da console .NET executando o seguinte comando no terminal:

    dotnet new console

    Você deverá ver uma mensagem indicando que o aplicativo foi criado:

    The template "Console Application" was created successfully.

    Essa ação cria um arquivo Program.cs com código C# para um aplicativo simples "HelloWorld".

  4. Para fazer referência à biblioteca confluent-kafka-dotnet no seu novo projeto .NET Core, execute o seguinte comando no diretório wd do seu projeto:

    dotnet add package Confluent.Kafka
  5. Substitua o código em Program.cs no diretório wd pelo código a seguir. Substitua valores de variáveis no mapa ProducerConfig e o nome de topic pelos detalhes coletados nos pré-requisitos:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. No diretório wd, execute o seguinte comando:

    dotnet run
  7. Você deverá ver mensagens semelhantes a esta:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    Observação

    Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem será Null