Service de diffusion en continu et trousse SDK pour Python - Démarrage rapide

Publier et consommer des messages dans le service de diffusion en continu à l'aide de la trousse SDK OCI pour Python.

Ce démarrage rapide vous montre comment utiliser la trousse SDK pour Python pour Oracle Cloud Infrastructure (OCI) et le service de diffusion en continu pour Oracle Cloud Infrastructure pour publier et consommer des messages.

Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu. Pour plus d'informations sur l'utilisation des trousses SDK pour OCI, consultez les guides sur les SDK.

Préalables

  1. Pour utiliser la trousse SDK pour Python, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure
    • Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Il peut s'agir d'un utilisateur pour vous-même, une autre personne ou un autre système qui doit appeler l'API. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
    • Une paire de clés utilisée pour signer des demandes d'API, avec la clé publique chargée dans Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée.
    Note

    Pour plus d'informations, voir Configuration de la trousse SDK.
  2. Collectez le point d'extrémité et l'OCID des messages d'un flux. Pour les étapes d'obtention des détails d'un flux, voir Obtention des détails d'un flux. Aux fins de ce démarrage rapide, le flux doit utiliser un point d'extrémité public et laisser Oracle gérer le chiffrement. Reportez-vous aux sections Création d'un flux et Création d'un groupe de flux si vous n'avez pas de flux existant.
  3. Python 3.6 ou version ultérieure, avec PIP installé et mis à jour.
  4. Visual Code Studio (recommandé) ou tout autre environnement de développement intégré (IDE).
  5. Installez les ensembles oci-sdk pour Python à l'aide de la commande suivante :

    pip install oci
    Note

    Nous vous recommandons d'utiliser un environnement virtuel Python lors de l'installation d'oci. Pour plus d'informations, voir Téléchargement et installation de la trousse SDK.
  6. Assurez-vous de disposer d'un fichier de configuration de trousse SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation du principal d'instance.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Une fois les préalables satisfaits, les ensembles oci-sdk doivent déjà être installés pour Python pour votre environnement Python courant.
  2. Créez un fichier nommé Producer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables ociConfigFilePath, ociProfileName, ociStreamOcid et ociMessageEndpoint dans l'extrait de code suivant par les valeurs applicables à votre location.

    import oci  
      
    from base64 import b64encode  
      
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
      
    def produce_messages(client, stream_id):
      # Build up a PutMessagesDetails and publish some messages to the stream
      message_list = []
      for i in range(100):
          key = "messageKey" + str(i)
          value = "messageValue " + str(i)
          encoded_key = b64encode(key.encode()).decode()
          encoded_value = b64encode(value.encode()).decode()
          message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))  
      
      print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
      messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
      put_message_result = client.put_messages(stream_id, messages)
      
      # The put_message_result can contain some useful metadata for handling failures
      for entry in put_message_result.data.entries:
          if entry.error:
              print("Error ({}) : {}".format(entry.error, entry.error_message))
          else:
              print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # Publish some messages to the stream
    produce_messages(stream_client, ociStreamOcid)
  3. Depuis le répertoire wd, exécutez la commande suivante :

    python Producer.py
  4. Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.

Consommation de messages

  1. Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Des ensembles oci-sdk doivent déjà être installés pour Python pour votre environnement Python courant après avoir vérifié que vous disposez des préalables.
  3. Créez un fichier nommé Consumer.py dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables ociConfigFilePath, ociProfileName, ociStreamOcid et ociMessageEndpoint dans l'extrait de code suivant par les valeurs applicables à votre location.

    import oci
    import time
    
    from base64 import b64decode
    
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
    
    
    def get_cursor_by_group(sc, sid, group_name, instance_name):
        print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
        cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
                                                                       type=oci.streaming.models.
                                                                       CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
                                                                       commit_on_get=True)
        response = sc.create_group_cursor(sid, cursor_details)
        return response.data.value
    
    def simple_message_loop(client, stream_id, initial_cursor):
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. return.
            if not get_response.data:
                return
    
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                if message.key is None:
                    key = "Null"
                else:
                    key = b64decode(message.key.encode()).decode()
                print("{}: {}".format(key,
                                      b64decode(message.value.encode()).decode()))
    
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, as to avoid too many http requests.
            time.sleep(1)
            # use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # A cursor can be created as part of a consumer group.
    # Committed offsets are managed for the group, and partitions
    # are dynamically balanced amongst consumers in the group.
    group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
    simple_message_loop(stream_client, ociStreamOcid, group_cursor)
  4. Depuis le répertoire wd, exécutez la commande suivante :

    python Consumer.py
  5. Des messages similaires aux suivants doivent s'afficher :

    Starting a simple message loop with a group cursor
    Creating a cursor for group example-group, instance example-instance-1
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Note

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.