Início Rápido do Kafka .NET Client e Serviço Streaming
Publique e consuma mensagens no serviço Streaming usando o cliente Kafka .NET.
Este início rápido mostra como usar o cliente Kafka .NET com o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens. Esses exemplos usam a linguagem C#.
Para obter mais informações, consulte Usando o Streaming com o Apache Kafka. Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming
Pré-requisitos
Neste início rápido, criamos e executamos um simples aplicativo de console .NET usando o Visual Studio Code e a CLI do .NET. As tarefas do projeto, como criar, compilar e executar um projeto, são executadas com o uso da CLI do .NET. Se preferir, siga este tutorial com outro IDE e execute comandos em um terminal.
-
Para usar o cliente Kafka .NET com o serviço Streaming, você deve ter o seguinte:
- Uma conta do Oracle Cloud Infrastructure.
- Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
-
Colete os seguintes detalhes:
- OCID do Stream
- Ponto final de mensagens
- OCID do pool de streams
- FQDN do pool de streams
- Definições de conexão do Kafka:
- Servidores de bootstrap
- Strings de conexão SASL
- Protocolo de segurança
Para obter as etapas de criação e gerenciamento de streams e pools de streams, consulte Gerenciando Streams e Gerenciando Pools de Streams. Os streams correspondem a um tópico do Kafka.
- Instale o .NET 5.0 SDK ou mais recente. Certifique-se de que
dotnet
esteja definido na variável de ambientePATH
. -
Visual Studio Code (recomendado) com a extensão C# instalada. Para obter informações sobre como instalar extensões no Visual Studio Code, consulte Marketplace de Extensões do VS Code.
-
A autenticação com o protocolo Kafka usa tokens de autenticação e o mecanismo SASL/PLAIN. Consulte Como Trabalhar com Tokens de Autenticação para saber sobre a geração do token de autenticação. Se você criou o stream e o pool de streams no OCI, já estará autorizado a usar esse stream de acordo com o OCI IAM. Portanto, crie tokens de autenticação para seu usuário do OCI.
Observação
Os tokens de autenticação do usuário do OCI só ficam visíveis no momento da criação. Copie-o e mantenha-o em algum lugar seguro para uso futuro. -
Instale os certificados raiz da CA SSL no host no qual você está desenvolvendo e executando este início rápido. O cliente usa certificados da CA para verificar o certificado do broker.
Para o Windows, faça download do arquivo
cacert.pem
distribuído com curl (download de cacert.pm). Para outras plataformas, consulte Configurar armazenamento confiável SSL.
Produzindo Mensagens
- Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio
wd
. - Abra o terminal e
cd
no diretóriowd
. -
Crie um aplicativo de console C# .NET executando o seguinte comando no terminal:
dotnet new console
Você deverá ver uma mensagem indicando que o aplicativo foi criado:
The template "Console Application" was created successfully.
Essa ação cria um arquivo
Program.cs
com código C# para um aplicativo simples "HelloWorld". -
Para fazer referência à biblioteca
confluent-kafka-dotnet
no seu novo projeto .NET Core, execute o seguinte comando no diretóriowd
do seu projeto:dotnet add package Confluent.Kafka
-
Substitua o código em
Program.cs
no diretóriowd
pelo código a seguir. Substitua valores de variáveis no mapaProducerConfig
e o nome detopic
pelos detalhes coletados nos pré-requisitos:using System; using Confluent.Kafka; namespace OssProducerWithKafkaApi { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ProducerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Produce("<topic_stream_name>", config); // use the name of the stream you created } static void Produce(string topic, ClientConfig config) { using (var producer = new ProducerBuilder<string, string>(config).Build()) { int numProduced = 0; int numMessages = 10; for (int i=0; i<numMessages; ++i) { var key = "messageKey" + i; var val = "messageVal" + i; Console.WriteLine($"Producing record: {key} {val}"); producer.Produce(topic, new Message<string, string> { Key = key, Value = val }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); numProduced += 1; } }); } producer.Flush(TimeSpan.FromSeconds(10)); Console.WriteLine($"{numProduced} messages were produced to topic {topic}"); } } } }
-
No diretório
wd
, execute o seguinte comando:dotnet run
- Mostrar as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.
Consumindo Mensagens
- Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
- Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio
wd
. -
Crie um aplicativo da console .NET executando o seguinte comando no terminal:
dotnet new console
Você deverá ver uma mensagem indicando que o aplicativo foi criado:
The template "Console Application" was created successfully.
Essa ação cria um arquivo
Program.cs
com código C# para um aplicativo simples "HelloWorld". -
Para fazer referência à biblioteca
confluent-kafka-dotnet
no seu novo projeto .NET Core, execute o seguinte comando no diretóriowd
do seu projeto:dotnet add package Confluent.Kafka
-
Substitua o código em
Program.cs
no diretóriowd
pelo código a seguir. Substitua valores de variáveis no mapaProducerConfig
e o nome detopic
pelos detalhes coletados nos pré-requisitos:using System; using Confluent.Kafka; using System.Threading; namespace OssKafkaConsumerDotnet { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ConsumerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Consume("<topic_stream_name>", config); // use the name of the stream you created } static void Consume(string topic, ClientConfig config) { var consumerConfig = new ConsumerConfig(config); consumerConfig.GroupId = "dotnet-oss-consumer-group"; consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; consumerConfig.EnableAutoCommit = true; CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build()) { consumer.Subscribe(topic); try { while (true) { var cr = consumer.Consume(cts.Token); string key = cr.Message.Key == null ? "Null" : cr.Message.Key; Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}"); } } catch (OperationCanceledException) { //exception might have occurred since Ctrl-C was pressed. } finally { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } } } }
-
No diretório
wd
, execute o seguinte comando:dotnet run
-
Você deverá ver mensagens semelhantes a esta:
Demo for using Kafka APIs seamlessly with OSS Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Observação
Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem seráNull
Próximas Etapas
Consulte os seguintes recursos para obter mais informações: