SDK de inicio rápido de Streaming para Python
Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para Python.
En este inicio rápido se muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para Python y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.
Para conocer los conceptos clave y más detalles de Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de los SDK de OCI, consulte las Guías sobre SDK.
Requisitos
-
Para utilizar el SDK para Python, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Este usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
- Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada.
- Recopile el punto final de mensajes y el OCID de un flujo. Para obtener más información sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un flujo y Creación de un pool de flujos si no tiene un flujo existente.
- Python 3.6 o posterior, con PIP instalado y actualizado.
- Visual Code Studio (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
-
Instale los paquetes
oci-sdk
para Python mediante el siguiente comando:pip install oci
Nota
Se recomienda utilizar un entorno virtual de Python al instalaroci
. Consulte la sección sobre descarga e instalación del SDK para obtener más información. - Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd
. Ya debe tener los paquetesoci-sdk
para Python instalados para su entorno de Python actual después de cumplir los requisitos. -
Cree un archivo denominado
Producer.py
en el directoriowd
con el siguiente código. Sustituya los valores de las variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
yociMessageEndpoint
en el siguiente fragmento de código por los valores aplicables a su arrendamiento.import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid)
-
Desde el directorio
wd
, ejecute el siguiente comando:python Producer.py
- Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd
. Ya debe tener los paquetesoci-sdk
para Python instalados para su entorno de Python actual tras asegurarse de cumplir los requisitos. -
Cree un archivo denominado
Consumer.py
en el directoriowd
con el siguiente código. Sustituya los valores de las variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
yociMessageEndpoint
en el siguiente fragmento de código por los valores aplicables a su arrendamiento.import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor)
-
Desde el directorio
wd
, ejecute el siguiente comando:python Consumer.py
-
Se mostrarán mensajes similares a los siguientes:
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: