Démarrage rapide du client Java Kafka et de Streaming
Publier et utiliser des messages dans le service Streaming à l'aide du client Java Kafka.
Ce démarrage rapide vous explique comment utiliser le client Java Kafka avec Oracle Cloud Infrastructure Streaming pour publier et utiliser des messages.
Pour plus d'informations, reportez-vous à Utilisation de Streaming avec Apache Kafka. Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming.
Prérequis
-
Afin d'utiliser le client Java Kafka avec Streaming, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure.
- Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
-
Collectez les informations suivantes :
- OCID de flux de données
- Adresse des messages
- OCID de pool de flux de données
- Nom de domaine qualifié complet de pool de flux de données
- Paramètres de la connexion Kafka :
- Serveurs de démarrage
- Chaînes de connexion SASL
- Protocole de sécurité
Pour connaître les étapes de création et de gestion des flux de données et des pools de flux de données, reportez-vous à Gestion des flux de données et à Gestion des pools de flux de données. Les flux de données correspondent à une rubrique Kafka.
- JDK version 8 ou ultérieure installé. Assurez-vous que Java se trouve dans votre PATH.
- Maven version 3.0 ou ultérieure installé. Assurez-vous que Maven se trouve dans votre PATH.
- IntelliJ (recommandé) ou tout autre environnement de développement intégré (IDE).
-
Ajoutez la version la plus récente du fichier JAR ou de la dépendance Maven pour le SDK Java Kafka à votre fichier
pom.xml
comme suit :<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
En supposant que
wd
est votre répertoire de travail pour le projet Java de cet exemple, votre fichierpom.xml
se présente comme suit :<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Reportez-vous à Utilisation des jetons d'authentification pour plus d'informations sur la génération de jetons d'authentification. Si vous avez créé le flux de données et le pool de flux de données dans OCI, vous êtes déjà autorisé à utiliser ce flux de données conformément à OCI IAM. Vous devez donc créer des jetons d'authentification pour l'utilisateur OCI.
Remarque
Les jetons d'authentification de l'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-les et conservez-les en lieu sûr pour une utilisation ultérieure.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
. Vous devez déjà disposer des dépendances de kit SDK Kafka pour Java dans le fichierpom.xml
du projet Java Maven si les prérequis ont été respectés. -
Créez un fichier nommé
Producer.java
dans le répertoirewd
sous le chemin/src/main/java/kafka/sdk/oss/example/
avec le code suivant. Remplacez les valeurs des variables du code comme indiqué par les commentaires de code (debootstrapServers
àstreamOrKafkaTopicName
). Ces variables sont destinées aux paramètres de connexion Kafka que vous avez collectés dans les prérequis.package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
A partir du répertoire
wd
, exécutez la commande suivante :mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.
Utilisation des messages
- Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
sous le chemin/src/main/java/kafka/sdk/oss/example/
. Vous devez déjà disposer des dépendances de kit SDK Kafka pour Java dans le fichierpom.xml
du projet Java Maven si les prérequis ont été respectés. -
Créez un fichier nommé
Consumer.java
dans le répertoirewd
avec le code suivant. Remplacez les valeurs des variables du code comme indiqué par les commentaires de code (debootstrapServers
àconsumerGroupName
). Ces variables sont destinées aux paramètres de connexion Kafka que vous avez collectés dans les prérequis.package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
A partir du répertoire
wd
, exécutez la commande suivante :mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
Des messages semblables à celui qui suit s'affichent :
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
Remarque
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Etapes suivantes
Pour plus d'informations, reportez-vous aux ressources suivantes :