Service de diffusion en continu et trousse SDK pour Java - Démarrage rapide
Publier et consommer des messages dans le service de diffusion en continu à l'aide de la trousse SDK OCI pour Java.
Ce démarrage rapide vous montre comment utiliser la trousse SDK pour Java pour Oracle Cloud Infrastructure (OCI) et le service de diffusion en continu pour Oracle Cloud Infrastructure pour publier et consommer des messages.
Pour les concepts clés et plus de détails sur le service de diffusion en continu, voir Aperçu du service de diffusion en continu. Pour plus d'informations sur l'utilisation des trousses SDK pour OCI, consultez les guides sur les SDK.
Préalables
-
Pour utiliser la trousse SDK pour Java, vous devez disposer des éléments suivants :
- Un compte Oracle Cloud Infrastructure
- Utilisateur créé dans ce compte, dans un groupe avec une politique qui accorde les autorisations requises. Il peut s'agir d'un utilisateur pour vous-même, une autre personne ou un autre système qui doit appeler l'API. Pour des exemples sur la configuration d'un nouvel utilisateur, d'un groupe, d'un compartiment et d'une politique, voir Ajout d'utilisateurs. Pour obtenir la liste des politiques types que vous pouvez utiliser, voir Politiques communes.
- Une paire de clés utilisée pour signer des demandes d'API, avec la clé publique chargée dans Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée.
- Java 8
- Collectez le point d'extrémité et l'OCID des messages d'un flux. Pour les étapes d'obtention des détails d'un flux, voir Obtention des détails d'un flux. Aux fins de ce démarrage rapide, le flux doit utiliser un point d'extrémité public et laisser Oracle gérer le chiffrement. Reportez-vous aux sections Création d'un flux et Création d'un groupe de flux si vous n'avez pas de flux existant.
- JDK 8 ou une version supérieure doit être installé. Assurez-vous que Java se trouve dans votre chemin d'accès.
- Maven 3.0 doit être installé. Assurez-vous que Maven se trouve dans votre chemin d'accès.
- Intellij (recommandé) ou tout autre environnement de développement intégré (IDE).
-
Ajoutez la dernière version de la dépendance ou du fichier JAR maven pour la trousse SDK Java pour IAM à votre compte
pom.xml
comme suit :<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>LATEST</version> </dependency>
-
Ajoutez la dernière version de la dépendance ou du fichier JAR maven pour la trousse SDK Java pour OSS à votre compte
pom.xml
comme suit :<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>LATEST</version> </dependency>
-
En supposant que
wd
soit le répertoire de travail pour votre projet Java dans cet exemple,pom.xml
ressemblera à ce qui suit :<?xml version="1.0" encoding="UTF-8"?> <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsJava</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-common</artifactId> <version>1.33.2</version> </dependency> <dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-java-sdk-streaming</artifactId> <version>1.33.2</version> </dependency> </dependencies> </project>
- Assurez-vous de disposer d'un fichier de configuration de trousse SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation du principal d'instance.
Production de messages
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
. Vous devez déjà avoir des dépendancesoci-sdk
pour Java dans le cadre du fichierpom.xml
de votre projet Maven Java après avoir satisfait les préalables. -
Créez un fichier nommé
Producer.java
dans le répertoirewd
avec le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
dans l'extrait de code suivant par les valeurs applicables à votre location.package oci.sdk.oss.example; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.PutMessagesDetails; import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry; import com.oracle.bmc.streaming.model.PutMessagesResultEntry; import com.oracle.bmc.streaming.requests.PutMessagesRequest; import com.oracle.bmc.streaming.responses.PutMessagesResponse; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; public class Producer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // publish some messages to the stream publishExampleMessages(streamClient, ociStreamOcid); } private static void publishExampleMessages(StreamClient streamClient, String streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new ArrayList<>(); for (int i = 0; i < 50; i++) { messages.add( PutMessagesDetailsEntry.builder() .key(String.format("messageKey%s", i).getBytes(UTF_8)) .value(String.format("messageValue%s", i).getBytes(UTF_8)) .build()); } System.out.println( String.format("Publishing %s messages to stream %s.", messages.size(), streamId)); PutMessagesDetails messagesDetails = PutMessagesDetails.builder().messages(messages).build(); PutMessagesRequest putRequest = PutMessagesRequest.builder() .streamId(streamId) .putMessagesDetails(messagesDetails) .build(); PutMessagesResponse putResponse = streamClient.putMessages(putRequest); // the putResponse can contain some useful metadata for handling failures for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) { if (StringUtils.isNotBlank(entry.getError())) { System.out.println( String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage())); } else { System.out.println( String.format( "Published message to partition %s, offset %s.", entry.getPartition(), entry.getOffset())); } } } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
- Afficher les derniers messages envoyés au flux pour voir les derniers messages envoyés au flux pour vérifier que la production a réussi.
Consommation de messages
- Assurez-vous tout d'abord que le flux à partir duquel vous voulez consommer des messages contient des messages. Vous pouvez utiliser la console pour produire un message de test ou utiliser le flux et les messages que nous avons créés dans ce démarrage rapide.
- Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire
wd
. Vous devez déjà avoir des dépendancesoci-sdk
pour Java dans le cadre du fichierpom.xml
de votre projet Maven Java après avoir satisfait les préalables. -
Créez un fichier nommé
Consumer.java
dans le répertoirewd
avec le code suivant. Remplacez les valeurs des variablesconfigurationFilePath
,profile
,ociStreamOcid
etociMessageEndpoint
dans l'extrait de code suivant par les valeurs applicables à votre location.package oci.sdk.oss.example; import com.google.common.util.concurrent.Uninterruptibles; import com.oracle.bmc.ConfigFileReader; import com.oracle.bmc.auth.AuthenticationDetailsProvider; import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider; import com.oracle.bmc.streaming.StreamClient; import com.oracle.bmc.streaming.model.CreateGroupCursorDetails; import com.oracle.bmc.streaming.model.Message; import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest; import com.oracle.bmc.streaming.requests.GetMessagesRequest; import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse; import com.oracle.bmc.streaming.responses.GetMessagesResponse; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; public class Consumer { public static void main(String[] args) throws Exception { final String configurationFilePath = "<config_file_path>"; final String profile = "<config_file_profile_name>"; final String ociStreamOcid = "<stream_OCID>"; final String ociMessageEndpoint = "<stream_message_endpoint>"; final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault(); final AuthenticationDetailsProvider provider = new ConfigFileAuthenticationDetailsProvider(configFile); // Streams are assigned a specific endpoint url based on where they are provisioned. // Create a stream client using the provided message endpoint. StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. System.out.println("Starting a simple message loop with a group cursor"); String groupCursor = getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); simpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } private static void simpleMessageLoop( StreamClient streamClient, String streamId, String initialCursor) { String cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getRequest = GetMessagesRequest.builder() .streamId(streamId) .cursor(cursor) .limit(25) .build(); GetMessagesResponse getResponse = streamClient.getMessages(getRequest); // process the messages System.out.println(String.format("Read %s messages.", getResponse.getItems().size())); for (Message message : ((GetMessagesResponse) getResponse).getItems()) { System.out.println( String.format( "%s: %s", message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8), new String(message.getValue(), UTF_8))); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // use the next-cursor for iteration cursor = getResponse.getOpcNextCursor(); } } private static String getCursorByGroup( StreamClient streamClient, String streamId, String groupName, String instanceName) { System.out.println( String.format( "Creating a cursor for group %s, instance %s.", groupName, instanceName)); CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder() .groupName(groupName) .instanceName(instanceName) .type(CreateGroupCursorDetails.Type.TrimHorizon) .commitOnGet(true) .build(); CreateGroupCursorRequest createCursorRequest = CreateGroupCursorRequest.builder() .streamId(streamId) .createGroupCursorDetails(cursorDetails) .build(); CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(createCursorRequest); return groupCursorResponse.getCursor().getValue(); } }
-
Depuis le répertoire
wd
, exécutez la commande suivante :mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
-
Des messages similaires aux suivants doivent s'afficher :
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1. Read 25 messages. Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Note
Si vous avez utilisé la console pour produire un message de test, la clé de chaque message estNull
.
Étapes suivantes
Pour plus d'informations, voir les ressources suivantes :