Inicio rápido del cliente Java de Kafka y Streaming

Publique y consuma mensajes en el servicio Streaming con el cliente Java de Kafka.

Este inicio rápido muestra cómo utilizar el cliente Java de Kafka con Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.

Para obtener más información, consulte Uso de Streaming con Apache Kafka. Para obtener conceptos clave y más información sobre Streaming, consulte Visión general de Streaming.

Requisitos

  1. Para utilizar el cliente Java de Kafka con Streaming, debe tener lo siguiente:

    • Una cuenta de Oracle Cloud Infrastructure.
    • Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
  2. Recopile los siguientes detalles:

    • OCID de flujo
    • Punto final de mensajes
    • OCID de pool de flujos
    • FQDN de pool de flujos
    • Configuración de conexión de Kafka:
      • Servidores de inicialización de datos
      • Cadenas de conexión de SASL
      • Protocolo de seguridad

    Para conocer los pasos para crear y gestionar flujos y pools de flujos, consulte Gestión de flujos y Gestión de pools de flujos. Los flujos corresponden a un tema de Kafka.

  3. JDK 8 o superior instalado. Asegúrese de que Java está en su PATH (ruta de acceso).
  4. Maven 3.0 o superior instalado. Asegúrese de que Maven está en su PATH.
  5. Intellij (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
  6. Agregue la versión más reciente de la dependencia de Maven o de jar para el SDK Java de Kafka a su pom.xml de la siguiente forma:

    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
    	</dependency>
    
  7. Suponiendo que wd sea el directorio de trabajo del proyecto Java de este ejemplo, pom.xml tendrá un aspecto similar al siguiente:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsExampleWithKafkaApis</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>2.8.0</version>
    		</dependency>
        </dependencies>
    </project>
    
  8. La autenticación con el protocolo de Kafka utiliza tokens de autenticación y el mecanismo SASL/PLAIN. Consulte Trabajar con tokens de autenticación para la generación de tokens de autenticación. Si ha creado el flujo y el pool de flujos en OCI, ya está autorizado a utilizar este flujo según OCI IAM, por lo que debe crear tokens de autenticación para el usuario de OCI.

    Nota

    Los tokens de autenticación de usuario de OCI solo son visibles en el momento de la creación. Cópielo y guárdelo en un lugar seguro para su uso en el futuro.

Producción de mensajes

  1. Abra su editor favorito, como Visual Studio Code, desde el directorio wd. Ya debe tener las dependencias del SDK de Kafka para Java como parte de pom.xml del proyecto Java de Maven después de cumplir los requisitos.
  2. Cree un nuevo archivo denominado Producer.java en el directorio wd en la ruta /src/main/java/kafka/sdk/oss/example/ con el siguiente código. Sustituya los valores de las variables del código según se indica en los comentarios de código, es decir, de bootstrapServers a streamOrKafkaTopicName. Estas variables son para la configuración de conexión de Kafka que ha recopilado en los requisitos.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    
        private static Properties getKafkaProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            properties.put("sasl.jaas.config", value);
            properties.put("retries", 3); // retries on transient errors and load balancing disconnection
            properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
            return properties;
        }
    
        public static void main(String args[]) {
            try {
                Properties properties = getKafkaProperties();
                KafkaProducer producer = new KafkaProducer<>(properties);
    
                for(int i=0;i<10;i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                    producer.send(record, (md, ex) -> {
                        if (ex != null) {
                            System.err.println("exception occurred in producer for review :" + record.value()
                                    + ", exception is " + ex);
                            ex.printStackTrace();
                        } else {
                            System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                        }
                    });
                }
                // producer.send() is async, to make sure all messages are sent we use producer.flush()
                producer.flush();
                producer.close();
            } catch (Exception e) {
                System.err.println("Error: exception " + e);
            }
        }
    }
    
  3. Desde el directorio wd, ejecute el siguiente comando:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
  4. Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.

Consumo de mensajes

  1. En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
  2. Abra su editor favorito, como Visual Studio Code, desde el directorio wd en la ruta /src/main/java/kafka/sdk/oss/example/. Ya debe tener las dependencias del SDK de Kafka para Java como parte de pom.xml del proyecto Java de Maven después de cumplir los requisitos.
  3. Cree un nuevo archivo denominado Consumer.java en el directorio wd con el siguiente código. Sustituya los valores de las variables del código según se indica en los comentarios de código, es decir, de bootstrapServers a consumerGroupName. Estas variables son para la configuración de conexión de Kafka que ha recopilado en los requisitos.

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
        static String consumerGroupName = "<consumer_group_name>"; 
    
        private static Properties getKafkaProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", consumerGroupName);
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("auto.offset.reset", "earliest");
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            props.put("sasl.jaas.config", value);
            return props;
        }
    
        public static void main(String[] args) {
            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());;
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            ConsumerRecords<Integer, String> records = consumer.poll(10000);
    
            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
    
            consumer.commitSync();
            consumer.close();
        }
    }
    
    
  4. Desde el directorio wd, ejecute el siguiente comando:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
    
  5. Se mostrarán mensajes similares a los siguientes:

    [INFO related maven compiling and building the Java code]
    size of records polled is 3
    Received message: (messageKey0, message value) at offset 1284
    Received message: (messageKey0, message value) at offset 1285
    Received message: (null, message produced using oci console) at offset 1286
    Nota

    Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje es Null