Kafka Java-Client und Streaming - Schnellstart
Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit dem Kafka-Java-Client.
In diesem Schnellstart wird gezeigt, wie Sie den Kafka-Java-Client mit Oracle Cloud Infrastructure Streaming verwenden, um Nachrichten zu veröffentlichen und zu konsumieren.
Weitere Informationen finden Sie unter Streaming mit Apache Kafka verwenden. Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming
Voraussetzungen
-
Um den Kafka-Java-Client mit Streaming zu verwenden, wird Folgendes benötigt:
- Ein Oracle Cloud Infrastructure-Account.
- Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
-
Erfassen Sie die folgenden Details:
- Stream-OCID
- Nachrichtenendpunkt
- Streampool-OCID
- Streampool-FQDN
- Kafka-Verbindungseinstellungen:
- Bootstrap-Server
- SASL-Verbindungszeichenfolgen
- Sicherheitsprotokoll
Die Schritte zum Erstellen und Verwalten von Streams und Streampools finden Sie unter Streams verwalten und Streampools verwalten. Streams entsprechen einem Kafka-Topic.
- JDK 8 oder höher ist installiert. Stellen Sie sicher, dass sich Java in Ihrem PATH befindet.
- Maven 3.0 oder höher ist installiert. Stellen Sie sicher, dass sich Maven in Ihrem Pfad befindet.
- Intellij (empfohlen) oder eine andere Integrated Development Environment (IDE).
-
Fügen Sie die neueste Version der Maven-Abhängigkeit oder -JAR für das Kafka-Java-SDK wie folgt zu
pom.xml
hinzu:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
Wenn Sie
wd
als Arbeitsverzeichnis für das Java-Projekt in diesem Beispiel verwenden, siehtpom.xml
in etwa wie folgt aus:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
Bei der Authentifizierung mit dem Kafka-Protokoll werden Authentifizierungstoken und der SASL/PLAIN-Mechanismus verwendet. Informationen zur Generierung von Authentifizierungstoken finden Sie unter Mit Authentifizierungstoken arbeiten. Wenn Sie den Stream und Streampool in OCI erstellt haben, sind Sie bereits zur Verwendung dieses Streams gemäß OCI IAM autorisiert. Erstellen Sie daher Authentifizierungstoken für den OCI-Benutzer.
Hinweis
OCI-Benutzerauthentifizierungstoken sind nur zum Zeitpunkt der Erstellung sichtbar. Kopieren Sie es, und bewahren Sie es für die zukünftige Verwendung an einem sicheren Ort auf.
Nachrichten erzeugen
- Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis
wd
. Sie sollten bereits über die Kafka-SDK-Abhängigkeiten für Java als Teil der Dateipom.xml
Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben. -
Erstellen Sie eine neue Datei namens
Producer.java
im Verzeichniswd
unter dem Pfad/src/main/java/kafka/sdk/oss/example/
mit dem folgenden Code. Ersetzen Sie die Werte der Variablen im Code gemäß den Codekommentaren, nämlichbootstrapServers
bisstreamOrKafkaTopicName
. Diese Variablen gelten für Kafka-Verbindungseinstellungen, die Sie in den Voraussetzungen erfasst haben.package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
Führen Sie im Verzeichnis
wd
den folgenden Befehl aus:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um die neuesten Nachrichten anzuzeigen, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.
Nachrichten konsumieren
- Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
- Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis
wd
unter dem Pfad/src/main/java/kafka/sdk/oss/example/
. Sie sollten bereits über die Kafka-SDK-Abhängigkeiten für Java als Teil der Dateipom.xml
Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben. -
Erstellen Sie eine neue Datei namens
Consumer.java
im Verzeichniswd
mit dem folgenden Code. Ersetzen Sie die Werte der Variablen im Code gemäß den Codekommentaren, nämlichbootstrapServers
bisconsumerGroupName
. Diese Variablen gelten für Kafka-Verbindungseinstellungen, die Sie in den Voraussetzungen erfasst haben.package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
Führen Sie im Verzeichnis
wd
den folgenden Befehl aus:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
Meldungen wie die Folgenden sollten angezeigt werden:
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
Hinweis
Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede NachrichtNull
Nächste Schritte
Weitere Informationen finden Sie in den folgenden Ressourcen: