Inicio rápido del cliente Python de Kafka y Streaming

Publique y consuma mensajes en el servicio Streaming con el cliente Python de Kafka.

En este inicio rápido se muestra cómo utilizar el cliente Python de Kafka con Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.

Para obtener más información, consulte Uso de Streaming con Apache Kafka. Para obtener conceptos clave y más información sobre Streaming, consulte Visión general de Streaming.

Requisitos

  1. Para utilizar el cliente Python de Kafka con Streaming, debe tener lo siguiente:

    • Una cuenta de Oracle Cloud Infrastructure.
    • Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
  2. Recopile los siguientes detalles:

    • OCID de flujo
    • Punto final de mensajes
    • OCID de pool de flujos
    • FQDN de pool de flujos
    • Configuración de conexión de Kafka:
      • Servidores de inicialización de datos
      • Cadenas de conexión de SASL
      • Protocolo de seguridad

    Para conocer los pasos para crear y gestionar flujos y pools de flujos, consulte Gestión de flujos y Gestión de pools de flujos. Los flujos corresponden a un tema de Kafka.

  3. Python 3.6 o posterior, con PIP instalado y actualizado.
  4. Visual Code Studio (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
  5. Instale los paquetes Confluent-Kafka para Python mediante el siguiente comando:

    pip install confluent-kafka
    Nota

    Puede instalar estos paquetes de forma global o dentro de virtualenv. El paquete librdkafka lo utiliza el paquete confluent-kafka y se embebe en wheels para la versión más reciente de confluent-kafka. Para obtener más información, consulte la documentación del cliente Python de Confluent.
  6. Instale los certificados raíz de CA SSL en el host en el que está desarrollando y ejecutando este inicio rápido. El cliente utiliza certificados de CA para verificar el certificado del broker.

    Para Windows, descargue el archivo cacert.pem que se distribuye con curl (descargue cacert.pm). Para otras plataformas, consulte Configure SSL trust store.

  7. La autenticación con el protocolo de Kafka utiliza tokens de autenticación y el mecanismo SASL/PLAIN. Consulte Trabajar con tokens de autenticación para la generación de tokens de autenticación. Si ha creado el flujo y el pool de flujos en OCI, ya está autorizado a utilizar este flujo según OCI IAM, por lo que debe crear tokens de autenticación para el usuario de OCI.

    Nota

    Los tokens de autenticación de usuario de OCI solo son visibles en el momento de la creación. Cópielo y guárdelo en un lugar seguro para su uso en el futuro.

Producción de mensajes

  1. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd. Ya debe tener los paquetes confluent-kafka para Python instalados para su entorno de Python actual después de cumplir los requisitos.
  2. Cree un archivo denominado Producer.py en el directorio wd con el siguiente código. Sustituya los valores de configuración en la asignación conf y el nombre del tema es el nombre del flujo que ha creado.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. Desde el directorio wd, ejecute el siguiente comando:

    python Producer.py
  4. Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.

Consumo de mensajes

  1. En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
  2. Abra su editor favorito, como Visual Studio Code, desde el directorio de trabajo vacío wd. Ya debe tener los paquetes confluent-kafka para Python instalados para su entorno de Python actual después de cumplir los requisitos.
  3. Cree un archivo denominado Consumer.py en el directorio wd con el siguiente código. Sustituya los valores de configuración en la asignación conf y el nombre del tema es el nombre del flujo que ha creado.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. Desde el directorio wd, ejecute el siguiente comando:

    python Consumer.py
  5. Se mostrarán mensajes similares a los siguientes:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Nota

    Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje es Null