Clientes Kafka

Os clientes Kafka são principalmente componentes do ecossistema Kafka que leem e gravam dados em tópicos do Kafka.

Observação

Ao configurar clientes Kafka, o módulo com.sun.security.auth.module.JndiLoginModule não pode ser usado para configuração JAAS de servidores e clientes. Recomendamos que você use o módulo padrão com.sun.security.auth.module.Krb5LoginModule para autenticação usando Kerberos.

Comandos Comuns

Observação

  • O serviço Kafka deve ser instalado e iniciado em um dos nós.
  • Para executar os comandos comuns, estabeleça SSH no nó em que o serviço Kafka está instalado.
  • Para criar um tópico para publicar eventos:
    1. Crie um arquivo client.properties com os seguintes detalhes e forneça acesso apropriado ao usuário do Kafka
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true
      keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
                              
    2. Execução:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
  • Para publicar eventos em tópicos:
    1. Crie um arquivo producer.properties com os detalhes a seguir e forneça acesso apropriado ao usuário do Kafka.
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Execução:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
  • Para ler dados publicados de tópicos:
    1. Crie um arquivo consumer.properties com os detalhes a seguir e forneça acesso apropriado ao usuário do Kafka.
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group   
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
      required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. Execução:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Produtores de Kafka

Os produtores de Kafka são os editores responsáveis por escrever registros para tópicos. Normalmente, isso significa gravar um programa usando a API KafkaProducer.

Para instanciar um produtor, execute:

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

Consulte o seguinte para obter mais informações sobre as configurações do produtor usadas neste construtor.

Serialização de Chaves e Valores

Para cada produtor, duas propriedades de serialização devem ser definidas, key.serializer (para a chave) e value.serializer (para o valor). Escreva código personalizado para serialização ou use uma das opções fornecidas pelo Kafka. Por exemplo:

  • ByteArraySerializer: Dados binários
  • StringSerializer: Representações de string

Reconhecimentos

O caminho de gravação completo para registros de um produtor é para a partição de líder e, em seguida, para todas as réplicas de seguidor. O produtor pode controlar qual ponto do caminho aciona uma confirmação. Dependendo da definição acks, o produtor pode esperar que a gravação seja propagada por todo o sistema ou aguardar apenas o primeiro ponto de sucesso.

Os valores válidos de acks são:

  • 0: Não espere por qualquer confirmação da partição (throughput mais rápido).
  • 1: Aguarde apenas a resposta da partição líder.
  • all: Aguarde que as respostas das partições do seguidor atendam ao mínimo ( throughput mais lento).

Consumidores de Kafka

Os consumidores do Kafka são os assinantes responsáveis por ler registros de um ou mais tópicos e uma ou mais partições de um tópico. Os consumidores que assinam um tópico acontecem manual ou automaticamente. Normalmente, isso significa gravar um programa usando a API KafkaConsumer.

Para criar o consumidor, execute:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

A classe KafkaConsumer tem dois parâmetros de tipo genéricos. Como os produtores podem enviar dados (os valores) com chaves, o consumidor pode ler dados por chaves. Neste exemplo, chaves e valores são strings. Se você definir tipos diferentes, será necessário definir um desserializador para acomodar os tipos alternativos. Para desserializadores, você deve implementar a interface org.apache.kafka.common.serialization.Deserializer.

Os parâmetros de configuração mais importantes a serem especificados incluem:

  • bootstrap.servers: Uma lista de corretores aos quais se conectar inicialmente. Liste dois a três corretores. Não é necessário listar o cluster completo.
  • group.id: Cada consumidor pertence a um grupo para compartilhar as partições de um tópico.
  • key.deserializer/value.deserializer: Especifique como converter a representação Java em uma sequência de bytes para enviar dados por meio do protocolo Kafka.

Inscrevendo-se em um tópico

Inscrevendo-se em um tópico usando a chamada do método subscribe():

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)

Especifique uma lista de tópicos dos quais queremos consumir e um listener de rebalanceamento. O reequilíbrio é uma parte importante da vida do consumidor. Sempre que o cluster ou o estado do consumidor muda, um rebalanceamento é emitido. Isso garante que todas as partições sejam atribuídas a um consumidor.

Depois de se inscrever em um tópico, o consumidor pesquisa para encontrar novos registros:

while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}

A pesquisa retorna registros que podem ser processados pelo cliente. Depois de processar os registros, o cliente confirma as compensações de forma síncrona, aguardando até que o processamento seja concluído antes de continuar a sondagem.

Certifique-se de salvar o andamento. Execute os métodos commitSync() e commitAsync().

Não recomendamos commit automático. O commit manual é apropriado na maioria dos casos de uso.

Acessando o Big Data Service Kafka de Fora do Cluster

Etapas para vincular um usuário do local:

  1. Certifique-se de que o cliente Kerberos esteja instalado no seu ambiente.
  2. Copie o keytab do Kafka do cluster do local /etc/security/keytabs/kafka.service.keytab para seu ambiente.
  3. Copie o conteúdo de /etc/krb5.conf do host do broker de cluster para o /etc/krb5.conf file do seu ambiente (esse é o local padrão do arquivo de configuração krb5).
  4. Crie entradas de host para todos os nós de cluster no arquivo /etc/hosts.
  5. Faça kinit do seu ambiente usando keytab e principal usando o comando sudo kinit -kt <keytab path in local> <principal name>.
  6. Certifique-se de que o ticket seja emitido usando o comando klist.

Após concluir o kinit, execute as seguintes etapas para executar o Produtor/Consumidor:

  1. Copie /usr/odh/current/kafka-broker/config/kafka_jaas.conf para local.
  2. Copie /usr/odh/current/kafka-broker/config/ para localserver.properties.
  3. Defina a exportação da variável de ambiente KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf".
  4. Certifique-se de que o kinit esteja completo e que o ticket válido esteja disponível (não expirado) kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>.

Com a configuração anterior, você pode seguir comandos comuns para executar os clientes Kafka de fora do cluster do Big Data Service ou executar qualquer cliente Java/python.