Quickstart client Kafka Python e streaming
Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client Python Kafka.
Questo avvio rapido mostra come utilizzare il client Python Kafka con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.
Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming
Prerequisiti
-
Per utilizzare il client Python Kafka con Streaming, è necessario disporre dei seguenti elementi:
- Un account Oracle Cloud Infrastructure.
- Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
-
Raccogliere i seguenti dettagli:
- OCID flusso
- endpoint messaggi
- OCID pool di flussi
- FQDN del pool di flussi
- Impostazioni di connessione Kafka:
- Server bootstrap
- Stringhe di connessione SASL
- Protocollo di sicurezza
Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.
- Python 3.6 o versioni successive, con PIP installato e aggiornato.
- Visual Code Studio (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
-
Installare i pacchetti
Confluent-Kafka
per Python utilizzando il comando seguente:pip install confluent-kafka
Nota
È possibile installare questi pacchetti a livello globale o in un virtualenv. Il pacchettolibrdkafka
viene utilizzato dal pacchettoconfluent-kafka
e incorporato nelle ruote per l'ultima releaseconfluent-kafka
. Per ulteriori informazioni, consulta la documentazione del client Fluent Python. -
Installare i certificati radice SSL CA sull'host in cui si sta sviluppando ed eseguendo questa istanza di avvio rapido. Il client utilizza certificati CA per verificare il certificato del broker.
Per Windows, scaricare il file
cacert.pem
distribuito con curl (scaricare cacert.pm). Per altre piattaforme, vedere Configura truststore SSL. -
L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.
Nota
I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro.
Produzione di messaggi
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota
wd
. È necessario che i pacchetticonfluent-kafka
per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti. -
Creare un file denominato
Producer.py
nella directorywd
con il codice seguente. Sostituire i valori di configurazione nella mappaconf
e il nome dell'argomento è il nome del flusso creato.from confluent_kafka import Producer, KafkaError if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Producer instance producer = Producer(**conf) delivered_records = 0 # Optional per-message on_delivery handler (triggered by poll() or flush()) # when a message has been successfully delivered or permanently failed delivery after retries. def acked(err, msg): global delivered_records """Delivery report handler called on successful or failed delivery of message """ if err is not None: print("Failed to deliver message: {}".format(err)) else: delivered_records += 1 print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset())) for n in range(10): record_key = "messageKey" + str(n) record_value = "messageValue" + str(n) print("Producing record: {}\t{}".format(record_key, record_value)) producer.produce(topic, key=record_key, value=record_value, on_delivery=acked) # p.poll() serves delivery reports (on_delivery) from previous produce() calls. producer.poll(0) producer.flush() print("{} messages were produced to topic {}!".format(delivered_records, topic))
-
Dalla directory
wd
, eseguire il comando seguente:python Producer.py
- Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.
Messaggi di consumo
- In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory di lavoro vuota
wd
. È necessario che i pacchetticonfluent-kafka
per Python siano già installati per l'ambiente Python corrente dopo aver soddisfatto i prerequisiti. -
Creare un file denominato
Consumer.py
nella directorywd
con il codice seguente. Sostituire i valori di configurazione nella mappaconf
e il nome dell'argomento è il nome del flusso creato.from confluent_kafka import Consumer if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Consumer instance consumer = Consumer(conf) # Subscribe to topic consumer.subscribe([topic]) # Process messages try: while True: msg = consumer.poll(1.0) if msg is None: # No message available within timeout. # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming print("Waiting for message or event/error in poll()") continue elif msg.error(): print('error: {}'.format(msg.error())) else: # Check for Kafka message record_key = "Null" if msg.key() is None else msg.key().decode('utf-8') record_value = msg.value().decode('utf-8') print("Consumed record with key "+ record_key + " and value " + record_value) except KeyboardInterrupt: pass finally: print("Leave group and commit final offsets") consumer.close()
-
Dalla directory
wd
, eseguire il comando seguente:python Consumer.py
-
Dovresti vedere messaggi simili ai seguenti:
Waiting for message or event/error in poll() Waiting for message or event/error in poll() Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Nota
Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio èNull
Passo successivo
Per ulteriori informazioni, consultare le risorse elencate di seguito.