Démarrage rapide du client .NET Kafka et de Streaming
Publiez et utilisez des messages dans le service Streaming à l'aide du client .NET Kafka.
Ce démarrage rapide vous explique comment utiliser le client .NET Kafka avec Oracle Cloud Infrastructure Streaming pour publier et utiliser des messages. Ces exemples utilisent le langage C#.
Pour plus d'informations, reportez-vous à Utilisation de Streaming avec Apache Kafka. Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming.
Prérequis
Dans ce démarrage rapide, nous créons et exécutons une simple application de console .NET à l'aide de Visual Studio Code et de l'interface de ligne de commande .NET. Les tâches de projet, telles que la création, la compilation et l'exécution d'un projet, sont effectuées à l'aide de l'interface de ligne de commande .NET. Si vous préférez, vous pouvez suivre ce tutoriel avec un IDE différent et exécuter des commandes dans un terminal.
-
Afin d'utiliser le client .NET Kafka avec Streaming, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure.
- Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
-
Collectez les informations suivantes :
- OCID de flux de données
- Adresse des messages
- OCID de pool de flux de données
- Nom de domaine qualifié complet de pool de flux de données
- Paramètres de la connexion Kafka :
- Serveurs de démarrage
- Chaînes de connexion SASL
- Protocole de sécurité
Pour connaître les étapes de création et de gestion des flux de données et des pools de flux de données, reportez-vous à Gestion des flux de données et à Gestion des pools de flux de données. Les flux de données correspondent à une rubrique Kafka.
- Installez le kit SDK .NET version 5.0 ou ultérieure. Assurez-vous que
dotnet
est défini dans la variable d'environnementPATH
. -
Visual Studio Code (recommandé) avec l'extension C# installée. Pour plus d'informations sur l'installation d'extensions sur Visual Studio Code, reportez-vous au marché d'extension de VS Code.
-
L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Reportez-vous à Utilisation des jetons d'authentification pour plus d'informations sur la génération de jetons d'authentification. Si vous avez créé le flux de données et le pool de flux de données dans OCI, vous êtes déjà autorisé à utiliser ce flux de données conformément à OCI IAM. Vous devez donc créer des jetons d'authentification pour l'utilisateur OCI.
Remarque
Les jetons d'authentification de l'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-les et conservez-les en lieu sûr pour une utilisation ultérieure. -
Installez les certificats racine d'autorité de certification SSL sur l'hôte où vous développez et exécutez ce démarrage rapide. Le client utilise des certificats d'autorité de certification pour vérifier le certificat du broker.
Pour Windows, téléchargez le fichier
cacert.pem
distribué avec curl (télécharger cacert.pm). Pour les autres plates-formes, reportez-vous à Configuration du truststore SSL.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. - Ouvrez le terminal et utilisez la commande
cd
pour passer au répertoirewd
. -
Créez une application de console .NET C# en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cette opération crée un fichier
Program.cs
avec du code C# pour une simple application "HelloWorld". -
Pour référencer la bibliothèque
confluent-kafka-dotnet
dans votre nouveau projet .NET Core, exécutez la commande suivante dans le répertoirewd
de votre projet :dotnet add package Confluent.Kafka
-
Remplacez le code de
Program.cs
dans le répertoirewd
par le code suivant. Remplacez les valeurs des variables de la correspondanceProducerConfig
et le nom de la rubrique (topic
) par les détails que vous avez collectés dans les prérequis :using System; using Confluent.Kafka; namespace OssProducerWithKafkaApi { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ProducerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Produce("<topic_stream_name>", config); // use the name of the stream you created } static void Produce(string topic, ClientConfig config) { using (var producer = new ProducerBuilder<string, string>(config).Build()) { int numProduced = 0; int numMessages = 10; for (int i=0; i<numMessages; ++i) { var key = "messageKey" + i; var val = "messageVal" + i; Console.WriteLine($"Producing record: {key} {val}"); producer.Produce(topic, new Message<string, string> { Key = key, Value = val }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); numProduced += 1; } }); } producer.Flush(TimeSpan.FromSeconds(10)); Console.WriteLine($"{numProduced} messages were produced to topic {topic}"); } } } }
-
A partir du répertoire
wd
, exécutez la commande suivante :dotnet run
- Afficher les derniers messages envoyés au flux de données pour vérifier que la production a réussi.
Utilisation des messages
- Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. -
Créez une application de console .NET C# en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cette opération crée un fichier
Program.cs
avec du code C# pour une simple application "HelloWorld". -
Pour référencer la bibliothèque
confluent-kafka-dotnet
dans votre nouveau projet .NET Core, exécutez la commande suivante dans le répertoirewd
de votre projet :dotnet add package Confluent.Kafka
-
Remplacez le code de
Program.cs
dans le répertoirewd
par le code suivant. Remplacez les valeurs des variables de la correspondanceProducerConfig
et le nom de la rubrique (topic
) par les détails que vous avez collectés dans les prérequis :using System; using Confluent.Kafka; using System.Threading; namespace OssKafkaConsumerDotnet { class Program { static void Main(string[] args) { Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS"); var config = new ConsumerConfig { BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092 SslCaLocation = "<path\to\root\ca\certificate\*.pem>", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain, SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>", SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section }; Consume("<topic_stream_name>", config); // use the name of the stream you created } static void Consume(string topic, ClientConfig config) { var consumerConfig = new ConsumerConfig(config); consumerConfig.GroupId = "dotnet-oss-consumer-group"; consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; consumerConfig.EnableAutoCommit = true; CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build()) { consumer.Subscribe(topic); try { while (true) { var cr = consumer.Consume(cts.Token); string key = cr.Message.Key == null ? "Null" : cr.Message.Key; Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}"); } } catch (OperationCanceledException) { //exception might have occurred since Ctrl-C was pressed. } finally { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } } } }
-
A partir du répertoire
wd
, exécutez la commande suivante :dotnet run
-
Des messages semblables à celui qui suit s'affichent :
Demo for using Kafka APIs seamlessly with OSS Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Remarque
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Etapes suivantes
Pour plus d'informations, reportez-vous aux ressources suivantes :