Kafka Java-Client und Streaming - Schnellstart

Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit dem Kafka-Java-Client.

In diesem Schnellstart wird gezeigt, wie Sie den Kafka-Java-Client mit Oracle Cloud Infrastructure Streaming verwenden, um Nachrichten zu veröffentlichen und zu konsumieren.

Weitere Informationen finden Sie unter Streaming mit Apache Kafka verwenden. Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming

Voraussetzungen

  1. Um den Kafka-Java-Client mit Streaming zu verwenden, wird Folgendes benötigt:

    • Ein Oracle Cloud Infrastructure-Account.
    • Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
  2. Erfassen Sie die folgenden Details:

    • Stream-OCID
    • Nachrichtenendpunkt
    • Streampool-OCID
    • Streampool-FQDN
    • Kafka-Verbindungseinstellungen:
      • Bootstrap-Server
      • SASL-Verbindungszeichenfolgen
      • Sicherheitsprotokoll

    Die Schritte zum Erstellen und Verwalten von Streams und Streampools finden Sie unter Streams verwalten und Streampools verwalten. Streams entsprechen einem Kafka-Topic.

  3. JDK 8 oder höher ist installiert. Stellen Sie sicher, dass sich Java in Ihrem PATH befindet.
  4. Maven 3.0 oder höher ist installiert. Stellen Sie sicher, dass sich Maven in Ihrem Pfad befindet.
  5. Intellij (empfohlen) oder eine andere Integrated Development Environment (IDE).
  6. Fügen Sie die neueste Version der Maven-Abhängigkeit oder -JAR für das Kafka-Java-SDK wie folgt zu pom.xml hinzu:

    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
    	</dependency>
    
  7. Wenn Sie wd als Arbeitsverzeichnis für das Java-Projekt in diesem Beispiel verwenden, sieht pom.xml in etwa wie folgt aus:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsExampleWithKafkaApis</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>2.8.0</version>
    		</dependency>
        </dependencies>
    </project>
    
  8. Bei der Authentifizierung mit dem Kafka-Protokoll werden Authentifizierungstoken und der SASL/PLAIN-Mechanismus verwendet. Informationen zur Generierung von Authentifizierungstoken finden Sie unter Mit Authentifizierungstoken arbeiten. Wenn Sie den Stream und Streampool in OCI erstellt haben, sind Sie bereits zur Verwendung dieses Streams gemäß OCI IAM autorisiert. Erstellen Sie daher Authentifizierungstoken für den OCI-Benutzer.

    Hinweis

    OCI-Benutzerauthentifizierungstoken sind nur zum Zeitpunkt der Erstellung sichtbar. Kopieren Sie es, und bewahren Sie es für die zukünftige Verwendung an einem sicheren Ort auf.

Nachrichten erzeugen

  1. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis wd. Sie sollten bereits über die Kafka-SDK-Abhängigkeiten für Java als Teil der Datei pom.xml Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben.
  2. Erstellen Sie eine neue Datei namens Producer.java im Verzeichnis wd unter dem Pfad /src/main/java/kafka/sdk/oss/example/ mit dem folgenden Code. Ersetzen Sie die Werte der Variablen im Code gemäß den Codekommentaren, nämlich bootstrapServers bis streamOrKafkaTopicName. Diese Variablen gelten für Kafka-Verbindungseinstellungen, die Sie in den Voraussetzungen erfasst haben.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    
        private static Properties getKafkaProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            properties.put("sasl.jaas.config", value);
            properties.put("retries", 3); // retries on transient errors and load balancing disconnection
            properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
            return properties;
        }
    
        public static void main(String args[]) {
            try {
                Properties properties = getKafkaProperties();
                KafkaProducer producer = new KafkaProducer<>(properties);
    
                for(int i=0;i<10;i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                    producer.send(record, (md, ex) -> {
                        if (ex != null) {
                            System.err.println("exception occurred in producer for review :" + record.value()
                                    + ", exception is " + ex);
                            ex.printStackTrace();
                        } else {
                            System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                        }
                    });
                }
                // producer.send() is async, to make sure all messages are sent we use producer.flush()
                producer.flush();
                producer.close();
            } catch (Exception e) {
                System.err.println("Error: exception " + e);
            }
        }
    }
    
  3. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
  4. Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um die neuesten Nachrichten anzuzeigen, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.

Nachrichten konsumieren

  1. Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
  2. Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis wd unter dem Pfad /src/main/java/kafka/sdk/oss/example/. Sie sollten bereits über die Kafka-SDK-Abhängigkeiten für Java als Teil der Datei pom.xml Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben.
  3. Erstellen Sie eine neue Datei namens Consumer.java im Verzeichnis wd mit dem folgenden Code. Ersetzen Sie die Werte der Variablen im Code gemäß den Codekommentaren, nämlich bootstrapServers bis consumerGroupName. Diese Variablen gelten für Kafka-Verbindungseinstellungen, die Sie in den Voraussetzungen erfasst haben.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
        static String consumerGroupName = "<consumer_group_name>"; 
    
        private static Properties getKafkaProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", consumerGroupName);
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("auto.offset.reset", "earliest");
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            props.put("sasl.jaas.config", value);
            return props;
        }
    
        public static void main(String[] args) {
            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());;
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            ConsumerRecords<Integer, String> records = consumer.poll(10000);
    
            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
    
            consumer.commitSync();
            consumer.close();
        }
    }
    
    
  4. Führen Sie im Verzeichnis wd den folgenden Befehl aus:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
    
  5. Meldungen wie die Folgenden sollten angezeigt werden:

    [INFO related maven compiling and building the Java code]
    size of records polled is 3
    Received message: (messageKey0, message value) at offset 1284
    Received message: (messageKey0, message value) at offset 1285
    Received message: (null, message produced using oci console) at offset 1286
    Hinweis

    Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede Nachricht Null