SDK para Início Rápido de Streaming do Java
Publique e consuma mensagens no serviço Streaming usando o OCI SDK para Java.
Esse início rápido mostra como usar o OCI (Oracle Cloud Infrastructure) SDK for Java e o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.
Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming. Para obter mais informações sobre como usar os SDKs do OCI, consulte os Guias SDK.
Pré-requisitos
-
Para usar o SDK para Java, você deve ter o seguinte:
- Uma conta do Oracle Cloud Infrastructure.
- Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. O usuário pode ser você mesmo ou outra pessoa/sistema que precise chamar a API. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
- Um par de chaves usado para assinar solicitações de API, com a chave pública carregada por upload no sistema Oracle. Somente o usuário que chama a API deve possuir a chave privada.
- Java 8
- Colete o ponto final e o OCID das Mensagens de um stream. Para obter etapas para obter detalhes de um stream, consulte Obtendo Detalhes de um Stream. Para os fins deste início rápido, o stream deve usar um ponto final público e permitir que a Oracle gerencie a criptografia. Consulte Criando um Stream e Criando um Pool de Streams se você não tiver um stream existente.
- JDK 8 ou versão posterior instalada. Certifique-se de que o Java esteja no seu PATH.
- Maven 3.0 ou versão posterior instalada. Certifique-se de que o Maven esteja no seu PATH.
- Intellij (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
-
Adicione a versão mais recente de dependência ou jar maven para o OCI Java SDK for IAM ao seu
pom.xml
da seguinte forma:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>LATEST</version> </dependency>
-
Adicione a versão mais recente de dependência ou jar maven para o OCI Java SDK for OSS ao seu
pom.xml
da seguinte forma:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>LATEST</version> </dependency>
-
Supondo que
wd
seja o diretório de trabalho do projeto Java deste exemplo, seupom.xml
será semelhante ao seguinte:<?xml version="1.0" encoding="UTF-8"?> <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsJava</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>1.33.2</version> </dependency> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>1.33.2</version> </dependency> </dependencies> </project>
- Verifique se você tem um arquivo de configuração do SDK válido. Para ambientes de produção, você deve usar a autorização do controlador de instâncias.
Produzindo Mensagens
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
. Você já deverá ter dependênciasoci-sdk
para Java como parte dopom.xml
do seu projeto Maven Java após atender aos pré-requisitos. -
Crie um arquivo chamado
Producer.java
no diretóriowd
com o código a seguir. Substitua os valores das variáveisconfigurationFilePath
,profile
,ociStreamOcid
eociMessageEndpoint
no trecho de código a seguir pelos valores aplicáveis à sua tenancy.package oci.sdk.oss.example; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.PutMessagesDetails; import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry; import com.oracle.bmc.streaming.model.PutMessagesResultEntry; import com.oracle.bmc.streaming.requests.PutMessagesRequest; import com.oracle.bmc.streaming.responses.PutMessagesResponse; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; public class Producer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // publish some messages to the stream publishExampleMessages(streamClient, ociStreamOcid); } private static void publishExampleMessages(StreamClient streamClient, String streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new ArrayList<>(); for (int i = 0; i < 50; i++) { messages.add( PutMessagesDetailsEntry.builder() .key(String.format("messageKey%s", i).getBytes(UTF_8)) .value(String.format("messageValue%s", i).getBytes(UTF_8)) .build()); } System.out.println( String.format("Publishing %s messages to stream %s.", messages.size(), streamId)); PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build(); PutMessagesRequest putRequest = PutMessagesRequest.builder() .streamId(streamId) .putMessagesDetails(messagesDetails) .build(); PutMessagesResponse putResponse = streamClient.putMessages(putRequest); // the putResponse can contain some useful metadata for handling failures for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) { if (StringUtils.isNotBlank(entry.getError())) { System.out.println( String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage())); } else { System.out.println( String.format( "Published message to partition %s, offset %s.", entry.getPartition(), entry.getOffset())); } } } }
-
No diretório
wd
, execute o seguinte comando:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
- Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.
Consumindo Mensagens
- Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
. Você já deverá ter dependênciasoci-sdk
para Java como parte dopom.xml
do seu projeto Maven Java após atender aos pré-requisitos. -
Crie um arquivo chamado
Consumer.java
no diretóriowd
com o código a seguir. Substitua os valores das variáveisconfigurationFilePath
,profile
,ociStreamOcid
eociMessageEndpoint
no trecho de código a seguir pelos valores aplicáveis à sua tenancy.package oci.sdk.oss.example; import com.google.common.util.concurrent.Uninterruptibles; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.CreateGroupCursorDetails; import com.oracle.bmc.streaming.model.Message; import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest; import com.oracle.bmc.streaming.requests.GetMessagesRequest; import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse; import com.oracle.bmc.streaming.responses.GetMessagesResponse; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; public class Consumer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. System.out.println("Starting a simple message loop with a group cursor"); String groupCursor = getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); simpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } private static void simpleMessageLoop( StreamClient streamClient, String streamId, String initialCursor) { String cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getRequest = GetMessagesRequest.builder() .streamId(streamId) .cursor(cursor) .limit(25) .build(); GetMessagesResponse getResponse = streamClient.getMessages(getRequest); // process the messages System.out.println(String.format("Read %s messages.", getResponse.getItems().size())); for (Message message : ((GetMessagesResponse) getResponse).getItems()) { System.out.println( String.format( "%s: %s", message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8), new String(message.getValue(), UTF_8))); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // use the next-cursor for iteration cursor = getResponse.getOpcNextCursor(); } } private static String getCursorByGroup( StreamClient streamClient, String streamId, String groupName, String instanceName) { System.out.println( String.format( "Creating a cursor for group %s, instance %s.", groupName, instanceName)); CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder() .groupName(groupName) .instanceName(instanceName) .type(CreateGroupCursorDetails.Type.TrimHorizon) .commitOnGet(true) .build(); CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder() .streamId(streamId) .createGroupCursorDetails(cursorDetails) .build(); CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest); return groupCursorResponse.getCursor().getValue(); } }
-
No diretório
wd
, execute o seguinte comando:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
-
Você deverá ver mensagens semelhantes a esta:
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1. Read 25 messages. Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Observação
Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem seráNull
Próximas Etapas
Consulte os seguintes recursos para obter mais informações: