Quickstart client Kafka Java e streaming

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando il client Java Kafka.

Questo avvio rapido mostra come utilizzare il client Java Kafka con Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.

Per ulteriori informazioni, vedere Utilizzo dello streaming con Apache Kafka. Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming

Prerequisiti

  1. Per utilizzare il client Java Kafka con Streaming, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
  2. Raccogliere i seguenti dettagli:

    • OCID flusso
    • endpoint messaggi
    • OCID pool di flussi
    • FQDN del pool di flussi
    • Impostazioni di connessione Kafka:
      • Server bootstrap
      • Stringhe di connessione SASL
      • Protocollo di sicurezza

    Per i passi per creare e gestire i flussi e i pool di flussi, vedere Gestione dei flussi e Gestione dei pool di flussi. I flussi corrispondono a un argomento Kafka.

  3. JDK 8 o versione successiva installato. Assicurati che Java sia nel tuo PATH.
  4. Maven 3.0 o installato. Assicurati che Maven sia nel tuo percorso.
  5. Intellij (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
  6. Aggiungere la versione più recente della dipendenza o del file jar Maven per Kafka Java SDK all'indirizzo pom.xml come indicato di seguito.

    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
    	</dependency>
    
  7. Supponendo wd come directory di lavoro per il progetto Java di questo esempio, l'aspetto di pom.xml sarà simile al seguente:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsExampleWithKafkaApis</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>2.8.0</version>
    		</dependency>
        </dependencies>
    </project>
    
  8. L'autenticazione con il protocollo Kafka utilizza i token di autenticazione e il meccanismo SASL/PLAIN. Per la generazione del token di autenticazione, vedere Utilizzo dei token di autenticazione. Se hai creato il flusso e il pool di flussi in OCI, sei già autorizzato a utilizzare questo flusso in base a IAM OCI, quindi devi creare token di autenticazione per l'utente OCI.

    Nota

    I token di autenticazione utente OCI sono visibili solo al momento della creazione. Copialo e conservalo in un luogo sicuro per un uso futuro.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd. È necessario disporre già delle dipendenze SDK Kafka per Java nell'ambito del progetto pom.xml Java Maven dopo aver soddisfatto i prerequisiti.
  2. Creare un nuovo file denominato Producer.java nella directory wd sotto il percorso /src/main/java/kafka/sdk/oss/example/ con il codice seguente. Sostituire i valori delle variabili nel codice come indicato dai commenti del codice, ovvero da bootstrapServers a streamOrKafkaTopicName. Queste variabili si riferiscono alle impostazioni di connessione Kafka raccolte nei prerequisiti.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    
        private static Properties getKafkaProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            properties.put("sasl.jaas.config", value);
            properties.put("retries", 3); // retries on transient errors and load balancing disconnection
            properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
            return properties;
        }
    
        public static void main(String args[]) {
            try {
                Properties properties = getKafkaProperties();
                KafkaProducer producer = new KafkaProducer<>(properties);
    
                for(int i=0;i<10;i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                    producer.send(record, (md, ex) -> {
                        if (ex != null) {
                            System.err.println("exception occurred in producer for review :" + record.value()
                                    + ", exception is " + ex);
                            ex.printStackTrace();
                        } else {
                            System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                        }
                    });
                }
                // producer.send() is async, to make sure all messages are sent we use producer.flush()
                producer.flush();
                producer.close();
            } catch (Exception e) {
                System.err.println("Error: exception " + e);
            }
        }
    }
    
  3. Dalla directory wd, eseguire il comando seguente:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
  4. Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd sotto il percorso /src/main/java/kafka/sdk/oss/example/. È necessario disporre già delle dipendenze SDK Kafka per Java nell'ambito del progetto pom.xml Java Maven dopo aver soddisfatto i prerequisiti.
  3. Creare un nuovo file denominato Consumer.java nella directory wd con il codice seguente. Sostituire i valori delle variabili nel codice come indicato dai commenti del codice, ovvero da bootstrapServers a consumerGroupName. Queste variabili si riferiscono alle impostazioni di connessione Kafka raccolte nei prerequisiti.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
        static String consumerGroupName = "<consumer_group_name>"; 
    
        private static Properties getKafkaProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", consumerGroupName);
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("auto.offset.reset", "earliest");
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            props.put("sasl.jaas.config", value);
            return props;
        }
    
        public static void main(String[] args) {
            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());;
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            ConsumerRecords<Integer, String> records = consumer.poll(10000);
    
            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
    
            consumer.commitSync();
            consumer.close();
        }
    }
    
    
  4. Dalla directory wd, eseguire il comando seguente:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
    
  5. Dovresti vedere messaggi simili ai seguenti:

    [INFO related maven compiling and building the Java code]
    size of records polled is 3
    Received message: (messageKey0, message value) at offset 1284
    Received message: (messageKey0, message value) at offset 1285
    Received message: (null, message produced using oci console) at offset 1286
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null