SDK de inicio rápido de Streaming para Python
Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para Python.
En este inicio rápido se muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para Python y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.
Para obtener más información sobre conceptos clave y Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de el SDK de OCI, consulte las Guías sobre el SDK.
Requisitos
-
Para utilizar el SDK para Python, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Este usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
- Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada.
- Recopile el punto final de mensajes y el OCID de un flujo. Para obtener detalles sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un Flujo y Creación de un Pool de Flujos si no tiene un flujo existente.
- Python 3.6 o posterior, con PIP instalado y actualizado.
- Visual Code Studio (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
-
Instale los paquetes
oci-sdkpara Python mediante el siguiente comando:pip install ociNota
Se recomienda utilizar un entorno virtual de Python al instalaroci. Consulte la sección sobre descarga e instalación del SDK para obtener más información. - Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd. Ya debe tener los paquetesoci-sdkpara Python instalados para su entorno de Python actual después de cumplir los requisitos. -
Cree un archivo denominado
Producer.pyen el directoriowdcon el siguiente código. Sustituya los valores de las variablesociConfigFilePath,ociProfileName,ociStreamOcidyociMessageEndpointen el siguiente fragmento de código por los valores aplicables a su arrendamiento.import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid) -
Desde el directorio
wd, ejecute el siguiente comando:python Producer.py - Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd. Ya debe tener los paquetesoci-sdkpara Python instalados para su entorno de Python actual tras asegurarse de cumplir los requisitos. -
Cree un archivo denominado
Consumer.pyen el directoriowdcon el siguiente código. Sustituya los valores de las variablesociConfigFilePath,ociProfileName,ociStreamOcidyociMessageEndpointen el siguiente fragmento de código por los valores aplicables a su arrendamiento.import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor) -
Desde el directorio
wd, ejecute el siguiente comando:python Consumer.py -
Se mostrarán mensajes similares a los siguientes:
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: