Client Kafka
I client Kafka sono principalmente componenti dell'ecosistema Kafka, in grado di leggere e scrivere dati negli argomenti di Kafka.
Durante la configurazione dei client Kafka, il modulo
com.sun.security.auth.module.JndiLoginModule
non può essere utilizzato per la configurazione JAAS di server e client. Si consiglia di utilizzare il modulo predefinito com.sun.security.auth.module.Krb5LoginModule
per l'autenticazione mediante Kerberos.Comandi comuni
- Il servizio Kafka deve essere installato e avviato in uno dei nodi.
- Per eseguire i comandi comuni, passare da SSH al nodo in cui è installato il servizio Kafka.
- Per creare un argomento per pubblicare gli eventi, procedere come segue.
- Creare un file
client.properties
con i seguenti dettagli e fornire l'accesso appropriato all'utente Kafkasasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Esegui:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
- Creare un file
- Per pubblicare eventi negli argomenti:
- Creare un file
producer.properties
con i dettagli riportati di seguito e fornire l'accesso appropriato all'utente Kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM"; security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>:6667 compression.type=none sasl.mechanism=GSSAPI
- Esegui:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
- Creare un file
- Per leggere i dati pubblicati dagli argomenti:
- Creare un file
consumer.properties
con i dettagli riportati di seguito e fornire l'accesso appropriato all'utente Kafka.security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka bootstrap.servers=<broker_hostname>>:6667 compression.type=none sasl.mechanism=GSSAPI group.id=test-consumer-group sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
- Esegui:
sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning
- Creare un file
Produttori Kafka
I produttori di Kafka sono gli editori responsabili della scrittura di dischi su argomenti. In genere, ciò significa scrivere un programma utilizzando l'API KafkaProducer.
Per creare un'istanza di un producer, eseguire le operazioni riportate di seguito.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
Per ulteriori informazioni sulle impostazioni del producer utilizzate in questo costruttore, vedere quanto segue.
Serializzazione di chiavi e valori
Per ogni producer, è necessario impostare due proprietà di serializzazione, key.serializer
(per la chiave) e value.serializer
(per il valore). Scrivere codice personalizzato per la serializzazione o utilizzare una delle opzioni fornite da Kafka. Ad esempio:
ByteArraySerializer
: dati binariStringSerializer
: rappresentazioni stringa
conferme
Il percorso di scrittura completo per i record di un produttore è alla partizione principale e quindi a tutte le repliche successive. Il produttore può controllare quale punto del percorso attiva una conferma. A seconda dell'impostazione acks
, il producer potrebbe attendere la propagazione della scrittura nel sistema o solo attendere il primo punto di successo.
I valori validi per acks
sono:
0
: non attendere la conferma dalla partizione (throughput più veloce).1
: attendere solo la risposta della partizione leader.all
: attendere che le risposte delle partizioni del follower soddisfino i requisiti minimi (throughput più lento).
Consumatori Kafka
I consumer Kafka sono gli abbonati responsabili della lettura di record da uno o più argomenti e da una o più partizioni di un argomento. I consumer che effettuano la sottoscrizione a un argomento vengono eseguiti manualmente o automaticamente. In genere, ciò significa scrivere un programma utilizzando l'API KafkaConsumer.
Per creare un consumer, eseguire le operazioni riportate di seguito.
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
La classe KafkaConsumer dispone di due parametri di tipo generico. Poiché i produttori possono inviare i dati (i valori) con le chiavi, il consumatore può leggere i dati tramite le chiavi. In questo esempio, sia le chiavi che i valori sono stringhe. Se si definiscono tipi diversi, è necessario definire un deserializzatore per i tipi alternativi. Per i deserializzatori è necessario implementare l'interfaccia org.apache.kafka.common.serialization.Deserializer
.
Di seguito sono riportati i parametri di configurazione più importanti da specificare.
bootstrap.servers
: un elenco di broker a cui connettersi inizialmente. Elenca da due a tre broker. Non è necessario elencare il cluster completo.group.id
: ogni consumer appartiene a un gruppo per condividere le partizioni di un topic.key.deserializer/value.deserializer
: specificare come convertire la rappresentazione Java in una sequenza di byte per l'invio di dati tramite il protocollo Kafka.
Sottoscrizione a un argomento
Sottoscrizione a un argomento mediante la chiamata al metodo subscribe()
:
kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)
Specificare una lista di argomenti da cui si desidera utilizzare e un listener di ribilanciamento. Il riequilibrio è una parte importante della vita del consumatore. Ogni volta che il cluster o lo stato del consumatore cambia, viene emesso un ribilanciamento. Ciò garantisce che tutte le partizioni siano assegnate a un consumer.
Dopo aver eseguito la sottoscrizione a un argomento, il consumatore esegue un sondaggio per trovare nuovi record:
while (true) {
data = kafkaConsumer.poll();
// do something with 'data'
}
Il polling restituisce i record che possono essere elaborati dal client. Dopo aver elaborato i record, il client esegue il commit degli offset in modo sincrono, quindi attende il completamento dell'elaborazione prima di continuare il polling.
Assicurarsi di salvare l'avanzamento. Eseguire i metodi commitSync()
e commitAsync()
.
Si sconsiglia il commit automatico. Il commit manuale è appropriato nella maggior parte dei casi d'uso.
Accesso al servizio Big Data Kafka dall'esterno del cluster
Passaggi per avvicinare un utente dal locale:
- Verificare che nel proprio ambiente sia installato il client Kerberos.
- Copiare la tabella chiavi Kafka dal cluster dalla posizione
/etc/security/keytabs/kafka.service.keytab
nell'ambiente. - Copiare il contenuto di
/etc/krb5.conf
dall'host del broker cluster nel file/etc/krb5.conf file
dell'ambiente in uso (questa è la posizione predefinita del file di configurazione krb5). - Creare voci host per tutti i nodi del cluster nel file
/etc/hosts
. - Eseguire il collegamento dall'ambiente utilizzando la tabella chiavi e il nome principale utilizzando il comando
sudo kinit -kt <keytab path in local> <principal name>
. - Assicurarsi che il ticket venga emesso utilizzando il comando
klist
.
Al termine del processo di connessione, completare i passi riportati di seguito per eseguire Producer/Consumer.
- Copiare
/usr/odh/current/kafka-broker/config/kafka_jaas.conf
in locale. - Copiare
/usr/odh/current/kafka-broker/config/
inlocalserver.properties
. - Impostare l'esportazione della variabile di ambiente
KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf"
. -
Assicurarsi che il collegamento sia completo e che sia disponibile un ticket valido (non scaduto)
kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>
.
Con l'impostazione precedente, è possibile seguire i comandi comuni per eseguire i client Kafka dall'esterno del cluster Big Data Service o eseguire qualsiasi client Java/python.