Início Rápido do Kafka Python Client e Serviço Streaming
Publique e consuma mensagens no serviço Streaming usando o cliente Kafka Python.
Esse início rápido mostra como usar o cliente Kafka Python com o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.
Para obter mais informações, consulte Usando o Streaming com o Apache Kafka. Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming
Pré-requisitos
-
Para usar o cliente Kafka Python com o serviço Streaming, você deve ter o seguinte:
- Uma conta do Oracle Cloud Infrastructure.
- Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
-
Colete os seguintes detalhes:
- OCID do Stream
- Ponto final de mensagens
- OCID do pool de streams
- FQDN do pool de streams
- Definições de conexão do Kafka:
- Servidores de bootstrap
- Strings de conexão SASL
- Protocolo de segurança
Para obter as etapas de criação e gerenciamento de streams e pools de streams, consulte Gerenciando Streams e Gerenciando Pools de Streams. Os streams correspondem a um tópico do Kafka.
- Python 3.6 ou posterior, com PIP instalado e atualizado.
- Visual Code Studio (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
-
Instale pacotes
Confluent-Kafka
para Python usando o seguinte comando:pip install confluent-kafka
Observação
Você pode instalar esses pacotes globalmente ou em um virtualenv. O pacotelibrdkafka
é usado pelo pacoteconfluent-kafka
e incorporado nas rodas da versão mais recente doconfluent-kafka
. Para obter mais detalhes, consulte a documentação do cliente Confluent Python. -
Instale os certificados raiz da CA SSL no host no qual você está desenvolvendo e executando este início rápido. O cliente usa certificados da CA para verificar o certificado do broker.
No Windows, faça download do arquivo
cacert.pem
distribuído com curl (download de cacert.pm). Para outras plataformas, consulte Configurar armazenamento confiável SSL. -
A autenticação com o protocolo Kafka usa tokens de autenticação e o mecanismo SASL/PLAIN. Consulte Como Trabalhar com Tokens de Autenticação para saber sobre a geração do token de autenticação. Se você criou o stream e o pool de streams no OCI, já estará autorizado a usar esse stream de acordo com o OCI IAM. Portanto, crie tokens de autenticação para seu usuário do OCI.
Observação
Os tokens de autenticação do usuário do OCI só ficam visíveis no momento da criação. Copie-o e mantenha-o em algum lugar seguro para uso futuro.
Produzindo Mensagens
- Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio
wd
. Você já deverá ter pacotesconfluent-kafka
para Python instalados para seu ambiente Python atual após atender aos pré-requisitos. -
Crie um arquivo chamado
Producer.py
no diretóriowd
com o código a seguir. Substitua os valores de configuração no mapaconf
, e o nome do tópico é o nome do stream que você criou.from confluent_kafka import Producer, KafkaError if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Producer instance producer = Producer(**conf) delivered_records = 0 # Optional per-message on_delivery handler (triggered by poll() or flush()) # when a message has been successfully delivered or permanently failed delivery after retries. def acked(err, msg): global delivered_records """Delivery report handler called on successful or failed delivery of message """ if err is not None: print("Failed to deliver message: {}".format(err)) else: delivered_records += 1 print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset())) for n in range(10): record_key = "messageKey" + str(n) record_value = "messageValue" + str(n) print("Producing record: {}\t{}".format(record_key, record_value)) producer.produce(topic, key=record_key, value=record_value, on_delivery=acked) # p.poll() serves delivery reports (on_delivery) from previous produce() calls. producer.poll(0) producer.flush() print("{} messages were produced to topic {}!".format(delivered_records, topic))
-
No diretório
wd
, execute o seguinte comando:python Producer.py
- Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.
Consumindo Mensagens
- Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
- Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio
wd
. Você já deverá ter pacotesconfluent-kafka
para Python instalados para seu ambiente Python atual após atender aos pré-requisitos. -
Crie um arquivo chamado
Consumer.py
no diretóriowd
com o código a seguir. Substitua os valores de configuração no mapaconf
, e o nome do tópico é o nome do stream que você criou.from confluent_kafka import Consumer if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Consumer instance consumer = Consumer(conf) # Subscribe to topic consumer.subscribe([topic]) # Process messages try: while True: msg = consumer.poll(1.0) if msg is None: # No message available within timeout. # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming print("Waiting for message or event/error in poll()") continue elif msg.error(): print('error: {}'.format(msg.error())) else: # Check for Kafka message record_key = "Null" if msg.key() is None else msg.key().decode('utf-8') record_value = msg.value().decode('utf-8') print("Consumed record with key "+ record_key + " and value " + record_value) except KeyboardInterrupt: pass finally: print("Leave group and commit final offsets") consumer.close()
-
No diretório
wd
, execute o seguinte comando:python Consumer.py
-
Você deverá ver mensagens semelhantes a esta:
Waiting for message or event/error in poll() Waiting for message or event/error in poll() Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
Observação
Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem seráNull
Próximas Etapas
Consulte os seguintes recursos para obter mais informações: