Client Java Kafka et service de diffusion en continu - Démarrage rapide

Publier et consommer des messages dans le service de diffusion en continu à l'aide du client Java Kafka.

Ce démarrage rapide vous montre comment utiliser le client Java Kafka avec le service Oracle Cloud Infrastructure Streaming pour publier et consommer des messages.

Pour plus d'informations, voir Utilisation du service de diffusion en continu avec Apache Kafka. Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu

Préalables

  1. Pour utiliser le client Java Kafka avec le service de diffusion en continu, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure
    • Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
  2. Collectez les détails suivants :

    • OCID du flux
    • Point d'extrémité pour les messages
    • OCID du groupe de flux
    • Nom de domaine complet du groupe de flux
    • Paramètres de connexion Kafka :
      • Serveurs d'amorçage
      • Chaînes de connexion SASL
      • Protocole de sécurité

    Pour les étapes de création et de gestion des flux et des groupes de flux, voir Gestion des flux et Gestion des groupes de flux. Les flux correspondent à une rubrique Kafka.

  3. JDK 8 ou une version supérieure doit être installé. Assurez-vous que Java se trouve dans votre chemin d'accès.
  4. Maven 3.0 doit être installé. Assurez-vous que Maven se trouve dans votre chemin d'accès.
  5. Intellij (recommandé) ou tout autre environnement de développement intégré (IDE).
  6. Ajoutez la dernière version de la dépendance ou du fichier Jar Maven pour la trousse SDK pour Java Kafka à votre pom.xml comme suit :

    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
    	</dependency>
    
  7. En supposant que wd soit le répertoire de travail pour votre projet Java dans cet exemple, pom.xml ressemblera à ce qui suit :

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsExampleWithKafkaApis</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>2.8.0</version>
    		</dependency>
        </dependencies>
    </project>
    
  8. L'authentification avec le protocole Kafka utilise des jetons d'authentification et le mécanisme SASL/PLAIN. Voir Utilisation des jetons d'authentification pour la génération du jeton d'authentification. Si vous avez créé le flux et le groupe de flux dans OCI, vous êtes déjà autorisé à utiliser ce flux selon le service GIA pour OCI, de sorte que vous devez créer des jetons d'authentification pour votre utilisateur OCI.

    Note

    Les jetons d'authentification d'utilisateur OCI ne sont visibles qu'au moment de la création. Copiez-le et conservez-le dans un endroit sûr pour une utilisation future.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Vous devez déjà avoir les dépendances de la trousse SDK Kafka pour Java dans le cadre du fichier pom.xml de votre projet Maven Java après avoir satisfait les préalables.
  2. Créez un nouveau fichier nommé Producer.java dans le répertoire wd sous le chemin /src/main/java/kafka/sdk/oss/example/ avec le code suivant. Remplacez les valeurs des variables du code comme indiqué par les commentaires du code, à savoir bootstrapServers à streamOrKafkaTopicName. Ces variables concernent les paramètres de connexion Kafka que vous avez collectés dans les préalables.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    
        private static Properties getKafkaProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            properties.put("sasl.jaas.config", value);
            properties.put("retries", 3); // retries on transient errors and load balancing disconnection
            properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
            return properties;
        }
    
        public static void main(String args[]) {
            try {
                Properties properties = getKafkaProperties();
                KafkaProducer producer = new KafkaProducer<>(properties);
    
                for(int i=0;i<10;i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                    producer.send(record, (md, ex) -> {
                        if (ex != null) {
                            System.err.println("exception occurred in producer for review :" + record.value()
                                    + ", exception is " + ex);
                            ex.printStackTrace();
                        } else {
                            System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                        }
                    });
                }
                // producer.send() is async, to make sure all messages are sent we use producer.flush()
                producer.flush();
                producer.close();
            } catch (Exception e) {
                System.err.println("Error: exception " + e);
            }
        }
    }
    
  3. Depuis le répertoire wd, exécutez la commande suivante :

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
  4. Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.

Consommation de messages

  1. Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd sous le chemin /src/main/java/kafka/sdk/oss/example/. Vous devez déjà avoir les dépendances de la trousse SDK Kafka pour Java dans le cadre du fichier pom.xml de votre projet Maven Java après avoir satisfait les préalables.
  3. Créez un nouveau fichier nommé Consumer.java dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables du code comme indiqué par les commentaires du code, à savoir bootstrapServers à consumerGroupName. Ces variables concernent les paramètres de connexion Kafka que vous avez collectés dans les préalables.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
        static String consumerGroupName = "<consumer_group_name>"; 
    
        private static Properties getKafkaProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", consumerGroupName);
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("auto.offset.reset", "earliest");
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            props.put("sasl.jaas.config", value);
            return props;
        }
    
        public static void main(String[] args) {
            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());;
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            ConsumerRecords<Integer, String> records = consumer.poll(10000);
    
            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
    
            consumer.commitSync();
            consumer.close();
        }
    }
    
    
  4. Depuis le répertoire wd, exécutez la commande suivante :

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
    
  5. Des messages similaires aux suivants doivent s'afficher :

    [INFO related maven compiling and building the Java code]
    size of records polled is 3
    Received message: (messageKey0, message value) at offset 1284
    Received message: (messageKey0, message value) at offset 1285
    Received message: (null, message produced using oci console) at offset 1286
    Note

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.