Inicio rápido del cliente Java de Kafka y Streaming
Publique y consuma mensajes en el servicio Streaming con el cliente Java de Kafka.
Este inicio rápido muestra cómo utilizar el cliente Java de Kafka con Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.
Para obtener más información, consulte Uso de Streaming con Apache Kafka. Para obtener conceptos clave y más información sobre Streaming, consulte Visión general de Streaming.
Requisitos
-
Para utilizar el cliente Java de Kafka con Streaming, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
-
Recopile los siguientes detalles:
- OCID de flujo
- Punto final de mensajes
- OCID de pool de flujos
- FQDN de pool de flujos
- Configuración de conexión de Kafka:
- Servidores de inicialización de datos
- Cadenas de conexión de SASL
- Protocolo de seguridad
Para conocer los pasos para crear y gestionar flujos y pools de flujos, consulte Gestión de flujos y Gestión de pools de flujos. Los flujos corresponden a un tema de Kafka.
- JDK 8 o superior instalado. Asegúrese de que Java está en su PATH (ruta de acceso).
- Maven 3.0 o superior instalado. Asegúrese de que Maven está en su PATH.
- Intellij (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
-
Agregue la versión más reciente de la dependencia de Maven o de jar para el SDK Java de Kafka a su
pom.xml
de la siguiente forma:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
Suponiendo que
wd
sea el directorio de trabajo del proyecto Java de este ejemplo,pom.xml
tendrá un aspecto similar al siguiente:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
La autenticación con el protocolo de Kafka utiliza tokens de autenticación y el mecanismo SASL/PLAIN. Consulte Trabajar con tokens de autenticación para la generación de tokens de autenticación. Si ha creado el flujo y el pool de flujos en OCI, ya está autorizado a utilizar este flujo según OCI IAM, por lo que debe crear tokens de autenticación para el usuario de OCI.
Nota
Los tokens de autenticación de usuario de OCI solo son visibles en el momento de la creación. Cópielo y guárdelo en un lugar seguro para su uso en el futuro.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd
. Ya debe tener las dependencias del SDK de Kafka para Java como parte depom.xml
del proyecto Java de Maven después de cumplir los requisitos. -
Cree un nuevo archivo denominado
Producer.java
en el directoriowd
en la ruta/src/main/java/kafka/sdk/oss/example/
con el siguiente código. Sustituya los valores de las variables del código según se indica en los comentarios de código, es decir, debootstrapServers
astreamOrKafkaTopicName
. Estas variables son para la configuración de conexión de Kafka que ha recopilado en los requisitos.package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
Desde el directorio
wd
, ejecute el siguiente comando:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd
en la ruta/src/main/java/kafka/sdk/oss/example/
. Ya debe tener las dependencias del SDK de Kafka para Java como parte depom.xml
del proyecto Java de Maven después de cumplir los requisitos. -
Cree un nuevo archivo denominado
Consumer.java
en el directoriowd
con el siguiente código. Sustituya los valores de las variables del código según se indica en los comentarios de código, es decir, debootstrapServers
aconsumerGroupName
. Estas variables son para la configuración de conexión de Kafka que ha recopilado en los requisitos.package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
Desde el directorio
wd
, ejecute el siguiente comando:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
Se mostrarán mensajes similares a los siguientes:
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: