Avvio rapido SDK per lo streaming Python
Pubblica e utilizza i messaggi nel servizio di streaming utilizzando l'SDK OCI per Python.
Questo avvio rapido mostra come utilizzare Oracle Cloud Infrastructure (OCI) SDK for Python e Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.
Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming. Per ulteriori informazioni sull'uso degli SDK OCI, consulta le guide SDK.
Prerequisiti
-
Per utilizzare l'SDK per Python, è necessario disporre dei seguenti elementi:
- Un account Oracle Cloud Infrastructure.
- Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Questo utente può essere te stesso o un'altra persona/sistema che deve chiamare l'API. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
- Coppia di chiavi utilizzata per firmare le richieste API, con la chiave pubblica caricata in Oracle. Solo l'utente che chiama l'API deve possedere la chiave privata.
- Raccogliere l'endpoint e l'OCID dei messaggi di un flusso. Per i passi per ottenere i dettagli per un flusso, vedere Recupero dei dettagli per un flusso. Ai fini di questo avvio rapido, il flusso deve utilizzare un endpoint pubblico e consentire a Oracle di gestire la cifratura. Se non si dispone di un flusso esistente, vedere Creazione di un flusso e Creazione di un pool di flussi.
- Python 3.6 o versioni successive, con PIP installato e aggiornato.
- Visual Code Studio (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
-
Installare i pacchetti
oci-sdk
per Python utilizzando il comando seguente:pip install oci
Nota
Si consiglia di utilizzare un ambiente virtuale Python durante l'installazione dioci
. Per ulteriori informazioni, vedere Download e installazione dell'SDK. - Assicurarsi di disporre di un file di configurazione SDK valido. Per gli ambienti di produzione, è necessario utilizzare l'autorizzazione principal dell'istanza.
Produzione di messaggi
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
. È necessario che i pacchettioci-sdk
per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti. -
Creare un file denominato
Producer.py
nella directorywd
con il codice seguente. Sostituire i valori delle variabiliociConfigFilePath
,ociProfileName
,ociStreamOcid
eociMessageEndpoint
nello snippet di codice seguente con i valori applicabili per la tenancy in uso.import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid)
-
Dalla directory
wd
, eseguire il comando seguente:python Producer.py
- Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.
Messaggi di consumo
- In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
. È necessario che i pacchettioci-sdk
per Python siano già installati per l'ambiente Python corrente dopo aver verificato di disporre dei prerequisiti. -
Creare un file denominato
Consumer.py
nella directorywd
con il codice seguente. Sostituire i valori delle variabiliociConfigFilePath
,ociProfileName
,ociStreamOcid
eociMessageEndpoint
nello snippet di codice seguente con i valori applicabili per la tenancy in uso.import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor)
-
Dalla directory
wd
, eseguire il comando seguente:python Consumer.py
-
Dovresti vedere messaggi simili ai seguenti:
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Nota
Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio èNull
Passo successivo
Per ulteriori informazioni, consultare le risorse elencate di seguito.