Inicio rápido del cliente Python de Kafka y Streaming
Publique y consuma mensajes en el servicio Streaming con el cliente Python de Kafka.
En este inicio rápido se muestra cómo utilizar el cliente Python de Kafka con Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.
Para obtener más información, consulte Uso de Streaming con Apache Kafka. Para obtener conceptos clave y más información sobre Streaming, consulte Visión general de Streaming.
Requisitos
-
Para utilizar el cliente Python de Kafka con Streaming, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
-
Recopile los siguientes detalles:
- OCID de flujo
- Punto final de mensajes
- OCID de pool de flujos
- FQDN de pool de flujos
- Configuración de conexión de Kafka:
- Servidores de inicialización de datos
- Cadenas de conexión de SASL
- Protocolo de seguridad
Para conocer los pasos para crear y gestionar flujos y pools de flujos, consulte Gestión de flujos y Gestión de pools de flujos. Los flujos corresponden a un tema de Kafka.
- Python 3.6 o posterior, con PIP instalado y actualizado.
- Visual Code Studio (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
-
Instale los paquetes
Confluent-Kafka
para Python mediante el siguiente comando:pip install confluent-kafka
Nota
Puede instalar estos paquetes de forma global o dentro de virtualenv. El paquetelibrdkafka
lo utiliza el paqueteconfluent-kafka
y se embebe en wheels para la versión más reciente deconfluent-kafka
. Para obtener más información, consulte la documentación del cliente Python de Confluent. -
Instale los certificados raíz de CA SSL en el host en el que está desarrollando y ejecutando este inicio rápido. El cliente utiliza certificados de CA para verificar el certificado del broker.
Para Windows, descargue el archivo
cacert.pem
que se distribuye con curl (descargue cacert.pm). Para otras plataformas, consulte Configure SSL trust store. -
La autenticación con el protocolo de Kafka utiliza tokens de autenticación y el mecanismo SASL/PLAIN. Consulte Trabajar con tokens de autenticación para la generación de tokens de autenticación. Si ha creado el flujo y el pool de flujos en OCI, ya está autorizado a utilizar este flujo según OCI IAM, por lo que debe crear tokens de autenticación para el usuario de OCI.
Nota
Los tokens de autenticación de usuario de OCI solo son visibles en el momento de la creación. Cópielo y guárdelo en un lugar seguro para su uso en el futuro.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío
wd
. Ya debe tener los paquetesconfluent-kafka
para Python instalados para su entorno de Python actual después de cumplir los requisitos. -
Cree un archivo denominado
Producer.py
en el directoriowd
con el siguiente código. Sustituya los valores de configuración en la asignaciónconf
y el nombre del tema es el nombre del flujo que ha creado.from confluent_kafka import Producer, KafkaError if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Producer instance producer = Producer(**conf) delivered_records = 0 # Optional per-message on_delivery handler (triggered by poll() or flush()) # when a message has been successfully delivered or permanently failed delivery after retries. def acked(err, msg): global delivered_records """Delivery report handler called on successful or failed delivery of message """ if err is not None: print("Failed to deliver message: {}".format(err)) else: delivered_records += 1 print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset())) for n in range(10): record_key = "messageKey" + str(n) record_value = "messageValue" + str(n) print("Producing record: {}\t{}".format(record_key, record_value)) producer.produce(topic, key=record_key, value=record_value, on_delivery=acked) # p.poll() serves delivery reports (on_delivery) from previous produce() calls. producer.poll(0) producer.flush() print("{} messages were produced to topic {}!".format(delivered_records, topic))
-
Desde el directorio
wd
, ejecute el siguiente comando:python Producer.py
- Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío
wd
. Ya debe tener los paquetesconfluent-kafka
para Python instalados para su entorno de Python actual después de cumplir los requisitos. -
Cree un archivo denominado
Consumer.py
en el directoriowd
con el siguiente código. Sustituya los valores de configuración en la asignaciónconf
y el nombre del tema es el nombre del flujo que ha creado.from confluent_kafka import Consumer if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Consumer instance consumer = Consumer(conf) # Subscribe to topic consumer.subscribe([topic]) # Process messages try: while True: msg = consumer.poll(1.0) if msg is None: # No message available within timeout. # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming print("Waiting for message or event/error in poll()") continue elif msg.error(): print('error: {}'.format(msg.error())) else: # Check for Kafka message record_key = "Null" if msg.key() is None else msg.key().decode('utf-8') record_value = msg.value().decode('utf-8') print("Consumed record with key "+ record_key + " and value " + record_value) except KeyboardInterrupt: pass finally: print("Leave group and commit final offsets") consumer.close()
-
Desde el directorio
wd
, ejecute el siguiente comando:python Consumer.py
-
Se mostrarán mensajes similares a los siguientes:
Waiting for message or event/error in poll() Waiting for message or event/error in poll() Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: