SDK für Java mit Streaming verwenden - Schnellstart
Veröffentlichen und konsumieren Sie Nachrichten im Streaming-Service mit dem OCI-SDK für Java.
In diesem Schnellstart wird gezeigt, wie Sie das Oracle Cloud Infrastructure-(OCI-)SDK für Java und Oracle Cloud Infrastructure Streaming verwenden, um Nachrichten zu veröffentlichen und zu konsumieren.
Wichtige Konzepte und weitere Streamingdetails finden Sie unter Überblick über Streaming. Weitere Informationen zur Verwendung der OCI-SDKs finden Sie in den SDK-Dokumentationen.
Voraussetzungen
-
Um das SDK für Java zu verwenden, müssen Sie über Folgendes verfügen:
- Ein Oracle Cloud Infrastructure-Account.
- Ein in diesem Account erstellter Benutzer in einer Gruppe mit einer Policy, die die erforderlichen Berechtigungen erteilt. Dieser Benutzerkann Ihr eigener Benutzer oder eine andere Person/ein anderes System sein, die/das die API aufrufen muss. Ein Beispiel für die Einrichtung eines neuen Benutzers, einer neuen Gruppe, eines neuen Compartments und einer neuen Policy finden Sie unter Benutzer hinzufügen. Eine Liste der typischen Policys, die Sie verwenden können, finden Sie unter Allgemeine Policys.
- Ein Schlüsselpaar zum Signieren von API-Anforderungen, wobei der Public Key bei Oracle hochgeladen wird. Nur der Benutzer, der die API aufruft, sollte im Besitz des Private Keys sein.
- Java 8
- Erfassen Sie den Nachrichtenendpunkt und die OCID eines Streams. Die Schritte zum Abrufen von Details für einen Stream finden Sie unter Details für einen Stream abrufen. Im Rahmen dieses Schnellstarts sollte der Stream einen öffentlichen Endpunkt und die von Oracle verwaltete Verschlüsselung verwenden. Informationen hierzu finden Sie unter Streams erstellen und Streampool erstellen, wenn kein Stream vorhanden ist.
- JDK 8 oder höher ist installiert. Stellen Sie sicher, dass sich Java in Ihrem PATH befindet.
- Maven 3.0 oder höher ist installiert. Stellen Sie sicher, dass sich Maven in Ihrem Pfad befindet.
- Intellij (empfohlen) oder eine andere Integrated Development Environment (IDE).
-
Fügen Sie die neueste Version der Maven-Abhängigkeit oder JAR-Datei für das OCI-Java-SDK für IAM wie folgt zu Ihrer
pom.xml
hinzu:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>LATEST</version> </dependency>
-
Fügen Sie die neueste Version der Maven-Abhängigkeit oder JAR-Datei für OCI-Java-SDK für OSS wie folgt zu Ihrer
pom.xml
hinzu:<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>LATEST</version> </dependency>
-
Wenn Sie
wd
als Arbeitsverzeichnis für das Java-Projekt in diesem Beispiel verwenden, siehtpom.xml
in etwa wie folgt aus:<?xml version="1.0" encoding="UTF-8"?> <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsJava</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>1.33.2</version> </dependency> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>1.33.2</version> </dependency> </dependencies> </project>
- Stellen Sie sicher, dass Sie über eine gültige SDK-Konfigurationsdatei verfügen. Verwenden Sie für Produktionsumgebungen die Instanz-Principal-Autorisierung.
Nachrichten erzeugen
- Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis
wd
. Sie sollten bereits überoci-sdk
-Abhängigkeiten für Java als Teil der Dateipom.xml
Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben. -
Erstellen Sie eine Datei namens
Producer.java
im Verzeichniswd
mit dem folgenden Code. Ersetzen Sie die Werte der VariablenconfigurationFilePath
,profile
,ociStreamOcid
undociMessageEndpoint
im folgenden Code-Snippet durch die Werte für Ihren Mandanten.package oci.sdk.oss.example; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.PutMessagesDetails; import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry; import com.oracle.bmc.streaming.model.PutMessagesResultEntry; import com.oracle.bmc.streaming.requests.PutMessagesRequest; import com.oracle.bmc.streaming.responses.PutMessagesResponse; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; public class Producer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // publish some messages to the stream publishExampleMessages(streamClient, ociStreamOcid); } private static void publishExampleMessages(StreamClient streamClient, String streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new ArrayList<>(); for (int i = 0; i < 50; i++) { messages.add( PutMessagesDetailsEntry.builder() .key(String.format("messageKey%s", i).getBytes(UTF_8)) .value(String.format("messageValue%s", i).getBytes(UTF_8)) .build()); } System.out.println( String.format("Publishing %s messages to stream %s.", messages.size(), streamId)); PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build(); PutMessagesRequest putRequest = PutMessagesRequest.builder() .streamId(streamId) .putMessagesDetails(messagesDetails) .build(); PutMessagesResponse putResponse = streamClient.putMessages(putRequest); // the putResponse can contain some useful metadata for handling failures for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) { if (StringUtils.isNotBlank(entry.getError())) { System.out.println( String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage())); } else { System.out.println( String.format( "Published message to partition %s, offset %s.", entry.getPartition(), entry.getOffset())); } } } }
-
Führen Sie im Verzeichnis
wd
den folgenden Befehl aus:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
- Zeigen Sie die neuesten Nachrichten an, die an den Stream gesendet wurden, um die neuesten Nachrichten anzuzeigen, die an den Stream gesendet wurden, um zu prüfen, ob die Produktion erfolgreich war.
Nachrichten konsumieren
- Stellen Sie zunächst sicher, dass der Stream, aus dem Sie Nachrichten konsumieren möchten, Nachrichten enthält. Sie können eine Testnachricht mit der Konsole erstellen oder den Stream und die Nachrichten verwenden, die wir in diesem Schnellstart erstellt haben.
- Öffnen Sie Ihren bevorzugten Editor, wie Visual Studio Code, im Verzeichnis
wd
. Sie sollten bereits überoci-sdk
-Abhängigkeiten für Java als Teil der Dateipom.xml
Ihres Maven-Java-Projekts verfügen, nachdem Sie die Schritte unter Voraussetzungen ausgeführt haben. -
Erstellen Sie eine Datei namens
Consumer.java
im Verzeichniswd
mit dem folgenden Code. Ersetzen Sie die Werte der VariablenconfigurationFilePath
,profile
,ociStreamOcid
undociMessageEndpoint
im folgenden Code-Snippet durch die Werte für Ihren Mandanten.package oci.sdk.oss.example; import com.google.common.util.concurrent.Uninterruptibles; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.CreateGroupCursorDetails; import com.oracle.bmc.streaming.model.Message; import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest; import com.oracle.bmc.streaming.requests.GetMessagesRequest; import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse; import com.oracle.bmc.streaming.responses.GetMessagesResponse; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; public class Consumer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. System.out.println("Starting a simple message loop with a group cursor"); String groupCursor = getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); simpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } private static void simpleMessageLoop( StreamClient streamClient, String streamId, String initialCursor) { String cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getRequest = GetMessagesRequest.builder() .streamId(streamId) .cursor(cursor) .limit(25) .build(); GetMessagesResponse getResponse = streamClient.getMessages(getRequest); // process the messages System.out.println(String.format("Read %s messages.", getResponse.getItems().size())); for (Message message : ((GetMessagesResponse) getResponse).getItems()) { System.out.println( String.format( "%s: %s", message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8), new String(message.getValue(), UTF_8))); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // use the next-cursor for iteration cursor = getResponse.getOpcNextCursor(); } } private static String getCursorByGroup( StreamClient streamClient, String streamId, String groupName, String instanceName) { System.out.println( String.format( "Creating a cursor for group %s, instance %s.", groupName, instanceName)); CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder() .groupName(groupName) .instanceName(instanceName) .type(CreateGroupCursorDetails.Type.TrimHorizon) .commitOnGet(true) .build(); CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder() .streamId(streamId) .createGroupCursorDetails(cursorDetails) .build(); CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest); return groupCursorResponse.getCursor().getValue(); } }
-
Führen Sie im Verzeichnis
wd
den folgenden Befehl aus:mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
-
Meldungen wie die Folgenden sollten angezeigt werden:
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1. Read 25 messages. Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Hinweis
Wenn Sie die Konsole zum Erzeugen einer Testnachricht verwendet haben, lautet der Schlüssel für jede NachrichtNull
Nächste Schritte
Weitere Informationen finden Sie in den folgenden Ressourcen: