Início Rápido do Kafka Java Client e do Serviço Streaming
Publique e consuma mensagens no serviço Streaming usando o cliente Java Kafka.
Esse início rápido mostra como usar o cliente Kafka Java com o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.
Para obter mais informações, consulte Usando o Streaming com o Apache Kafka. Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming
Pré-requisitos
-
Para usar o cliente Kafka Java com o serviço Streaming, você deve ter o seguinte:
- Uma conta do Oracle Cloud Infrastructure.
- Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
-
Colete os seguintes detalhes:
- OCID do Stream
- Ponto final de mensagens
- OCID do pool de streams
- FQDN do pool de streams
- Definições de conexão do Kafka:
- Servidores de bootstrap
- Strings de conexão SASL
- Protocolo de segurança
Para obter as etapas de criação e gerenciamento de streams e pools de streams, consulte Gerenciando Streams e Gerenciando Pools de Streams. Os streams correspondem a um tópico do Kafka.
- JDK 8 ou versão posterior instalada. Certifique-se de que o Java esteja no seu PATH.
- Maven 3.0 ou versão posterior instalada. Certifique-se de que o Maven esteja no seu PATH.
- Intellij (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
-
Adicione a versão mais recente da dependência ou jar do Maven para o Kafka Java SDK ao seu
pom.xml
da seguinte forma:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
Supondo que
wd
seja o diretório de trabalho do projeto Java deste exemplo, seupom.xml
será semelhante ao seguinte:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
A autenticação com o protocolo Kafka usa tokens de autenticação e o mecanismo SASL/PLAIN. Consulte Como Trabalhar com Tokens de Autenticação para saber sobre a geração do token de autenticação. Se você criou o stream e o pool de streams no OCI, já estará autorizado a usar esse stream de acordo com o OCI IAM. Portanto, crie tokens de autenticação para seu usuário do OCI.
Observação
Os tokens de autenticação do usuário do OCI só ficam visíveis no momento da criação. Copie-o e mantenha-o em algum lugar seguro para uso futuro.
Produzindo Mensagens
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
. Você já deverá ter as dependências do Kafka SDK para Java como parte dopom.xml
do seu projeto Maven Java após você ter atingido os pré-requisitos. -
Crie um novo arquivo chamado
Producer.java
no diretóriowd
no caminho/src/main/java/kafka/sdk/oss/example/
com o código a seguir. Substitua os valores das variáveis no código conforme direcionado pelos comentários do código, ou seja,bootstrapServers
atéstreamOrKafkaTopicName
. Essas variáveis se referem às definições de conexão do Kafka que você coletou nos pré-requisitos.package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
No diretório
wd
, execute o seguinte comando:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.
Consumindo Mensagens
- Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
no caminho/src/main/java/kafka/sdk/oss/example/
. Você já deverá ter as dependências do Kafka SDK para Java como parte dopom.xml
do seu projeto Maven Java após você ter atingido os pré-requisitos. -
Crie um novo arquivo chamado
Consumer.java
no diretóriowd
com o código a seguir. Substitua os valores das variáveis no código conforme direcionado pelos comentários do código, ou seja,bootstrapServers
atéconsumerGroupName
. Essas variáveis se referem às definições de conexão do Kafka que você coletou nos pré-requisitos.package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
No diretório
wd
, execute o seguinte comando:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
Você deverá ver mensagens semelhantes a esta:
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
Observação
Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem seráNull
Próximas Etapas
Consulte os seguintes recursos para obter mais informações: