Clientes de Kafka
Los clientes de Kafka son principalmente componentes del ecosistema de Kafka que leen y escriben datos en temas de Kafka.
Al configurar clientes de Kafka, el módulo
com.sun.security.auth.module.JndiLoginModule
no se puede utilizar para la configuración de servidores y clientes de JAAS. Recomendamos utilizar el módulo predeterminado com.sun.security.auth.module.Krb5LoginModule
para la autenticación mediante Kerberos.Comandos habituales
- El servicio de Kafka se debe instalar e iniciar en uno de los nodos.
- Para ejecutar los comandos comunes, utilice SSH en el nodo donde está instalado el servicio de Kafka.
- Para crear un tema para publicar eventos:
- Cree un archivo
client.properties
con los siguientes detalles y proporcione el acceso adecuado al usuario de Kafkasasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Ejecutar:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
- Cree un archivo
- Para publicar eventos en temas:
- Cree un archivo
producer.properties
con los siguientes detalles y proporcione el acceso adecuado al usuario de Kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Ejecutar:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
- Cree un archivo
- Para leer datos publicados de temas:
- Cree un archivo
consumer.properties
con los siguientes detalles y proporcione el acceso adecuado al usuario de Kafka.security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>>:6667 compression.type=none sasl.mechanism=GSSAPI group.id=test-consumer-group sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
- Ejecutar:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning
- Cree un archivo
Productores de Kafka
Los productores de Kafka son los editores responsables de escribir registros en temas. Normalmente, esto significa escribir un programa mediante la API KafkaProducer.
Para crear una instancia de un productor, ejecute:
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
Consulte lo siguiente para obtener más información sobre la configuración del productor utilizada en este constructor.
Serialización de claves y valores
Para cada productor, se deben definir dos propiedades de serialización, key.serializer
(para la clave) y value.serializer
(para el valor). Escribir código personalizado para la serialización o utilizar una de las opciones proporcionadas por Kafka. Por ejemplo:
ByteArraySerializer
: datos binariosStringSerializer
: representaciones de cadena
Acuses de recibo
La ruta de escritura completa para los registros de un productor es a la partición de líder y, a continuación, a todas las réplicas de seguidores. El productor puede controlar qué punto de la ruta dispara una confirmación. Según la configuración acks
, el productor puede esperar a que la escritura se propague por todo el sistema o solo esperar al primer punto de éxito.
Los valores acks
válidos son:
0
: no espere ninguna confirmación de la partición (rendimiento más rápido).1
: espere solo la respuesta de partición de líder.all
: espere a que las respuestas de particiones de seguidor cumplan el mínimo (rendimiento más lento).
Consumidores de Kafka
Los consumidores de Kafka son los suscriptores responsables de leer los registros de uno o más temas y una o más particiones de un tema. Los consumidores que se suscriben a un tema se realizan de forma manual o automática. Normalmente, esto significa escribir un programa mediante la API KafkaConsumer.
Para crear un consumidor, ejecute:
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
La clase KafkaConsumer tiene dos parámetros de tipo genérico. Como los productores pueden enviar datos (los valores) con claves, el consumidor puede leer los datos por claves. En este ejemplo, tanto las claves como los valores son cadenas. Si define diferentes tipos, debe definir un deserializador para que se adapte a los tipos alternativos. Para los deserializadores, debe implantar la interfaz org.apache.kafka.common.serialization.Deserializer
.
Entre los parámetros de configuración más importantes que se deben especificar se incluyen:
bootstrap.servers
: lista de corredores a los que conectarse inicialmente. Lista de dos a tres corredores. No es necesario que muestre el cluster completo.group.id
: cada consumidor pertenece a un grupo para compartir las particiones de un tema.key.deserializer/value.deserializer
: especifique cómo convertir la representación Java en una secuencia de bytes para enviar datos a través del protocolo de Kafka.
Suscripción a un tema
Suscripción a un tema mediante la llamada al método subscribe()
:
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)
Especifique una lista de temas de los que desea consumir y un listener de nuevo equilibrio. El reequilibrio es una parte importante de la vida del consumidor. Cuando cambia el cluster o el estado del consumidor, se emite un nuevo equilibrio. Esto garantiza que todas las particiones se asignen a un consumidor.
Después de suscribirse a un tema, el consumidor sondea para buscar nuevos registros:
while (true) {
data = kafkaConsumer.poll();
// do something with 'data'
}
El sondeo devuelve los registros que puede procesar el cliente. Después de procesar los registros, el cliente confirma las compensaciones de forma síncrona, esperando a que finalice el procesamiento antes de continuar con el sondeo.
Asegúrese de guardar el progreso. Ejecute los métodos commitSync()
y commitAsync()
.
No recomendamos la confirmación automática. La confirmación manual es adecuada en la mayoría de los casos de uso.
Acceso a Big Data Service Kafka desde fuera del cluster
Pasos para conectar a un usuario desde el entorno local:
- Asegúrese de que el cliente Kerberos esté instalado en su entorno.
- Copie el separador de claves de Kafka del cluster de la ubicación
/etc/security/keytabs/kafka.service.keytab
a su entorno. - Copie el contenido de
/etc/krb5.conf
del host del broker de cluster en/etc/krb5.conf file
del entorno (esta es la ubicación por defecto del archivo de configuración krb5). - Cree entradas de host para todos los nodos de cluster en el archivo
/etc/hosts
. - Realice kinit desde su entorno con keytab y principal mediante el comando
sudo kinit -kt <keytab path in local> <principal name>
. - Asegúrese de que el ticket se emite mediante el comando
klist
.
Una vez completada la kinit, complete los pasos siguientes para ejecutar Productor/Consumidor:
- Copie
/usr/odh/current/kafka-broker/config/kafka_jaas.conf
en local. - Copie
/usr/odh/current/kafka-broker/config/
enlocalserver.properties
. - Defina la exportación de variable de entorno
KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf"
. -
Asegúrese de que kinit está completo y de que el ticket válido está disponible (no caducado)
kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>
.
Con la configuración anterior, puede seguir comandos comunes para ejecutar los clientes de Kafka desde fuera del cluster de Big Data Service o ejecutar cualquier cliente Java/python.