Avvio rapido SDK per Java Streaming

Pubblica e utilizza i messaggi nel servizio di streaming utilizzando l'SDK OCI per Java.

Questa rapida panoramica ti mostra come utilizzare Oracle Cloud Infrastructure (OCI) SDK for Java e Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.

Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming. Per ulteriori informazioni sull'uso degli SDK OCI, consulta le guide SDK.

Prerequisiti

  1. Per utilizzare l'SDK per Java, è necessario disporre dei seguenti elementi:

    • Un account Oracle Cloud Infrastructure.
    • Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. L'utente può essere se stesso o un'altra persona/sistema che deve chiamare l'API. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
    • Coppia di chiavi utilizzata per firmare le richieste API, con la chiave pubblica caricata in Oracle. Solo l'utente che chiama l'API deve possedere la chiave privata.
    • Java 8
    Nota

    Per ulteriori informazioni, vedere Configurazione dell'SDK.
  2. Raccogliere l'endpoint e l'OCID dei messaggi di un flusso. Per i passi per ottenere i dettagli per un flusso, vedere Recupero dei dettagli per un flusso. Ai fini di questo avvio rapido, il flusso deve utilizzare un endpoint pubblico e consentire a Oracle di gestire la cifratura. Se non si dispone di un flusso esistente, vedere Creazione di un flusso e Creazione di un pool di flussi.
  3. JDK 8 o versione successiva installato. Assicurati che Java sia nel tuo PATH.
  4. Maven 3.0 o installato. Assicurati che Maven sia nel tuo percorso.
  5. Intellij (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
  6. Aggiungere la versione più recente della dipendenza o del file jar del maven per SDK Java OCI per IAM al file pom.xml come indicato di seguito.

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-common</artifactId>
    	  <version>LATEST</version>
    	</dependency>
  7. Aggiungere la versione più recente della dipendenza o del file jar del forno per OCI Java SDK for OSS al file pom.xml come indicato di seguito.

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-streaming</artifactId>
    	  <version>LATEST</version> 
    	</dependency>
  8. Supponendo wd come directory di lavoro per il progetto Java di questo esempio, l'aspetto di pom.xml sarà simile al seguente:

    <?xml version="1.0" encoding="UTF-8"?>
    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsJava</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-common</artifactId>
                <version>1.33.2</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-streaming</artifactId>
                <version>1.33.2</version>
            </dependency>
        </dependencies>
    </project>
  9. Assicurarsi di disporre di un file di configurazione SDK valido. Per gli ambienti di produzione, è necessario utilizzare l'autorizzazione principal dell'istanza.

Produzione di messaggi

  1. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd. È necessario disporre già delle dipendenze oci-sdk per Java nell'ambito del pom.xml del progetto Java Maven dopo aver soddisfatto i prerequisiti.
  2. Creare un file denominato Producer.java nella directory wd con il codice seguente. Sostituire i valori delle variabili configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    package oci.sdk.oss.example;
    
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.PutMessagesDetails;
    import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
    import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
    import com.oracle.bmc.streaming.requests.PutMessagesRequest;
    import com.oracle.bmc.streaming.responses.PutMessagesResponse;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // publish some messages to the stream
            publishExampleMessages(streamClient, ociStreamOcid);
    
        }
    
        private static void publishExampleMessages(StreamClient streamClient, String streamId) {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                messages.add(
                        PutMessagesDetailsEntry.builder()
                                .key(String.format("messageKey%s", i).getBytes(UTF_8))
                                .value(String.format("messageValue%s", i).getBytes(UTF_8))
                                .build());
            }
    
            System.out.println(
                    String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
            PutMessagesDetails messagesDetails =
                    PutMessagesDetails.builder().messages(messages).build();
    
            PutMessagesRequest putRequest =
                    PutMessagesRequest.builder()
                            .streamId(streamId)
                            .putMessagesDetails(messagesDetails)
                            .build();
    
            PutMessagesResponse putResponse = streamClient.putMessages(putRequest);
    
            // the putResponse can contain some useful metadata for handling failures
            for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
                if (StringUtils.isNotBlank(entry.getError())) {
                    System.out.println(
                            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
                } else {
                    System.out.println(
                            String.format(
                                    "Published message to partition %s, offset %s.",
                                    entry.getPartition(),
                                    entry.getOffset()));
                }
            }
        }
    
    
    }
  3. Dalla directory wd, eseguire il comando seguente:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
  4. Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.

Messaggi di consumo

  1. In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
  2. Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory wd. È necessario disporre già delle dipendenze oci-sdk per Java nell'ambito del pom.xml del progetto Java Maven dopo aver soddisfatto i prerequisiti.
  3. Creare un file denominato Consumer.java nella directory wd con il codice seguente. Sostituire i valori delle variabili configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint nello snippet di codice seguente con i valori applicabili per la tenancy in uso.

    package oci.sdk.oss.example;
    
    import com.google.common.util.concurrent.Uninterruptibles;
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
    import com.oracle.bmc.streaming.model.Message;
    import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
    import com.oracle.bmc.streaming.requests.GetMessagesRequest;
    import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
    import com.oracle.bmc.streaming.responses.GetMessagesResponse;
    
    import java.util.concurrent.TimeUnit;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // A cursor can be created as part of a consumer group.
            // Committed offsets are managed for the group, and partitions
            // are dynamically balanced amongst consumers in the group.
            System.out.println("Starting a simple message loop with a group cursor");
            String groupCursor =
                    getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
            simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    
        }
    
        private static void simpleMessageLoop(
                StreamClient streamClient, String streamId, String initialCursor) {
            String cursor = initialCursor;
            for (int i = 0; i < 10; i++) {
    
                GetMessagesRequest getRequest =
                        GetMessagesRequest.builder()
                                .streamId(streamId)
                                .cursor(cursor)
                                .limit(25)
                                .build();
    
                GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    
                // process the messages
                System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
                for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
                    System.out.println(
                            String.format(
                                    "%s: %s",
                                    message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
                                    new String(message.getValue(), UTF_8)));
                }
    
                // getMessages is a throttled method; clients should retrieve sufficiently large message
                // batches, as to avoid too many http requests.
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    
                // use the next-cursor for iteration
                cursor = getResponse.getOpcNextCursor();
            }
        }
    
        private static String getCursorByGroup(
                StreamClient streamClient, String streamId, String groupName, String instanceName) {
            System.out.println(
                    String.format(
                            "Creating a cursor for group %s, instance %s.", groupName, instanceName));
    
            CreateGroupCursorDetails cursorDetails =
                    CreateGroupCursorDetails.builder()
                            .groupName(groupName)
                            .instanceName(instanceName)
                            .type(CreateGroupCursorDetails.Type.TrimHorizon)
                            .commitOnGet(true)
                            .build();
    
            CreateGroupCursorRequest createCursorRequest =
                    CreateGroupCursorRequest.builder()
                            .streamId(streamId)
                            .createGroupCursorDetails(cursorDetails)
                            .build();
    
            CreateGroupCursorResponse groupCursorResponse =
                    streamClient.createGroupCursor(createCursorRequest);
            return groupCursorResponse.getCursor().getValue();
        }
    
    }
  4. Dalla directory wd, eseguire il comando seguente:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
  5. Dovresti vedere messaggi simili ai seguenti:

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1.
    Read 25 messages.
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Nota

    Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio è Null