Clientes Kafka
Os clientes Kafka são principalmente componentes do ecossistema Kafka que leem e gravam dados em tópicos do Kafka.
Ao configurar clientes Kafka, o módulo
com.sun.security.auth.module.JndiLoginModule
não pode ser usado para configuração JAAS de servidores e clientes. Recomendamos que você use o módulo padrão com.sun.security.auth.module.Krb5LoginModule
para autenticação usando Kerberos.Comandos Comuns
- O serviço Kafka deve ser instalado e iniciado em um dos nós.
- Para executar os comandos comuns, estabeleça SSH no nó em que o serviço Kafka está instalado.
- Para criar um tópico para publicar eventos:
- Crie um arquivo
client.properties
com os seguintes detalhes e forneça acesso apropriado ao usuário do Kafkasasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Execução:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
- Crie um arquivo
- Para publicar eventos em tópicos:
- Crie um arquivo
producer.properties
com os detalhes a seguir e forneça acesso apropriado ao usuário do Kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Execução:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
- Crie um arquivo
- Para ler dados publicados de tópicos:
- Crie um arquivo
consumer.properties
com os detalhes a seguir e forneça acesso apropriado ao usuário do Kafka.security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>>:6667 compression.type=none sasl.mechanism=GSSAPI group.id=test-consumer-group sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
- Execução:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning
- Crie um arquivo
Produtores de Kafka
Os produtores de Kafka são os editores responsáveis por escrever registros para tópicos. Normalmente, isso significa gravar um programa usando a API KafkaProducer.
Para instanciar um produtor, execute:
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
Consulte o seguinte para obter mais informações sobre as configurações do produtor usadas neste construtor.
Serialização de Chaves e Valores
Para cada produtor, duas propriedades de serialização devem ser definidas, key.serializer
(para a chave) e value.serializer
(para o valor). Escreva código personalizado para serialização ou use uma das opções fornecidas pelo Kafka. Por exemplo:
ByteArraySerializer
: Dados bináriosStringSerializer
: Representações de string
Reconhecimentos
O caminho de gravação completo para registros de um produtor é para a partição de líder e, em seguida, para todas as réplicas de seguidor. O produtor pode controlar qual ponto do caminho aciona uma confirmação. Dependendo da definição acks
, o produtor pode esperar que a gravação seja propagada por todo o sistema ou aguardar apenas o primeiro ponto de sucesso.
Os valores válidos de acks
são:
0
: Não espere por qualquer confirmação da partição (throughput mais rápido).1
: Aguarde apenas a resposta da partição líder.all
: Aguarde que as respostas das partições do seguidor atendam ao mínimo ( throughput mais lento).
Consumidores de Kafka
Os consumidores do Kafka são os assinantes responsáveis por ler registros de um ou mais tópicos e uma ou mais partições de um tópico. Os consumidores que assinam um tópico acontecem manual ou automaticamente. Normalmente, isso significa gravar um programa usando a API KafkaConsumer.
Para criar o consumidor, execute:
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
A classe KafkaConsumer tem dois parâmetros de tipo genéricos. Como os produtores podem enviar dados (os valores) com chaves, o consumidor pode ler dados por chaves. Neste exemplo, chaves e valores são strings. Se você definir tipos diferentes, será necessário definir um desserializador para acomodar os tipos alternativos. Para desserializadores, você deve implementar a interface org.apache.kafka.common.serialization.Deserializer
.
Os parâmetros de configuração mais importantes a serem especificados incluem:
bootstrap.servers
: Uma lista de corretores aos quais se conectar inicialmente. Liste dois a três corretores. Não é necessário listar o cluster completo.group.id
: Cada consumidor pertence a um grupo para compartilhar as partições de um tópico.key.deserializer/value.deserializer
: Especifique como converter a representação Java em uma sequência de bytes para enviar dados por meio do protocolo Kafka.
Inscrevendo-se em um tópico
Inscrevendo-se em um tópico usando a chamada do método subscribe()
:
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)
Especifique uma lista de tópicos dos quais queremos consumir e um listener de rebalanceamento. O reequilíbrio é uma parte importante da vida do consumidor. Sempre que o cluster ou o estado do consumidor muda, um rebalanceamento é emitido. Isso garante que todas as partições sejam atribuídas a um consumidor.
Depois de se inscrever em um tópico, o consumidor pesquisa para encontrar novos registros:
while (true) {
data = kafkaConsumer.poll();
// do something with 'data'
}
A pesquisa retorna registros que podem ser processados pelo cliente. Depois de processar os registros, o cliente confirma as compensações de forma síncrona, aguardando até que o processamento seja concluído antes de continuar a sondagem.
Certifique-se de salvar o andamento. Execute os métodos commitSync()
e commitAsync()
.
Não recomendamos commit automático. O commit manual é apropriado na maioria dos casos de uso.
Acessando o Big Data Service Kafka de Fora do Cluster
Etapas para vincular um usuário do local:
- Certifique-se de que o cliente Kerberos esteja instalado no seu ambiente.
- Copie o keytab do Kafka do cluster do local
/etc/security/keytabs/kafka.service.keytab
para seu ambiente. - Copie o conteúdo de
/etc/krb5.conf
do host do broker de cluster para o/etc/krb5.conf file
do seu ambiente (esse é o local padrão do arquivo de configuração krb5). - Crie entradas de host para todos os nós de cluster no arquivo
/etc/hosts
. - Faça kinit do seu ambiente usando keytab e principal usando o comando
sudo kinit -kt <keytab path in local> <principal name>
. - Certifique-se de que o ticket seja emitido usando o comando
klist
.
Após concluir o kinit, execute as seguintes etapas para executar o Produtor/Consumidor:
- Copie
/usr/odh/current/kafka-broker/config/kafka_jaas.conf
para local. - Copie
/usr/odh/current/kafka-broker/config/
paralocalserver.properties
. - Defina a exportação da variável de ambiente
KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf"
. -
Certifique-se de que o kinit esteja completo e que o ticket válido esteja disponível (não expirado)
kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>
.
Com a configuração anterior, você pode seguir comandos comuns para executar os clientes Kafka de fora do cluster do Big Data Service ou executar qualquer cliente Java/python.