Avvio rapido SDK per Java Streaming
Pubblica e utilizza i messaggi nel servizio di streaming utilizzando l'SDK OCI per Java.
Questa rapida panoramica ti mostra come utilizzare Oracle Cloud Infrastructure (OCI) SDK for Java e Oracle Cloud Infrastructure Streaming per pubblicare e utilizzare i messaggi.
Per i concetti chiave e ulteriori dettagli sullo streaming, vedere Panoramica dello streaming. Per ulteriori informazioni sull'uso degli SDK OCI, consulta le guide SDK.
Prerequisiti
-
Per utilizzare l'SDK per Java, è necessario disporre dei seguenti elementi:
- Un account Oracle Cloud Infrastructure.
- Utente creato in tale account, in un gruppo con un criterio che concede le autorizzazioni necessarie. L'utente può essere se stesso o un'altra persona/sistema che deve chiamare l'API. Per un esempio su come impostare un nuovo utente, gruppo, compartimento e criterio, vedere Aggiunta di utenti. Per un elenco dei criteri tipici che si desidera utilizzare, vedere Criteri comuni.
- Coppia di chiavi utilizzata per firmare le richieste API, con la chiave pubblica caricata in Oracle. Solo l'utente che chiama l'API deve possedere la chiave privata.
- Java 8
- Raccogliere l'endpoint e l'OCID dei messaggi di un flusso. Per i passi per ottenere i dettagli per un flusso, vedere Recupero dei dettagli per un flusso. Ai fini di questo avvio rapido, il flusso deve utilizzare un endpoint pubblico e consentire a Oracle di gestire la cifratura. Se non si dispone di un flusso esistente, vedere Creazione di un flusso e Creazione di un pool di flussi.
- JDK 8 o versione successiva installato. Assicurati che Java sia nel tuo PATH.
- Maven 3.0 o installato. Assicurati che Maven sia nel tuo percorso.
- Intellij (consigliato) o qualsiasi altro ambiente di sviluppo integrato (IDE).
-
Aggiungere la versione più recente della dipendenza o del file jar del maven per SDK Java OCI per IAM al file
pom.xml
come indicato di seguito.<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>LATEST</version> </dependency>
-
Aggiungere la versione più recente della dipendenza o del file jar del forno per OCI Java SDK for OSS al file
pom.xml
come indicato di seguito.<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>LATEST</version> </dependency>
-
Supponendo
wd
come directory di lavoro per il progetto Java di questo esempio, l'aspetto dipom.xml
sarà simile al seguente:<?xml version="1.0" encoding="UTF-8"?> <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsJava</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>1.33.2</version> </dependency> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>1.33.2</version> </dependency> </dependencies> </project>
- Assicurarsi di disporre di un file di configurazione SDK valido. Per gli ambienti di produzione, è necessario utilizzare l'autorizzazione principal dell'istanza.
Produzione di messaggi
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
. È necessario disporre già delle dipendenzeoci-sdk
per Java nell'ambito delpom.xml
del progetto Java Maven dopo aver soddisfatto i prerequisiti. -
Creare un file denominato
Producer.java
nella directorywd
con il codice seguente. Sostituire i valori delle variabiliconfigurationFilePath
,profile
,ociStreamOcid
eociMessageEndpoint
nello snippet di codice seguente con i valori applicabili per la tenancy in uso.package oci.sdk.oss.example; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.PutMessagesDetails; import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry; import com.oracle.bmc.streaming.model.PutMessagesResultEntry; import com.oracle.bmc.streaming.requests.PutMessagesRequest; import com.oracle.bmc.streaming.responses.PutMessagesResponse; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; public class Producer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // publish some messages to the stream publishExampleMessages(streamClient, ociStreamOcid); } private static void publishExampleMessages(StreamClient streamClient, String streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new ArrayList<>(); for (int i = 0; i < 50; i++) { messages.add( PutMessagesDetailsEntry.builder() .key(String.format("messageKey%s", i).getBytes(UTF_8)) .value(String.format("messageValue%s", i).getBytes(UTF_8)) .build()); } System.out.println( String.format("Publishing %s messages to stream %s.", messages.size(), streamId)); PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build(); PutMessagesRequest putRequest = PutMessagesRequest.builder() .streamId(streamId) .putMessagesDetails(messagesDetails) .build(); PutMessagesResponse putResponse = streamClient.putMessages(putRequest); // the putResponse can contain some useful metadata for handling failures for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) { if (StringUtils.isNotBlank(entry.getError())) { System.out.println( String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage())); } else { System.out.println( String.format( "Published message to partition %s, offset %s.", entry.getPartition(), entry.getOffset())); } } } }
-
Dalla directory
wd
, eseguire il comando seguente:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
- Mostra i messaggi più recenti inviati al flusso per visualizzare i messaggi più recenti inviati al flusso per verificare che la produzione sia riuscita.
Messaggi di consumo
- In primo luogo, assicurarsi che il flusso da cui si desidera utilizzare i messaggi contenga messaggi. È possibile utilizzare la console per generare un messaggio di test oppure utilizzare il flusso e i messaggi creati in questo avvio rapido.
- Aprire l'editor preferito, ad esempio Visual Studio Code, dalla directory
wd
. È necessario disporre già delle dipendenzeoci-sdk
per Java nell'ambito delpom.xml
del progetto Java Maven dopo aver soddisfatto i prerequisiti. -
Creare un file denominato
Consumer.java
nella directorywd
con il codice seguente. Sostituire i valori delle variabiliconfigurationFilePath
,profile
,ociStreamOcid
eociMessageEndpoint
nello snippet di codice seguente con i valori applicabili per la tenancy in uso.package oci.sdk.oss.example; import com.google.common.util.concurrent.Uninterruptibles; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.CreateGroupCursorDetails; import com.oracle.bmc.streaming.model.Message; import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest; import com.oracle.bmc.streaming.requests.GetMessagesRequest; import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse; import com.oracle.bmc.streaming.responses.GetMessagesResponse; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; public class Consumer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. System.out.println("Starting a simple message loop with a group cursor"); String groupCursor = getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); simpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } private static void simpleMessageLoop( StreamClient streamClient, String streamId, String initialCursor) { String cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getRequest = GetMessagesRequest.builder() .streamId(streamId) .cursor(cursor) .limit(25) .build(); GetMessagesResponse getResponse = streamClient.getMessages(getRequest); // process the messages System.out.println(String.format("Read %s messages.", getResponse.getItems().size())); for (Message message : ((GetMessagesResponse) getResponse).getItems()) { System.out.println( String.format( "%s: %s", message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8), new String(message.getValue(), UTF_8))); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // use the next-cursor for iteration cursor = getResponse.getOpcNextCursor(); } } private static String getCursorByGroup( StreamClient streamClient, String streamId, String groupName, String instanceName) { System.out.println( String.format( "Creating a cursor for group %s, instance %s.", groupName, instanceName)); CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder() .groupName(groupName) .instanceName(instanceName) .type(CreateGroupCursorDetails.Type.TrimHorizon) .commitOnGet(true) .build(); CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder() .streamId(streamId) .createGroupCursorDetails(cursorDetails) .build(); CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest); return groupCursorResponse.getCursor().getValue(); } }
-
Dalla directory
wd
, eseguire il comando seguente:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
-
Dovresti vedere messaggi simili ai seguenti:
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1. Read 25 messages. Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Nota
Se è stata utilizzata la console per generare un messaggio di test, la chiave per ogni messaggio èNull
Passo successivo
Per ulteriori informazioni, consultare le risorse elencate di seguito.