Início Rápido do Kafka Python Client e Serviço Streaming

Publique e consuma mensagens no serviço Streaming usando o cliente Kafka Python.

Esse início rápido mostra como usar o cliente Kafka Python com o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.

Para obter mais informações, consulte Usando o Streaming com o Apache Kafka. Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming

Pré-requisitos

  1. Para usar o cliente Kafka Python com o serviço Streaming, você deve ter o seguinte:

    • Uma conta do Oracle Cloud Infrastructure.
    • Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
  2. Colete os seguintes detalhes:

    • OCID do Stream
    • Ponto final de mensagens
    • OCID do pool de streams
    • FQDN do pool de streams
    • Definições de conexão do Kafka:
      • Servidores de bootstrap
      • Strings de conexão SASL
      • Protocolo de segurança

    Para obter as etapas de criação e gerenciamento de streams e pools de streams, consulte Gerenciando Streams e Gerenciando Pools de Streams. Os streams correspondem a um tópico do Kafka.

  3. Python 3.6 ou posterior, com PIP instalado e atualizado.
  4. Visual Code Studio (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
  5. Instale pacotes Confluent-Kafka para Python usando o seguinte comando:

    pip install confluent-kafka
    Observação

    Você pode instalar esses pacotes globalmente ou em um virtualenv. O pacote librdkafka é usado pelo pacote confluent-kafka e incorporado nas rodas da versão mais recente do confluent-kafka. Para obter mais detalhes, consulte a documentação do cliente Confluent Python.
  6. Instale os certificados raiz da CA SSL no host no qual você está desenvolvendo e executando este início rápido. O cliente usa certificados da CA para verificar o certificado do broker.

    No Windows, faça download do arquivo cacert.pem distribuído com curl (download de cacert.pm). Para outras plataformas, consulte Configurar armazenamento confiável SSL.

  7. A autenticação com o protocolo Kafka usa tokens de autenticação e o mecanismo SASL/PLAIN. Consulte Como Trabalhar com Tokens de Autenticação para saber sobre a geração do token de autenticação. Se você criou o stream e o pool de streams no OCI, já estará autorizado a usar esse stream de acordo com o OCI IAM. Portanto, crie tokens de autenticação para seu usuário do OCI.

    Observação

    Os tokens de autenticação do usuário do OCI só ficam visíveis no momento da criação. Copie-o e mantenha-o em algum lugar seguro para uso futuro.

Produzindo Mensagens

  1. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd. Você já deverá ter pacotes confluent-kafka para Python instalados para seu ambiente Python atual após atender aos pré-requisitos.
  2. Crie um arquivo chamado Producer.py no diretório wd com o código a seguir. Substitua os valores de configuração no mapa conf, e o nome do tópico é o nome do stream que você criou.

    from confluent_kafka import Producer, KafkaError  
      
    if __name__ == '__main__':  
      
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
      
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
      
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
           else:  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
    
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
          producer.poll(0)  
    
      producer.flush()  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. No diretório wd, execute o seguinte comando:

    python Producer.py
  4. Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.

Consumindo Mensagens

  1. Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
  2. Abra seu editor favorito, como o Visual Studio Code, no diretório de trabalho vazio wd. Você já deverá ter pacotes confluent-kafka para Python instalados para seu ambiente Python atual após atender aos pré-requisitos.
  3. Crie um arquivo chamado Consumer.py no diretório wd com o código a seguir. Substitua os valores de configuração no mapa conf, e o nome do tópico é o nome do stream que você criou.

    from confluent_kafka import Consumer
    
    
    if __name__ == '__main__':
    
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
      
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
      
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       }  
    
        # Create Consumer instance
        consumer = Consumer(conf)
    
        # Subscribe to topic
        consumer.subscribe([topic])
    
        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. No diretório wd, execute o seguinte comando:

    python Consumer.py
  5. Você deverá ver mensagens semelhantes a esta:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Observação

    Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem será Null