SDK de inicio rápido de Streaming para Java

Publique y consuma mensajes en el servicio Streaming mediante el SDK de OCI para Java.

En este inicio rápido se muestra cómo utilizar el SDK de Oracle Cloud Infrastructure (OCI) para Java y Oracle Cloud Infrastructure Streaming para publicar y consumir mensajes.

Para conocer los conceptos clave y más detalles de Streaming, consulte Visión general de Streaming. Para obtener más información sobre el uso de los SDK de OCI, consulte las Guías sobre SDK.

Requisitos

  1. Para utilizar el SDK para Java, debe tener lo siguiente:

    • Una cuenta de Oracle Cloud Infrastructure.
    • Un usuario creado en esa cuenta, en un grupo con una política que otorgue los permisos necesarios. El usuario puede ser usted mismo u otra persona/sistema que necesite llamar a la API. Para obtener un ejemplo de cómo configurar un nuevo usuario, grupo, compartimento y política, consulte Adición de usuarios. Para obtener una lista de las políticas típicas que puede que desee utilizar, consulte Políticas Comunes.
    • Par de claves utilizado para firmar solicitudes de API con la clave pública cargada en Oracle. Solo el usuario que llama a la API debe poseer la clave privada.
    • Java 8
    Nota

    Para obtener más información, consulte Configuración del SDK.
  2. Recopile el punto final de mensajes y el OCID de un flujo. Para obtener más información sobre un flujo, consulte Obtención de detalles de un flujo. Para este inicio rápido, el flujo debe utilizar un punto final público y permitir que Oracle gestione el cifrado. Consulte Creación de un flujo y Creación de un pool de flujos si no tiene un flujo existente.
  3. JDK 8 o superior instalado. Asegúrese de que Java está en su PATH (ruta de acceso).
  4. Maven 3.0 o superior instalado. Asegúrese de que Maven está en su PATH.
  5. Intellij (recomendado) o cualquier otro entorno de desarrollo integrado (IDE).
  6. Agregue la versión más reciente de la dependencia de maven o de jar para el SDK de Java de OCI para IAM a su pom.xml de la siguiente forma:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-common</artifactId>
    	  <version>LATEST</version>
    	</dependency>
  7. Agregue la versión más reciente de la dependencia de maven o de jar para el SDK de Java de OCI para OSS a su pom.xml de la siguiente forma:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-streaming</artifactId>
    	  <version>LATEST</version> 
    	</dependency>
  8. Suponiendo que wd sea el directorio de trabajo del proyecto Java de este ejemplo, pom.xml tendrá un aspecto similar al siguiente:

    <?xml version="1.0" encoding="UTF-8"?>
    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsJava</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-common</artifactId>
                <version>1.33.2</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-streaming</artifactId>
                <version>1.33.2</version>
            </dependency>
        </dependencies>
    </project>
  9. Asegúrese de que tiene un archivo de configuración de SDK válido. Para entornos de producción, debe utilizar la autorización de principal de instancia.

Producción de mensajes

  1. Abra su editor favorito, como Visual Studio Code, desde el directorio wd. Ya debe tener las dependencias oci-sdk para Java como parte de pom.xml del proyecto Java de Maven después de cumplir los requisitos.
  2. Cree un archivo denominado Producer.java en el directorio wd con el siguiente código. Sustituya los valores de las variables configurationFilePath, profile,ociStreamOcid y ociMessageEndpoint en el siguiente fragmento de código por los valores aplicables a su arrendamiento.

    package oci.sdk.oss.example;
    
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.PutMessagesDetails;
    import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
    import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
    import com.oracle.bmc.streaming.requests.PutMessagesRequest;
    import com.oracle.bmc.streaming.responses.PutMessagesResponse;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // publish some messages to the stream
            publishExampleMessages(streamClient, ociStreamOcid);
    
        }
    
        private static void publishExampleMessages(StreamClient streamClient, String streamId) {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                messages.add(
                        PutMessagesDetailsEntry.builder()
                                .key(String.format("messageKey%s", i).getBytes(UTF_8))
                                .value(String.format("messageValue%s", i).getBytes(UTF_8))
                                .build());
            }
    
            System.out.println(
                    String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
            PutMessagesDetails messagesDetails =
                    PutMessagesDetails.builder().messages(messages).build();
    
            PutMessagesRequest putRequest =
                    PutMessagesRequest.builder()
                            .streamId(streamId)
                            .putMessagesDetails(messagesDetails)
                            .build();
    
            PutMessagesResponse putResponse = streamClient.putMessages(putRequest);
    
            // the putResponse can contain some useful metadata for handling failures
            for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
                if (StringUtils.isNotBlank(entry.getError())) {
                    System.out.println(
                            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
                } else {
                    System.out.println(
                            String.format(
                                    "Published message to partition %s, offset %s.",
                                    entry.getPartition(),
                                    entry.getOffset()));
                }
            }
        }
    
    
    }
  3. Desde el directorio wd, ejecute el siguiente comando:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
  4. Mostrar los últimos mensajes enviados al flujo para ver los últimos mensajes enviados al flujo para verificar que la producción se ha realizado correctamente.

