Client Kafka

I client Kafka sono principalmente componenti dell'ecosistema Kafka, in grado di leggere e scrivere dati negli argomenti di Kafka.

Nota

Durante la configurazione dei client Kafka, il modulo com.sun.security.auth.module.JndiLoginModule non può essere utilizzato per la configurazione JAAS di server e client. Si consiglia di utilizzare il modulo predefinito com.sun.security.auth.module.Krb5LoginModule per l'autenticazione mediante Kerberos.

Comandi comuni

Nota

  • Il servizio Kafka deve essere installato e avviato in uno dei nodi.
  • Per eseguire i comandi comuni, passare da SSH al nodo in cui è installato il servizio Kafka.
  • Per creare un argomento per pubblicare gli eventi, procedere come segue.
    1. Creare un file client.properties con i seguenti dettagli e fornire l'accesso appropriato all'utente Kafka
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true
      keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
                              
    2. Esegui:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
  • Per pubblicare eventi negli argomenti:
    1. Creare un file producer.properties con i dettagli riportati di seguito e fornire l'accesso appropriato all'utente Kafka.
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Esegui:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
  • Per leggere i dati pubblicati dagli argomenti:
    1. Creare un file consumer.properties con i dettagli riportati di seguito e fornire l'accesso appropriato all'utente Kafka.
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group   
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
      required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. Esegui:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Produttori Kafka

I produttori di Kafka sono gli editori responsabili della scrittura di dischi su argomenti. In genere, ciò significa scrivere un programma utilizzando l'API KafkaProducer.

Per creare un'istanza di un producer, eseguire le operazioni riportate di seguito.

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

Per ulteriori informazioni sulle impostazioni del producer utilizzate in questo costruttore, vedere quanto segue.

Serializzazione di chiavi e valori

Per ogni producer, è necessario impostare due proprietà di serializzazione, key.serializer (per la chiave) e value.serializer (per il valore). Scrivere codice personalizzato per la serializzazione o utilizzare una delle opzioni fornite da Kafka. Ad esempio:

  • ByteArraySerializer: dati binari
  • StringSerializer: rappresentazioni stringa

conferme

Il percorso di scrittura completo per i record di un produttore è alla partizione principale e quindi a tutte le repliche successive. Il produttore può controllare quale punto del percorso attiva una conferma. A seconda dell'impostazione acks, il producer potrebbe attendere la propagazione della scrittura nel sistema o solo attendere il primo punto di successo.

I valori validi per acks sono:

  • 0: non attendere la conferma dalla partizione (throughput più veloce).
  • 1: attendere solo la risposta della partizione leader.
  • all: attendere che le risposte delle partizioni del follower soddisfino i requisiti minimi (throughput più lento).

Consumatori Kafka

I consumer Kafka sono gli abbonati responsabili della lettura di record da uno o più argomenti e da una o più partizioni di un argomento. I consumer che effettuano la sottoscrizione a un argomento vengono eseguiti manualmente o automaticamente. In genere, ciò significa scrivere un programma utilizzando l'API KafkaConsumer.

Per creare un consumer, eseguire le operazioni riportate di seguito.

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

La classe KafkaConsumer dispone di due parametri di tipo generico. Poiché i produttori possono inviare i dati (i valori) con le chiavi, il consumatore può leggere i dati tramite le chiavi. In questo esempio, sia le chiavi che i valori sono stringhe. Se si definiscono tipi diversi, è necessario definire un deserializzatore per i tipi alternativi. Per i deserializzatori è necessario implementare l'interfaccia org.apache.kafka.common.serialization.Deserializer.

Di seguito sono riportati i parametri di configurazione più importanti da specificare.

  • bootstrap.servers: un elenco di broker a cui connettersi inizialmente. Elenca da due a tre broker. Non è necessario elencare il cluster completo.
  • group.id: ogni consumer appartiene a un gruppo per condividere le partizioni di un topic.
  • key.deserializer/value.deserializer: specificare come convertire la rappresentazione Java in una sequenza di byte per l'invio di dati tramite il protocollo Kafka.

Sottoscrizione a un argomento

Sottoscrizione a un argomento mediante la chiamata al metodo subscribe():

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)

Specificare una lista di argomenti da cui si desidera utilizzare e un listener di ribilanciamento. Il riequilibrio è una parte importante della vita del consumatore. Ogni volta che il cluster o lo stato del consumatore cambia, viene emesso un ribilanciamento. Ciò garantisce che tutte le partizioni siano assegnate a un consumer.

Dopo aver eseguito la sottoscrizione a un argomento, il consumatore esegue un sondaggio per trovare nuovi record:

while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}

Il polling restituisce i record che possono essere elaborati dal client. Dopo aver elaborato i record, il client esegue il commit degli offset in modo sincrono, quindi attende il completamento dell'elaborazione prima di continuare il polling.

Assicurarsi di salvare l'avanzamento. Eseguire i metodi commitSync() e commitAsync().

Si sconsiglia il commit automatico. Il commit manuale è appropriato nella maggior parte dei casi d'uso.

Accesso al servizio Big Data Kafka dall'esterno del cluster

Passaggi per avvicinare un utente dal locale:

  1. Verificare che nel proprio ambiente sia installato il client Kerberos.
  2. Copiare la tabella chiavi Kafka dal cluster dalla posizione /etc/security/keytabs/kafka.service.keytab nell'ambiente.
  3. Copiare il contenuto di /etc/krb5.conf dall'host del broker cluster nel file /etc/krb5.conf file dell'ambiente in uso (questa è la posizione predefinita del file di configurazione krb5).
  4. Creare voci host per tutti i nodi del cluster nel file /etc/hosts.
  5. Eseguire il collegamento dall'ambiente utilizzando la tabella chiavi e il nome principale utilizzando il comando sudo kinit -kt <keytab path in local> <principal name>.
  6. Assicurarsi che il ticket venga emesso utilizzando il comando klist.

Al termine del processo di connessione, completare i passi riportati di seguito per eseguire Producer/Consumer.

  1. Copiare /usr/odh/current/kafka-broker/config/kafka_jaas.conf in locale.
  2. Copiare /usr/odh/current/kafka-broker/config/ in localserver.properties.
  3. Impostare l'esportazione della variabile di ambiente KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf".
  4. Assicurarsi che il collegamento sia completo e che sia disponibile un ticket valido (non scaduto) kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>.

Con l'impostazione precedente, è possibile seguire i comandi comuni per eseguire i client Kafka dall'esterno del cluster Big Data Service o eseguire qualsiasi client Java/python.