Quickstart client Kafka .NET e streaming

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client .NET Kafka.

Questo avvio rapido mostra come utilizzare il client Kafka .NET con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi. Questi esempi utilizzano il linguaggio C#.

Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming

Prerequisiti

Nota

In questo avvio rapido, viene creata ed eseguita una semplice applicazione console .NET utilizzando Visual Studio Code e l'interfaccia CLI .NET. I task di progetto, ad esempio la creazione, la compilazione e l'esecuzione di un progetto, vengono eseguiti utilizzando l'interfaccia CLI di .NET. Se preferisci, puoi seguire questa esercitazione con un IDE diverso ed eseguire i comandi in un terminale.
  1. Per utilizzare il client .NET Kafka con Streaming, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
  2. Raccogliere i seguenti dettagli:

    • OCID flusso
    • endpoint messaggi
    • OCID pool di flussi
    • FQDN del pool di flussi
    • Impostazioni di connessione Kafka:
      • Server bootstrap
      • Stringhe di connessione SASL
      • Protocollo di sicurezza

    Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.

  3. Installare .NET 5.0 SDK o versione successiva. Assicurarsi che dotnet sia impostato nella variabile di ambiente PATH.
  4. Visual Studio Code (consigliato) con l'estensione C# installata. Per informazioni su come installare le estensioni in Visual Studio Code, vedere VS Code Extension Marketplace.

  5. L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.

    Nota

    I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro.
  6. Installare i certificati radice SSL CA sull'host in cui si sta sviluppando ed eseguendo questa istanza di avvio rapido. Il client utilizza certificati CA per verificare il certificato del broker.

    Per Windows, scaricare il file cacert.pem distribuito con curl (scaricare cacert.pm). Per altre piattaforme, vedere Configura truststore SSL.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd.
  2. Aprire il terminale e cd nella directory wd.
  3. Creare un'applicazione console C# .NET eseguendo il seguente comando nel terminale:

    dotnet new console

    Viene visualizzato un messaggio che indica che l'applicazione è stata creata:

    The template "Console Application" was created successfully.

    Questo crea un file Program.cs con codice C# per una semplice applicazione "HelloWorld".

  4. Per fare riferimento alla libreria confluent-kafka-dotnet nel nuovo progetto .NET Core, eseguire il comando seguente nella directory del progetto wd:

    dotnet add package Confluent.Kafka
  5. Sostituire il codice in Program.cs nella directory wd con il codice seguente. Sostituire i valori delle variabili nella mappa ProducerConfig e il nome di topic con i dettagli raccolti nei prerequisiti:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. Dalla directory wd, eseguire il comando seguente:

    dotnet run
  7. Mostra i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd.
  3. Creare un'applicazione console C# .NET eseguendo il seguente comando sul terminale:

    dotnet new console

    Viene visualizzato un messaggio che indica che l'applicazione è stata creata:

    The template "Console Application" was created successfully.

    Questo crea un file Program.cs con codice C# per una semplice applicazione "HelloWorld".

  4. Per fare riferimento alla libreria confluent-kafka-dotnet nel nuovo progetto .NET Core, eseguire il comando seguente nella directory del progetto wd:

    dotnet add package Confluent.Kafka
  5. Sostituire il codice in Program.cs nella directory wd con il codice seguente. Sostituire i valori delle variabili nella mappa ProducerConfig e il nome di topic con i dettagli raccolti nei prerequisiti:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. Dalla directory wd, eseguire il comando seguente:

    dotnet run
  7. Dovresti vedere messaggi simili ai seguenti:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null