Avvio rapido SDK per lo streaming Python

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando l'SDK OCI per Python.

Questo avvio rapido mostra come utilizzare Oracle Cloud Infrastructure (OCI) SDK for Python e Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.

Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming. Per ulteriori informazioni sull'uso degli SDK OCI, consulta le guide SDK.

Prerequisiti

  1. Per utilizzare l'SDK per Python, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Questo utente può essere te stesso o un'altra persona/sistema che deve chiamare l'API. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
    • Coppia di chiavi utilizzata per firmare le richieste API, con la chiave pubblica caricata in Oracle. Solo l'utente che chiama l'API deve possedere la chiave privata.
    Nota

    Per ulteriori informazioni, vedere Configurazione dell'SDK.
  2. Raccogliere l'endpoint e l'OCID dei messaggi di un flusso. Per i passi per ottenere i dettagli per un flusso, vedere Recupero dei dettagli per un flusso. Ai fini di questo avvio rapido, il flusso deve utilizzare un endpoint pubblico e consentire a Oracle di gestire la cifratura. Se non si dispone di un flusso esistente, vedere Creazione di un flusso e Creazione di un pool di flussi.
  3. Python 3.6 o versioni successive, con PIP installato e aggiornato.
  4. Visual Code Studio (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
  5. Installare i pacchetti oci-sdk per Python utilizzando il comando seguente:

    pip install oci
    Nota

    Si consiglia di utilizzare un ambiente virtuale Python durante l'installazione di oci. Per ulteriori informazioni, vedere Download e installazione dell'SDK.
  6. Assicurarsi di disporre di un file di configurazione SDK valido. Per gli ambienti di produzione, è necessario utilizzare l'autorizzazione principal dell'istanza.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd. È necessario che i pacchetti oci-sdk per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti.
  2. Creare un file denominato Producer.py nella directory wd con il codice seguente. Sostituire i valori delle variabili ociConfigFilePath, ociProfileName,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    import oci  
      
    from base64 import b64encode  
      
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
      
    def produce_messages(client, stream_id):
      # Build up a PutMessagesDetails and publish some messages to the stream
      message_list = []
      for i in range(100):
          key = "messageKey" + str(i)
          value = "messageValue " + str(i)
          encoded_key = b64encode(key.encode()).decode()
          encoded_value = b64encode(value.encode()).decode()
          message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))  
      
      print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
      messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
      put_message_result = client.put_messages(stream_id, messages)
      
      # The put_message_result can contain some useful metadata for handling failures
      for entry in put_message_result.data.entries:
          if entry.error:
              print("Error ({}) : {}".format(entry.error, entry.error_message))
          else:
              print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # Publish some messages to the stream
    produce_messages(stream_client, ociStreamOcid)
  3. Dalla directory wd, eseguire il comando seguente:

    python Producer.py
  4. Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd. È necessario che i pacchetti oci-sdk per Python siano già installati per l'ambiente Python corrente dopo aver verificato di disporre dei prerequisiti.
  3. Creare un file denominato Consumer.py nella directory wd con il codice seguente. Sostituire i valori delle variabili ociConfigFilePath, ociProfileName,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    import oci
    import time
    
    from base64 import b64decode
    
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
    
    
    def get_cursor_by_group(sc, sid, group_name, instance_name):
        print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
        cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
                                                                       type=oci.streaming.models.
                                                                       CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
                                                                       commit_on_get=True)
        response = sc.create_group_cursor(sid, cursor_details)
        return response.data.value
    
    def simple_message_loop(client, stream_id, initial_cursor):
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. return.
            if not get_response.data:
                return
    
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                if message.key is None:
                    key = "Null"
                else:
                    key = b64decode(message.key.encode()).decode()
                print("{}: {}".format(key,
                                      b64decode(message.value.encode()).decode()))
    
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, as to avoid too many http requests.
            time.sleep(1)
            # use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # A cursor can be created as part of a consumer group.
    # Committed offsets are managed for the group, and partitions
    # are dynamically balanced amongst consumers in the group.
    group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
    simple_message_loop(stream_client, ociStreamOcid, group_cursor)
  4. Dalla directory wd, eseguire il comando seguente:

    python Consumer.py
  5. Dovresti vedere messaggi simili ai seguenti:

    Starting a simple message loop with a group cursor
    Creating a cursor for group example-group, instance example-instance-1
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null