Consumo de mensajes

  1. En primer lugar, asegúrese de que el flujo del que desea consumir mensajes contiene mensajes. Puede utilizar la consola para producir un mensaje de prueba o utilizar el flujo y los mensajes que hemos creado en este inicio rápido.
  2. Abra su editor favorito, como Visual Studio Code, desde el directorio wd. Ya debe tener las dependencias oci-sdk para Java como parte de pom.xml del proyecto Java de Maven después de cumplir los requisitos.
  3. Cree un archivo denominado Consumer.java en el directorio wd con el siguiente código. Sustituya los valores de las variables configurationFilePath, profile,ociStreamOcid y ociMessageEndpoint en el siguiente fragmento de código por los valores aplicables a su arrendamiento.

    package oci.sdk.oss.example;
    
    import com.google.common.util.concurrent.Uninterruptibles;
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
    import com.oracle.bmc.streaming.model.Message;
    import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
    import com.oracle.bmc.streaming.requests.GetMessagesRequest;
    import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
    import com.oracle.bmc.streaming.responses.GetMessagesResponse;
    
    import java.util.concurrent.TimeUnit;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // A cursor can be created as part of a consumer group.
            // Committed offsets are managed for the group, and partitions
            // are dynamically balanced amongst consumers in the group.
            System.out.println("Starting a simple message loop with a group cursor");
            String groupCursor =
                    getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
            simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    
        }
    
        private static void simpleMessageLoop(
                StreamClient streamClient, String streamId, String initialCursor) {
            String cursor = initialCursor;
            for (int i = 0; i < 10; i++) {
    
                GetMessagesRequest getRequest =
                        GetMessagesRequest.builder()
                                .streamId(streamId)
                                .cursor(cursor)
                                .limit(25)
                                .build();
    
                GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    
                // process the messages
                System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
                for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
                    System.out.println(
                            String.format(
                                    "%s: %s",
                                    message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
                                    new String(message.getValue(), UTF_8)));
                }
    
                // getMessages is a throttled method; clients should retrieve sufficiently large message
                // batches, as to avoid too many http requests.
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    
                // use the next-cursor for iteration
                cursor = getResponse.getOpcNextCursor();
            }
        }
    
        private static String getCursorByGroup(
                StreamClient streamClient, String streamId, String groupName, String instanceName) {
            System.out.println(
                    String.format(
                            "Creating a cursor for group %s, instance %s.", groupName, instanceName));
    
            CreateGroupCursorDetails cursorDetails =
                    CreateGroupCursorDetails.builder()
                            .groupName(groupName)
                            .instanceName(instanceName)
                            .type(CreateGroupCursorDetails.Type.TrimHorizon)
                            .commitOnGet(true)
                            .build();
    
            CreateGroupCursorRequest createCursorRequest =
                    CreateGroupCursorRequest.builder()
                            .streamId(streamId)
                            .createGroupCursorDetails(cursorDetails)
                            .build();
    
            CreateGroupCursorResponse groupCursorResponse =
                    streamClient.createGroupCursor(createCursorRequest);
            return groupCursorResponse.getCursor().getValue();
        }
    
    }
  4. Desde el directorio wd, ejecute el siguiente comando:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
  5. Se mostrarán mensajes similares a los siguientes:

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1.
    Read 25 messages.
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Nota

    Si ha utilizado la consola para producir un mensaje de prueba, la clave de cada mensaje es Null