Quickstart client Kafka Java e streaming
Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client Java Kafka.
Questo avvio rapido mostra come utilizzare il client Java Kafka con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.
Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming
Prerequisiti
-
Per utilizzare il client Java Kafka con Streaming, è necessario disporre dei seguenti elementi:
- Un account Oracle Cloud Infrastructure.
- Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
-
Raccogliere i seguenti dettagli:
- OCID flusso
- endpoint messaggi
- OCID pool di flussi
- FQDN del pool di flussi
- Impostazioni di connessione Kafka:
- Server bootstrap
- Stringhe di connessione SASL
- Protocollo di sicurezza
Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.
- JDK 8 o versione successiva installato. Assicurati che Java sia nel tuo PATH.
- Maven 3.0 o installato. Assicurati che Maven sia nel tuo percorso.
- Intellij (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
-
Aggiungere la versione più recente della dipendenza o del file jar Maven per Kafka Java SDK all'indirizzo
pom.xml
come indicato di seguito.<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
Supponendo
wd
come directory di lavoro per il progetto Java di questo esempio, l'aspetto dipom.xml
sarà simile al seguente:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.
Nota
I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro.
Produzione di messaggi
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
. È necessario disporre già delle dipendenze SDK Kafka per Java nell'ambito del progettopom.xml
Java Maven dopo aver soddisfatto i prerequisiti. -
Creare un nuovo file denominato
Producer.java
nella directorywd
sotto il percorso/src/main/java/kafka/sdk/oss/example/
con il codice seguente. Sostituire i valori delle variabili nel codice come indicato dai commenti del codice, ovvero dabootstrapServers
astreamOrKafkaTopicName
. Queste variabili si riferiscono alle impostazioni di connessione Kafka raccolte nei prerequisiti.package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
Dalla directory
wd
, eseguire il comando seguente:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.
Messaggi di consumo
- In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
sotto il percorso/src/main/java/kafka/sdk/oss/example/
. È necessario disporre già delle dipendenze SDK Kafka per Java nell'ambito del progettopom.xml
Java Maven dopo aver soddisfatto i prerequisiti. -
Creare un nuovo file denominato
Consumer.java
nella directorywd
con il codice seguente. Sostituire i valori delle variabili nel codice come indicato dai commenti del codice, ovvero dabootstrapServers
aconsumerGroupName
. Queste variabili si riferiscono alle impostazioni di connessione Kafka raccolte nei prerequisiti.package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
Dalla directory
wd
, eseguire il comando seguente:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
Dovresti vedere messaggi simili ai seguenti:
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
Nota
Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio èNull
Passo successivo
Per ulteriori informazioni, consultare le risorse elencate di seguito.