Démarrage rapide du kit SDK pour Python avec Streaming

Publiez et utilisez des messages dans le service Streaming à l'aide du kit SDK OCI pour Python.

Ce démarrage rapide vous explique comment utiliser le kit SDK Oracle Cloud Infrastructure (OCI) pour Python et Oracle Cloud Infrastructure Streaming afin de publier et d'utiliser des messages.

Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming. Pour plus d'informations sur l'utilisation des kits SDK OCI, reportez-vous aux guides SDK.

Prérequis

  1. Afin d'utiliser le kit SDK pour Python, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure.
    • Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Cet utilisateur peut être vous-même, ou une autre personne/un autre système devant appeler l'API. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
    • Une paire de clés utilisée lors de la signature des demandes d'API, avec la clé publique téléchargée vers Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée.
    Remarque

    Pour plus d'informations, reportez-vous à Configuration du kit SDK.
  2. Collectez l'adresse des messages et l'OCID d'un flux de données. Pour connaître les étapes d'obtention des détails d'un flux de données, reportez-vous à Obtention des détails d'un flux de données. Dans le cadre de ce démarrage rapide, le flux de données doit utiliser une adresse publique et laisser Oracle gérer le cryptage. Si vous n'avez pas de flux existant, reportez-vous à Création d'un flux de données et à Création d'un pool de flux de données.
  3. Python version 3.6 ou ultérieure, avec PIP installé et mis à jour.
  4. Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
  5. Installez des packages oci-sdk pour Python à l'aide de la commande suivante :

    pip install oci
    Remarque

    Nous vous recommandons d'utiliser un environnement virtuel Python lors de l'installation d'oci. Pour plus d'informations, reportez-vous à Téléchargement et installation du kit SDK.
  6. Assurez-vous que vous disposez d'un fichier de configuration de kit SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation de principal d'instance.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Vous devez déjà disposer de packages oci-sdk pour Python installés pour votre environnement Python actuel si les prérequis ont été respectés.
  2. Créez un fichier nommé Producer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables ociConfigFilePath, ociProfileName, ociStreamOcid et ociMessageEndpoint du fragment de code suivant par les valeurs applicables à votre location.

    import oci  
      
    from base64 import b64encode  
      
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
      
    def produce_messages(client, stream_id):
      # Build up a PutMessagesDetails and publish some messages to the stream
      message_list = []
      for i in range(100):
          key = "messageKey" + str(i)
          value = "messageValue " + str(i)
          encoded_key = b64encode(key.encode()).decode()
          encoded_value = b64encode(value.encode()).decode()
          message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))  
      
      print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
      messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
      put_message_result = client.put_messages(stream_id, messages)
      
      # The put_message_result can contain some useful metadata for handling failures
      for entry in put_message_result.data.entries:
          if entry.error:
              print("Error ({}) : {}".format(entry.error, entry.error_message))
          else:
              print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # Publish some messages to the stream
    produce_messages(stream_client, ociStreamOcid)
  3. A partir du répertoire wd, exécutez la commande suivante :

    python Producer.py
  4. Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.

Utilisation des messages

  1. Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Vous devez déjà disposer de packages oci-sdk pour Python installés pour votre environnement Python actuel si vous avez vérifié les prérequis.
  3. Créez un fichier nommé Consumer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables ociConfigFilePath, ociProfileName, ociStreamOcid et ociMessageEndpoint du fragment de code suivant par les valeurs applicables à votre location.

    import oci
    import time
    
    from base64 import b64decode
    
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
    
    
    def get_cursor_by_group(sc, sid, group_name, instance_name):
        print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
        cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
                                                                       type=oci.streaming.models.
                                                                       CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
                                                                       commit_on_get=True)
        response = sc.create_group_cursor(sid, cursor_details)
        return response.data.value
    
    def simple_message_loop(client, stream_id, initial_cursor):
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. return.
            if not get_response.data:
                return
    
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                if message.key is None:
                    key = "Null"
                else:
                    key = b64decode(message.key.encode()).decode()
                print("{}: {}".format(key,
                                      b64decode(message.value.encode()).decode()))
    
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, as to avoid too many http requests.
            time.sleep(1)
            # use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # A cursor can be created as part of a consumer group.
    # Committed offsets are managed for the group, and partitions
    # are dynamically balanced amongst consumers in the group.
    group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
    simple_message_loop(stream_client, ociStreamOcid, group_cursor)
  4. A partir du répertoire wd, exécutez la commande suivante :

    python Consumer.py
  5. Des messages semblables à celui qui suit s'affichent :

    Starting a simple message loop with a group cursor
    Creating a cursor for group example-group, instance example-instance-1
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Remarque

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.