Client .NET Kafka et service de diffusion en continu - Démarrage rapide
Publier et consommer des messages dans le service de flux à l'aide du client .NET Kafka.
Ce démarrage rapide vous montre comment utiliser le client .NET Kafka avec le service de diffusion en continu pour Oracle Cloud Infrastructure Streaming pour publier et consommer des messages. Ces exemples utilisent le langage C#.
Pour plus d'informations, voir Utilisation du service de diffusion en continu avec Apache Kafka. Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu
Préalables
Dans ce démarrage rapide, nous créons et exécutons une application de console .NET simple à l'aide de Visual Studio Code et de l'interface de ligne de commande .NET. Les tâches de projet, telles que la création, la compilation et l'exécution d'un projet, sont effectuées à l'aide de l'interface de ligne de commande .NET. Si vous préférez, vous pouvez suivre ce tutoriel avec un environnement de développement intégré (IDE) différent et exécuter des commandes dans un terminal.
-
Pour utiliser le client .NET Kafka avec le service de diffusion en continu, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure
- Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
-
Collectez les détails suivants :
- OCID du flux
- Point d'extrémité pour les messages
- OCID du groupe de flux
- Nom de domaine complet du groupe de flux
- Paramètres de connexion Kafka :
- Serveurs d'amorçage
- Chaînes de connexion SASL
- Protocole de sécurité
Pour les étapes de création et de gestion des flux et des groupes de flux, voir Gestion des flux et Gestion des groupes de flux. Les flux correspondent à une rubrique Kafka.
- Installez la trousse SDK .NET 5.0 ou une version supérieure. Assurez-vous que
dotnet
est défini dans la variable d'environnementPATH
. -
Visual Studio Code (recommandé) avec l'extension C# installée. Pour plus d'informations sur l'installation d'extensions sur Visual Studio Code, voir Site Marketplace pour les extensions VS Code.
-
L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Voir Utilisation des jetons d'authentification pour la génération du jeton d'authentification. Si vous avez créé le flux et le groupe de flux dans OCI, vous êtes déjà autorisé à utiliser ce flux selon le service GIA pour OCI, de sorte que vous devez créer des jetons d'authentification pour votre utilisateur OCI.
Note
Les jetons d'authentification d'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-le et conservez-le dans un endroit sûr pour une utilisation future. -
Installez les certificats racines de l'autorité de certification SSL sur l'hôte sur lequel vous développez et exécutez ce démarrage rapide. Le client utilise des certificats AC pour vérifier le certificat du courtier.
Pour Windows, téléchargez le fichier
cacert.pem
distribué avec curl (téléchargez cacert.pm). Pour les autres plates-formes, voir Configurer le magasin de certificats SSL.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. - Ouvrez le terminal et
cd
dans le répertoirewd
. -
Créez une application de console C# .NET en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devriez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cela crée un fichier
Program.cs
avec du code C# pour une application "HelloWorld" simple. -
Pour référencer la bibliothèque
confluent-kafka-dotnet
dans votre nouveau projet .NET Core, exécutez la commande suivante dans le répertoire de votre projetwd
:dotnet add package Confluent.Kafka
-
Remplacez le code dans
Program.cs
du répertoirewd
par le code suivant. Remplacez les valeurs des variables du mappageProducerConfig
et le nom detopic
par les détails que vous avez collectés dans les préalables :using System; using Confluent.Kafka; namespace OssProducerWithKafkaApi { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ProducerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Produce("<topic_stream_name>", config); // use the name of the stream you created } static void Produce(string topic, ClientConfig config) { using (var producer = new ProducerBuilder<string, string>(config).Build()) { int numProduced = 0; int numMessages = 10; for (int i=0; i<numMessages; ++i) { var key = "messageKey" + i; var val = "messageVal" + i; Console.WriteLine($"Producing record: {key} {val}"); producer.Produce(topic, new Message<string, string> { Key = key, Value = val }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); numProduced += 1; } }); } producer.Flush(TimeSpan.FromSeconds(10)); Console.WriteLine($"{numProduced} messages were produced to topic {topic}"); } } } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :dotnet run
- Afficher les derniers messages envoyés au flux pour vérifier que la production a réussi.
Consommation de messages
- Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. -
Créez une application de console C# .NET en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devriez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cela crée un fichier
Program.cs
avec du code C# pour une application "HelloWorld" simple. -
Pour référencer la bibliothèque
confluent-kafka-dotnet
dans votre nouveau projet .NET Core, exécutez la commande suivante dans le répertoire de votre projetwd
:dotnet add package Confluent.Kafka
-
Remplacez le code dans
Program.cs
du répertoirewd
par le code suivant. Remplacez les valeurs des variables du mappageProducerConfig
et le nom detopic
par les détails que vous avez collectés dans les préalables :using System; using Confluent.Kafka; using System.Threading; namespace OssKafkaConsumerDotnet { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ConsumerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Consume("<topic_stream_name>", config); // use the name of the stream you created } static void Consume(string topic, ClientConfig config) { var consumerConfig = new ConsumerConfig(config); consumerConfig.GroupId = "dotnet-oss-consumer-group"; consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; consumerConfig.EnableAutoCommit = true; CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build()) { consumer.Subscribe(topic); try { while (true) { var cr = consumer.Consume(cts.Token); string key = cr.Message.Key == null ? "Null" : cr.Message.Key; Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}"); } } catch (OperationCanceledException) { //exception might have occurred since Ctrl-C was pressed. } finally { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } } } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :dotnet run
-
Des messages similaires aux suivants doivent s'afficher :
Demo for using Kafka APIs seamlessly with OSS Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Note
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Étapes suivantes
Pour plus d'informations, voir les ressources suivantes :