SDK de inicio rápido de Streaming para Java
Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para Java.
En este inicio rápido se muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para Java y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.
Para obtener más información sobre conceptos clave y Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de el SDK de OCI, consulte las Guías sobre el SDK.
Requisitos
-
Para utilizar el SDK para Java, debe tener lo siguiente:
- Una cuenta de Oracle Cloud Infrastructure.
- Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. El usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
- Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada.
- Java 8
- Recopile el punto final de mensajes y el OCID de un flujo. Para obtener detalles sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un Flujo y Creación de un Pool de Flujos si no tiene un flujo existente.
- JDK 8 o superior instalado. Asegúrese de que Java está en su PATH (ruta de acceso).
- Maven 3.0 o superior instalado. Asegúrese de que Maven está en su PATH.
- Intellij (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
-
Agregue la versión más reciente de la dependencia de maven o de jar para el SDK de Java de OCI para IAM a su
pom.xmlde la siguiente forma:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>LATEST</version> </dependency> -
Agregue la versión más reciente de la dependencia de maven o de jar para el SDK de Java de OCI para OSS a su
pom.xmlde la siguiente forma:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>LATEST</version> </dependency> -
Suponiendo que
wdsea el directorio de trabajo del proyecto Java de este ejemplo,pom.xmltendrá un aspecto similar al siguiente:<?xml version="1.0" encoding="UTF-8"?> <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsJava</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>1.33.2</version> </dependency> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>1.33.2</version> </dependency> </dependencies> </project> - Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.
Producción de mensajes
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd. Ya debe tener las dependenciasoci-sdkpara Java como parte depom.xmldel proyecto Java de Maven después de cumplir los requisitos. -
Cree un archivo denominado
Producer.javaen el directoriowdcon el siguiente código. Sustituya los valores de las variablesconfigurationFilePath,profile,ociStreamOcidyociMessageEndpointen el siguiente fragmento de código por los valores aplicables a su arrendamiento.package oci.sdk.oss.example; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.PutMessagesDetails; import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry; import com.oracle.bmc.streaming.model.PutMessagesResultEntry; import com.oracle.bmc.streaming.requests.PutMessagesRequest; import com.oracle.bmc.streaming.responses.PutMessagesResponse; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; public class Producer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // publish some messages to the stream publishExampleMessages(streamClient, ociStreamOcid); } private static void publishExampleMessages(StreamClient streamClient, String streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new ArrayList<>(); for (int i = 0; i < 50; i++) { messages.add( PutMessagesDetailsEntry.builder() .key(String.format("messageKey%s", i).getBytes(UTF_8)) .value(String.format("messageValue%s", i).getBytes(UTF_8)) .build()); } System.out.println( String.format("Publishing %s messages to stream %s.", messages.size(), streamId)); PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build(); PutMessagesRequest putRequest = PutMessagesRequest.builder() .streamId(streamId) .putMessagesDetails(messagesDetails) .build(); PutMessagesResponse putResponse = streamClient.putMessages(putRequest); // the putResponse can contain some useful metadata for handling failures for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) { if (StringUtils.isNotBlank(entry.getError())) { System.out.println( String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage())); } else { System.out.println( String.format( "Published message to partition %s, offset %s.", entry.getPartition(), entry.getOffset())); } } } } -
Desde el directorio
wd, ejecute el siguiente comando:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer - Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.
Consumo de mensajes
- En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
- Abra su editor favorito, como Visual Studio Code, desde el directorio
wd. Ya debe tener las dependenciasoci-sdkpara Java como parte depom.xmldel proyecto Java de Maven después de cumplir los requisitos. -
Cree un archivo denominado
Consumer.javaen el directoriowdcon el siguiente código. Sustituya los valores de las variablesconfigurationFilePath,profile,ociStreamOcidyociMessageEndpointen el siguiente fragmento de código por los valores aplicables a su arrendamiento.package oci.sdk.oss.example; import com.google.common.util.concurrent.Uninterruptibles; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.CreateGroupCursorDetails; import com.oracle.bmc.streaming.model.Message; import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest; import com.oracle.bmc.streaming.requests.GetMessagesRequest; import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse; import com.oracle.bmc.streaming.responses.GetMessagesResponse; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; public class Consumer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. System.out.println("Starting a simple message loop with a group cursor"); String groupCursor = getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); simpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } private static void simpleMessageLoop( StreamClient streamClient, String streamId, String initialCursor) { String cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getRequest = GetMessagesRequest.builder() .streamId(streamId) .cursor(cursor) .limit(25) .build(); GetMessagesResponse getResponse = streamClient.getMessages(getRequest); // process the messages System.out.println(String.format("Read %s messages.", getResponse.getItems().size())); for (Message message : ((GetMessagesResponse) getResponse).getItems()) { System.out.println( String.format( "%s: %s", message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8), new String(message.getValue(), UTF_8))); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // use the next-cursor for iteration cursor = getResponse.getOpcNextCursor(); } } private static String getCursorByGroup( StreamClient streamClient, String streamId, String groupName, String instanceName) { System.out.println( String.format( "Creating a cursor for group %s, instance %s.", groupName, instanceName)); CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder() .groupName(groupName) .instanceName(instanceName) .type(CreateGroupCursorDetails.Type.TrimHorizon) .commitOnGet(true) .build(); CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder() .streamId(streamId) .createGroupCursorDetails(cursorDetails) .build(); CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest); return groupCursorResponse.getCursor().getValue(); } } -
Desde el directorio
wd, ejecute el siguiente comando:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer -
Se mostrarán mensajes similares a los siguientes:
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1. Read 25 messages. Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1Nota
Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje esNull
Pasos siguientes
Consulte los siguientes recursos para obtener más información: