Démarrage rapide du kit SDK pour .NET avec Streaming
Publiez et utilisez des messages dans le service Streaming à l'aide du kit SDK OCI pour .NET.
Ce démarrage rapide vous explique comment utiliser le kit SDK Oracle Cloud Infrastructure (OCI) pour .NET et Oracle Cloud Infrastructure Streaming afin de publier et d'utiliser des messages. Ces exemples utilisent le langage C#.
Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming. Pour plus d'informations sur l'utilisation des kits SDK OCI, reportez-vous aux guides SDK.
Prérequis
Dans ce démarrage rapide, nous créons et exécutons une simple application de console .NET à l'aide de Visual Studio Code et de l'interface de ligne de commande .NET. Les tâches de projet, telles que la création, la compilation et l'exécution d'un projet, sont effectuées à l'aide de l'interface de ligne de commande .NET. Si vous préférez, vous pouvez suivre ce tutoriel avec un IDE différent et exécuter des commandes dans un terminal.
-
Afin d'utiliser le kit SDK pour .NET, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure.
- Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Cet utilisateur peut être vous-même, ou une autre personne/un autre système devant appeler l'API. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
- Une paire de clés utilisée lors de la signature des demandes d'API, avec la clé publique téléchargée vers Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée. Pour plus d'informations, reportez-vous à Fichier de configuration du kit SDK.
- Collectez l'adresse des messages et l'OCID d'un flux de données. Pour connaître les étapes d'obtention des détails d'un flux de données, reportez-vous à Obtention des détails d'un flux de données. Dans le cadre de ce démarrage rapide, le flux de données doit utiliser une adresse publique et laisser Oracle gérer le cryptage. Si vous n'avez pas de flux existant, reportez-vous à Création d'un flux de données et à Création d'un pool de flux de données.
- Installez le kit SDK .NET version 5.0 ou ultérieure. Assurez-vous que
dotnet
est défini dans la variable d'environnementPATH
. -
Visual Studio Code (recommandé) avec l'extension C# installée. Pour plus d'informations sur l'installation d'extensions sur Visual Studio Code, reportez-vous au marché d'extension de VS Code.
- Assurez-vous que vous disposez d'un fichier de configuration de kit SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation de principal d'instance.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. - Ouvrez le terminal et utilisez la commande
cd
pour passer au répertoirewd
. -
Créez une application de console .NET C# en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cette opération crée un fichier
Program.cs
avec du code C# pour une simple application "HelloWorld". -
Ajoutez à votre projet C# des packages de kit SDK OCI pour l'authentification IAM de base et Streaming, comme suit :
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Remplacez le code de
Program.cs
dans le répertoirewd
par le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
du fragment de code suivant par les valeurs applicables à votre location.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssProducer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Producer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); await PublishExampleMessages(streamClient, ociStreamOcid); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task PublishExampleMessages(StreamClient streamClient, string streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>(); for (int i = 0; i < 100; i++) { PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry { Key = Encoding.UTF8.GetBytes($"messagekey-{i}"), Value = Encoding.UTF8.GetBytes($"messageValue-{i}") }; messages.Add(detailsEntry); } Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}"); PutMessagesDetails messagesDetails = new PutMessagesDetails { Messages = messages }; PutMessagesRequest putRequest = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messagesDetails }; PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest); // the putResponse can contain some useful metadata for handling failures foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries) { if (entry.Error != null) { Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}"); } else { Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}"); } } } } }
-
A partir du répertoire
wd
, exécutez la commande suivante :dotnet run
- Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.
Utilisation des messages
- Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. -
Créez une application de console .NET C# en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cette opération crée un fichier
Program.cs
avec du code C# pour une simple application "HelloWorld". -
Ajoutez à votre projet C# des packages de kit SDK OCI pour l'authentification IAM de base et Streaming, comme suit :
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Remplacez le code de
Program.cs
dans le répertoirewd
par le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
du fragment de code suivant par les valeurs applicables à votre location.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssConsumer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Consumer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. Console.WriteLine("Starting a simple message loop with a group cursor"); string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName) { Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}"); CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails { GroupName = groupName, InstanceName = instanceName, Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon, CommitOnGet = true }; CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest { StreamId = streamId, CreateGroupCursorDetails = createGroupCursorDetails }; CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest); return groupCursorResponse.Cursor.Value; } private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor) { string cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getMessagesRequest = new GetMessagesRequest { StreamId = streamId, Cursor = cursor, Limit = 10 }; GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest); // process the messages Console.WriteLine($"Read {getResponse.Items.Count}"); foreach (Message message in getResponse.Items) { string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null"; Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}"); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. await Task.Delay(1000); // use the next-cursor for iteration cursor = getResponse.OpcNextCursor; } } } }
-
A partir du répertoire
wd
, exécutez la commande suivante :dotnet run
-
Des messages semblables à celui qui suit s'affichent :
Starting example for OSS Consumer Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1 Read 10 messagekey-0 : messageValue-0 messagekey-1 : messageValue-1 messagekey-2 : messageValue-2 messagekey-3 : messageValue-3 messagekey-4 : messageValue-4 messagekey-5 : messageValue-5 messagekey-6 : messageValue-6 messagekey-7 : messageValue-7 messagekey-8 : messageValue-8 messagekey-9 : messageValue-9 Read 10
Remarque
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Etapes suivantes
Pour plus d'informations, reportez-vous aux ressources suivantes :