Quickstart client Kafka .NET e streaming
Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client .NET Kafka.
Questo avvio rapido mostra come utilizzare il client Kafka .NET con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi. Questi esempi utilizzano il linguaggio C#.
Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming
Prerequisiti
In questo avvio rapido, viene creata ed eseguita una semplice applicazione console .NET utilizzando Visual Studio Code e l'interfaccia CLI .NET. I task di progetto, ad esempio la creazione, la compilazione e l'esecuzione di un progetto, vengono eseguiti utilizzando l'interfaccia CLI di .NET. Se preferisci, puoi seguire questa esercitazione con un IDE diverso ed eseguire i comandi in un terminale.
-
Per utilizzare il client .NET Kafka con Streaming, è necessario disporre dei seguenti elementi:
- Un account Oracle Cloud Infrastructure.
- Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
-
Raccogliere i seguenti dettagli:
- OCID flusso
- endpoint messaggi
- OCID pool di flussi
- FQDN del pool di flussi
- Impostazioni di connessione Kafka:
- Server bootstrap
- Stringhe di connessione SASL
- Protocollo di sicurezza
Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.
- Installare .NET 5.0 SDK o versione successiva. Assicurarsi che
dotnet
sia impostato nella variabile di ambientePATH
. -
Visual Studio Code (consigliato) con l'estensione C# installata. Per informazioni su come installare le estensioni in Visual Studio Code, vedere VS Code Extension Marketplace.
-
L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.
Nota
I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro. -
Installare i certificati radice SSL CA sull'host in cui si sta sviluppando ed eseguendo questa istanza di avvio rapido. Il client utilizza certificati CA per verificare il certificato del broker.
Per Windows, scaricare il file
cacert.pem
distribuito con curl (scaricare cacert.pm). Per altre piattaforme, vedere Configura truststore SSL.
Produzione di messaggi
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota
wd
. - Aprire il terminale e
cd
nella directorywd
. -
Creare un'applicazione console C# .NET eseguendo il seguente comando nel terminale:
dotnet new console
Viene visualizzato un messaggio che indica che l'applicazione è stata creata:
The template "Console Application" was created successfully.
Questo crea un file
Program.cs
con codice C# per una semplice applicazione "HelloWorld". -
Per fare riferimento alla libreria
confluent-kafka-dotnet
nel nuovo progetto .NET Core, eseguire il comando seguente nella directory del progettowd
:dotnet add package Confluent.Kafka
-
Sostituire il codice in
Program.cs
nella directorywd
con il codice seguente. Sostituire i valori delle variabili nella mappaProducerConfig
e il nome ditopic
con i dettagli raccolti nei prerequisiti:using System; using Confluent.Kafka; namespace OssProducerWithKafkaApi { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ProducerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Produce("<topic_stream_name>", config); // use the name of the stream you created } static void Produce(string topic, ClientConfig config) { using (var producer = new ProducerBuilder<string, string>(config).Build()) { int numProduced = 0; int numMessages = 10; for (int i=0; i<numMessages; ++i) { var key = "messageKey" + i; var val = "messageVal" + i; Console.WriteLine($"Producing record: {key} {val}"); producer.Produce(topic, new Message<string, string> { Key = key, Value = val }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); numProduced += 1; } }); } producer.Flush(TimeSpan.FromSeconds(10)); Console.WriteLine($"{numProduced} messages were produced to topic {topic}"); } } } }
-
Dalla directory
wd
, eseguire il comando seguente:dotnet run
- Mostra i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.
Messaggi di consumo
- In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota
wd
. -
Creare un'applicazione console C# .NET eseguendo il seguente comando sul terminale:
dotnet new console
Viene visualizzato un messaggio che indica che l'applicazione è stata creata:
The template "Console Application" was created successfully.
Questo crea un file
Program.cs
con codice C# per una semplice applicazione "HelloWorld". -
Per fare riferimento alla libreria
confluent-kafka-dotnet
nel nuovo progetto .NET Core, eseguire il comando seguente nella directory del progettowd
:dotnet add package Confluent.Kafka
-
Sostituire il codice in
Program.cs
nella directorywd
con il codice seguente. Sostituire i valori delle variabili nella mappaProducerConfig
e il nome ditopic
con i dettagli raccolti nei prerequisiti:using System; using Confluent.Kafka; using System.Threading; namespace OssKafkaConsumerDotnet { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ConsumerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Consume("<topic_stream_name>", config); // use the name of the stream you created } static void Consume(string topic, ClientConfig config) { var consumerConfig = new ConsumerConfig(config); consumerConfig.GroupId = "dotnet-oss-consumer-group"; consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; consumerConfig.EnableAutoCommit = true; CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build()) { consumer.Subscribe(topic); try { while (true) { var cr = consumer.Consume(cts.Token); string key = cr.Message.Key == null ? "Null" : cr.Message.Key; Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}"); } } catch (OperationCanceledException) { //exception might have occurred since Ctrl-C was pressed. } finally { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } } } }
-
Dalla directory
wd
, eseguire il comando seguente:dotnet run
-
Dovresti vedere messaggi simili ai seguenti:
Demo for using Kafka APIs seamlessly with OSS Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Nota
Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio èNull
Passo successivo
Per ulteriori informazioni, consultare le risorse elencate di seguito.