Kafka .NET-Client und Streaming - Schnellstart

Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit dem Kafka .NET-Client.

In diesem Schnellstart wird gezeigt, wie Sie den Kafka-.NET-Client mit Oracle Cloud Infrastructure Streaming veröffentlichen und konsumieren. In diesen Beispielen wird die Sprache C# verwendet.

Weitere Informationen finden Sie unter Streaming mit Apache Kafka verwenden. Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming

Voraussetzungen

Hinweis

In diesem Schnellstart wird eine einfache .NET-Konsolenanwendung mit Visual Studio Code und der .NET-CLI erstellt und ausgeführt. Projektaufgaben wie das Erstellen, Kompilieren und Ausführen eines Projekts werden mit der .NET-CLI durchgeführt. Falls gewünscht, können Sie dieses Tutorial mit einer anderen IDE befolgen und Befehle in einem Terminal ausführen.
  1. Um den Kafka-.NET-Client mit Streaming zu verwenden, wird Folgendes benötigt:

    • Ein Oracle Cloud Infrastructure-Account.
    • Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
  2. Erfassen Sie die folgenden Details:

    • Stream-OCID
    • Nachrichtenendpunkt
    • Streampool-OCID
    • Streampool-FQDN
    • Kafka-Verbindungseinstellungen:
      • Bootstrap-Server
      • SASL-Verbindungszeichenfolgen
      • Sicherheitsprotokoll

    Die Schritte zum Erstellen und Verwalten von Streams und Streampools finden Sie unter Streams verwalten und Streampools verwalten. Streams entsprechen einem Kafka-Topic.

  3. Installieren Sie das .NET 5.0-SDK oder höher. Stellen Sie sicher, dass dotnet in der Umgebungsvariablen PATH festgelegt ist.
  4. Visual Studio Code (empfohlen) mit installierter C#-Erweiterung. Informationen zur Installation von Erweiterungen in Visual Studio Code finden Sie unter VS Code Extension Marketplace.

  5. Bei der Authentifizierung mit dem Kafka-Protokoll werden Authentifizierungstoken und der SASL/PLAIN-Mechanismus verwendet. Informationen zur Generierung von Authentifizierungstoken finden Sie unter Mit Authentifizierungstoken arbeiten. Wenn Sie den Stream und Streampool in OCI erstellt haben, sind Sie bereits zur Verwendung dieses Streams gemäß OCI IAM autorisiert. Erstellen Sie daher Authentifizierungstoken für den OCI-Benutzer.

    Hinweis

    OCI-Benutzerauthentifizierungstoken sind nur zum Zeitpunkt der Erstellung sichtbar. Kopieren Sie es, und bewahren Sie es für die zukünftige Verwendung an einem sicheren Ort auf.
  6. Installieren Sie die SSL-CA-Root-Zertifikate auf dem Host, auf dem Sie diesen Schnellstart entwickeln und ausführen. Der Client verwendet CA-Zertifikate, um das Zertifikat des Brokers zu verifizieren.

    Laden Sie unter Windows die mit curl verteilte Datei cacert.pem herunter (cacert.pm herunterladen). Informationen zu anderen Plattformen finden Sie unter SSL-Truststore konfigurieren.

Nachrichten erzeugen

  1. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd.
  2. Öffnen Sie das Terminal, und wechseln Sie mit cd zum Verzeichnis wd.
  3. Erstellen Sie eine C#-.NET-Konsolenanwendung, indem Sie den folgenden Befehl im Terminal ausführen:

    dotnet new console

    In einer Meldung wird angegeben, dass die Anwendung erstellt wurde:

    The template "Console Application" was created successfully.

    Dadurch wird eine Program.cs-Datei mit C#-Code für eine einfache "HelloWorld"-Anwendung erstellt.

  4. Um die confluent-kafka-dotnet-Library in Ihrem neuen .NET Core-Projekt zu referenzieren, führen Sie den folgenden Befehl im Verzeichnis wd des Projekts aus:

    dotnet add package Confluent.Kafka
  5. Ersetzen Sie den Code in Program.cs im Verzeichnis wd durch den folgenden Code. Ersetzen Sie Variablenwerte in der Map ProducerConfig und den Namen von topic durch die Details, die Sie in den Voraussetzungen erfasst haben:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    dotnet run
  7. Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.

Nachrichten konsumieren

  1. Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
  2. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im leeren Arbeitsverzeichnis wd.
  3. Erstellen Sie eine C#-.NET-Konsolenanwendung, indem Sie den folgenden Befehl im Terminal ausführen:

    dotnet new console

    In einer Meldung wird angegeben, dass die Anwendung erstellt wurde:

    The template "Console Application" was created successfully.

    Dadurch wird eine Program.cs-Datei mit C#-Code für eine einfache "HelloWorld"-Anwendung erstellt.

  4. Um die confluent-kafka-dotnet-Library in Ihrem neuen .NET Core-Projekt zu referenzieren, führen Sie den folgenden Befehl im Verzeichnis wd des Projekts aus:

    dotnet add package Confluent.Kafka
  5. Ersetzen Sie den Code in Program.cs im Verzeichnis wd durch den folgenden Code. Ersetzen Sie Variablenwerte in der Map ProducerConfig und den Namen von topic durch die Details, die Sie in den Voraussetzungen erfasst haben:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    dotnet run
  7. Meldungen wie die Folgenden sollten angezeigt werden:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    Hinweis

    Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede Nachricht Null