Démarrage rapide du kit SDK pour Python avec Streaming
Publiez et utilisez des messages dans le service Streaming à l'aide du kit SDK OCI pour Python.
Ce démarrage rapide vous explique comment utiliser le kit SDK Oracle Cloud Infrastructure (OCI) pour Python et Oracle Cloud Infrastructure Streaming afin de publier et d'utiliser des messages.
Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming. Pour plus d'informations sur l'utilisation des kits SDK OCI, reportez-vous aux guides SDK.
Prérequis
-
Afin d'utiliser le kit SDK pour Python, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure.
- Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Cet utilisateur peut être vous-même, ou une autre personne/un autre système devant appeler l'API. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
- Une paire de clés utilisée lors de la signature des demandes d'API, avec la clé publique téléchargée vers Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée.
- Collectez l'adresse des messages et l'OCID d'un flux de données. Pour connaître les étapes d'obtention des détails d'un flux de données, reportez-vous à Obtention des détails d'un flux de données. Dans le cadre de ce démarrage rapide, le flux de données doit utiliser une adresse publique et laisser Oracle gérer le cryptage. Si vous n'avez pas de flux existant, reportez-vous à Création d'un flux de données et à Création d'un pool de flux de données.
- Python version 3.6 ou ultérieure, avec PIP installé et mis à jour.
- Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
-
Installez des packages
oci-sdk
pour Python à l'aide de la commande suivante :pip install oci
Remarque
Nous vous recommandons d'utiliser un environnement virtuel Python lors de l'installation d'oci
. Pour plus d'informations, reportez-vous à Téléchargement et installation du kit SDK. - Assurez-vous que vous disposez d'un fichier de configuration de kit SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation de principal d'instance.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
. Vous devez déjà disposer de packagesoci-sdk
pour Python installés pour votre environnement Python actuel si les prérequis ont été respectés. -
Créez un fichier nommé
Producer.py
dans le répertoirewd
avec le code suivant. Remplacez les valeurs des variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
etociMessageEndpoint
du fragment de code suivant par les valeurs applicables à votre location.import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid)
-
A partir du répertoire
wd
, exécutez la commande suivante :python Producer.py
- Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.
Utilisation des messages
- Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
. Vous devez déjà disposer de packagesoci-sdk
pour Python installés pour votre environnement Python actuel si vous avez vérifié les prérequis. -
Créez un fichier nommé
Consumer.py
dans le répertoirewd
avec le code suivant. Remplacez les valeurs des variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
etociMessageEndpoint
du fragment de code suivant par les valeurs applicables à votre location.import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor)
-
A partir du répertoire
wd
, exécutez la commande suivante :python Consumer.py
-
Des messages semblables à celui qui suit s'affichent :
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Remarque
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Etapes suivantes
Pour plus d'informations, reportez-vous aux ressources suivantes :