Quickstart client Kafka Python e streaming

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client Python Kafka.

Questo avvio rapido mostra come utilizzare il client Python Kafka con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.

Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming

Prerequisiti

  1. Per utilizzare il client Python Kafka con Streaming, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
  2. Raccogliere i seguenti dettagli:

    • OCID flusso
    • endpoint messaggi
    • OCID pool di flussi
    • FQDN del pool di flussi
    • Impostazioni di connessione Kafka:
      • Server bootstrap
      • Stringhe di connessione SASL
      • Protocollo di sicurezza

    Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.

  3. Python 3.6 o versioni successive, con PIP installato e aggiornato.
  4. Visual Code Studio (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
  5. Installare i pacchetti Confluent-Kafka per Python utilizzando il comando seguente:

    pip install confluent-kafka
    Nota

    È possibile installare questi pacchetti a livello globale o in un virtualenv. Il pacchetto librdkafka viene utilizzato dal pacchetto confluent-kafka e incorporato nelle ruote per l'ultima release confluent-kafka. Per ulteriori informazioni, consulta la documentazione del client Fluent Python.
  6. Installare i certificati radice SSL CA sull'host in cui si sta sviluppando ed eseguendo questa istanza di avvio rapido. Il client utilizza certificati CA per verificare il certificato del broker.

    Per Windows, scaricare il file cacert.pem distribuito con curl (scaricare cacert.pm). Per altre piattaforme, vedere Configura truststore SSL.

  7. L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.

    Nota

    I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd. È necessario che i pacchetti confluent-kafka per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti.
  2. Creare un file denominato Producer.py nella directory wd con il codice seguente. Sostituire i valori di configurazione nella mappa conf e il nome dell'argomento è il nome del flusso creato.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. Dalla directory wd, eseguire il comando seguente:

    python Producer.py
  4. Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota wd. È necessario che i pacchetti confluent-kafka per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti.
  3. Creare un file denominato Consumer.py nella directory wd con il codice seguente. Sostituire i valori di configurazione nella mappa conf e il nome dell'argomento è il nome del flusso creato.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. Dalla directory wd, eseguire il comando seguente:

    python Consumer.py
  5. Dovresti vedere messaggi simili ai seguenti:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null