Inicio rápido del cliente .NET de Kafka y Streaming

Publique y consuma mensajes en el servicio Streaming mediante el cliente .NET de Kafka.

Este inicio rápido le muestra cómo utilizar el cliente .NET de Kafka con Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes. Estos ejemplos utilizan el lenguaje C#.

Para obtener más información, consulte Uso de Streaming con Apache Kafka. Para obtener conceptos clave y más información sobre Streaming, consulte Visión general de Streaming.

Requisitos

Nota

En este inicio rápido, creamos y ejecutamos una aplicación de consola de .NET sencilla mediante Visual Studio Code y la CLI de .NET. Las tareas de proyecto, como crear, compilar y ejecutar un proyecto, se realizan mediante el uso de la CLI de .NET. Si lo prefiere, puede seguir este tutorial con un IDE diferente y ejecutar comandos en un terminal.
  1. Para utilizar el cliente .NET de Kafka con Streaming, debe tener lo siguiente:

    • Una cuenta de Oracle Cloud Infrastructure.
    • Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
  2. Recopile los siguientes detalles:

    • OCID de flujo
    • Punto final de mensajes
    • OCID de pool de flujos
    • FQDN de pool de flujos
    • Configuración de conexión de Kafka:
      • Servidores de inicialización de datos
      • Cadenas de conexión de SASL
      • Protocolo de seguridad

    Para conocer los pasos para crear y gestionar flujos y pools de flujos, consulte Gestión de flujos y Gestión de pools de flujos. Los flujos corresponden a un tema de Kafka.

  3. Instale el SDK de .NET 5.0 o posterior. Asegúrese de que dotnet está definido en su variable de entorno PATH.
  4. Visual Studio Code (recomendado) con la extensión C# instalado. Para obtener información sobre cómo instalar extensiones en Visual Studio Code, consulte Extension Marketplace de VS Code.

  5. La autenticación con el protocolo de Kafka utiliza tokens de autenticación y el mecanismo SASL/PLAIN. Consulte Trabajar con tokens de autenticación para la generación de tokens de autenticación. Si ha creado el flujo y el pool de flujos en OCI, ya está autorizado a utilizar este flujo según OCI IAM, por lo que debe crear tokens de autenticación para el usuario de OCI.

    Nota

    Los tokens de autenticación de usuario de OCI solo son visibles en el momento de la creación. Cópielo y guárdelo en un lugar seguro para su uso en el futuro.
  6. Instale los certificados raíz de CA SSL en el host en el que está desarrollando y ejecutando este inicio rápido. El cliente utiliza certificados de CA para verificar el certificado del broker.

    Para Windows, descargue el archivo cacert.pem que se distribuye con curl (descargue cacert.pm). Para otras plataformas, consulte Configure SSL trust store.

Producción de mensajes

  1. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd.
  2. Abra el terminal y utilice cd para acceder al directorio wd.
  3. Cree una aplicación de consola .NET C# ejecutando el siguiente comando en el terminal:

    dotnet new console

    Debe mostrarse un mensaje que indica que se ha creado la aplicación:

    The template "Console Application" was created successfully.

    Esto crea un archivo Program.cs con código C# para una aplicación "HelloWorld" simple.

  4. Para hacer referencia a la biblioteca confluent-kafka-dotnet en el nuevo proyecto de .NET Core, ejecute el siguiente comando en el directorio wd del proyecto:

    dotnet add package Confluent.Kafka
  5. Sustituya el código en Program.cs del directorio wd por el siguiente código. Sustituya los valores de las variables en la asignación ProducerConfig y el nombre de topic por los detalles que ha recopilado en los requisitos:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. Desde el directorio wd, ejecute el siguiente comando:

    dotnet run
  7. Mostrar los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.

Consumo de mensajes

  1. En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
  2. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd.
  3. Cree una aplicación de consola de .NET C# ejecutando el siguiente comando en el terminal:

    dotnet new console

    Debe mostrarse un mensaje que indica que se ha creado la aplicación:

    The template "Console Application" was created successfully.

    Esto crea un archivo Program.cs con código C# para una aplicación "HelloWorld" simple.

  4. Para hacer referencia a la biblioteca confluent-kafka-dotnet en el nuevo proyecto de .NET Core, ejecute el siguiente comando en el directorio wd del proyecto:

    dotnet add package Confluent.Kafka
  5. Sustituya el código en Program.cs del directorio wd por el siguiente código. Sustituya los valores de las variables en la asignación ProducerConfig y el nombre de topic por los detalles que ha recopilado en los requisitos:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. Desde el directorio wd, ejecute el siguiente comando:

    dotnet run
  7. Se mostrarán mensajes similares a los siguientes:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    Nota

    Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje es Null