Service de diffusion en continu et trousse SDK pour .NET - Démarrage rapide
Publier et consommer des messages dans le service de diffusion en continu à l'aide de la trousse SDK OCI pour .NET.
Ce démarrage rapide vous montre comment utiliser la trousse SDK pour .NET pour Oracle Cloud Infrastructure (OCI) et le service de diffusion en continu pour Oracle Cloud Infrastructure pour publier et consommer des messages. Ces exemples utilisent le langage C#.
Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu. Pour plus d'informations sur l'utilisation des trousses SDK pour OCI, consultez les guides sur les SDK.
Préalables
Dans ce démarrage rapide, nous créons et exécutons une application de console .NET simple à l'aide de Visual Studio Code et de l'interface de ligne de commande .NET. Les tâches de projet, telles que la création, la compilation et l'exécution d'un projet, sont effectuées à l'aide de l'interface de ligne de commande .NET. Si vous préférez, vous pouvez suivre ce tutoriel avec un environnement de développement intégré (IDE) différent et exécuter des commandes dans un terminal.
-
Pour utiliser la trousse SDK pour .NET, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure
- Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Il peut s'agir d'un utilisateur pour vous-même, une autre personne ou un autre système qui doit appeler l'API. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
- Une paire de clés utilisée pour signer des demandes d'API, avec la clé publique chargée dans Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée. Pour plus d'informations, voir Fichier de configuration de trousse SDK.
- Collectez le point d'extrémité et l'OCID des messages d'un flux. Pour les étapes d'obtention des détails d'un flux, voir Obtention des détails d'un flux. Aux fins de ce démarrage rapide, le flux doit utiliser un point d'extrémité public et laisser Oracle gérer le chiffrement. Reportez-vous aux sections Création d'un flux et Création d'un groupe de flux si vous n'avez pas de flux existant.
- Installez la trousse SDK .NET 5.0 ou une version supérieure. Assurez-vous que
dotnet
est défini dans la variable d'environnementPATH
. -
Visual Studio Code (recommandé) avec l'extension C# installée. Pour plus d'informations sur l'installation d'extensions sur Visual Studio Code, voir Site Marketplace pour les extensions VS Code.
- Assurez-vous de disposer d'un fichier de configuration de trousse SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation du principal d'instance.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. - Ouvrez le terminal et
cd
dans le répertoirewd
. -
Créez une application de console C# .NET en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devriez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cela crée un fichier
Program.cs
avec du code C# pour une application "HelloWorld" simple. -
Ajoutez des ensembles de trousses SDK OCI pour l'authentification GIA de base et le service de diffusion en continu à votre projet C# comme suit :
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Remplacez le code dans
Program.cs
du répertoirewd
par le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
dans l'extrait de code suivant par les valeurs applicables à votre location.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssProducer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Producer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); await PublishExampleMessages(streamClient, ociStreamOcid); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task PublishExampleMessages(StreamClient streamClient, string streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>(); for (int i = 0; i < 100; i++) { PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry { Key = Encoding.UTF8.GetBytes($"messagekey-{i}"), Value = Encoding.UTF8.GetBytes($"messageValue-{i}") }; messages.Add(detailsEntry); } Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}"); PutMessagesDetails messagesDetails = new PutMessagesDetails { Messages = messages }; PutMessagesRequest putRequest = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messagesDetails }; PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest); // the putResponse can contain some useful metadata for handling failures foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries) { if (entry.Error != null) { Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}"); } else { Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}"); } } } } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :dotnet run
- Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.
Consommation de messages
- Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire de travail vide
wd
. -
Créez une application de console C# .NET en exécutant la commande suivante dans le terminal :
dotnet new console
Vous devriez voir un message indiquant que l'application a été créée :
The template "Console Application" was created successfully.
Cela crée un fichier
Program.cs
avec du code C# pour une application "HelloWorld" simple. -
Ajoutez des ensembles de trousses SDK OCI pour l'authentification GIA de base et le service de diffusion en continu à votre projet C# comme suit :
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
Remplacez le code dans
Program.cs
du répertoirewd
par le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
dans l'extrait de code suivant par les valeurs applicables à votre location.using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssConsumer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Consumer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. Console.WriteLine("Starting a simple message loop with a group cursor"); string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName) { Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}"); CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails { GroupName = groupName, InstanceName = instanceName, Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon, CommitOnGet = true }; CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest { StreamId = streamId, CreateGroupCursorDetails = createGroupCursorDetails }; CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest); return groupCursorResponse.Cursor.Value; } private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor) { string cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getMessagesRequest = new GetMessagesRequest { StreamId = streamId, Cursor = cursor, Limit = 10 }; GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest); // process the messages Console.WriteLine($"Read {getResponse.Items.Count}"); foreach (Message message in getResponse.Items) { string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null"; Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}"); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. await Task.Delay(1000); // use the next-cursor for iteration cursor = getResponse.OpcNextCursor; } } } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :dotnet run
-
Des messages similaires aux suivants doivent s'afficher :
Starting example for OSS Consumer Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1 Read 10 messagekey-0 : messageValue-0 messagekey-1 : messageValue-1 messagekey-2 : messageValue-2 messagekey-3 : messageValue-3 messagekey-4 : messageValue-4 messagekey-5 : messageValue-5 messagekey-6 : messageValue-6 messagekey-7 : messageValue-7 messagekey-8 : messageValue-8 messagekey-9 : messageValue-9 Read 10
Note
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Étapes suivantes
Pour plus d'informations, voir les ressources suivantes :