Démarrage rapide du kit SDK pour Java avec Streaming

Publiez et utilisez des messages dans le service Streaming à l'aide du kit SDK OCI pour Java.

Ce démarrage rapide vous explique comment utiliser le kit SDK Oracle Cloud Infrastructure (OCI) pour Java et Oracle Cloud Infrastructure Streaming afin de publier et d'utiliser des messages.

Pour plus d'informations sur les concepts clés et Streaming, reportez-vous à Présentation de Streaming. Pour plus d'informations sur l'utilisation des kits SDK OCI, reportez-vous aux guides SDK.

Prérequis

  1. Afin d'utiliser le kit SDK pour Java, vous devez disposer des éléments suivants :

    • Un compte Oracle Cloud Infrastructure.
    • Un utilisateur créé dans ce compte, dans un groupe avec une stratégie qui octroie les droits d'accès requis. L'utilisateur peut être vous-même, ou une autre personne/un autre système devant appeler l'API. Pour obtenir un exemple de configuration d'un nouvel utilisateur, d'un nouveau groupe, d'un nouveau compartiment et d'une nouvelle stratégie, reportez-vous à Ajout d'utilisateurs. Pour obtenir la liste des stratégies standard que vous pouvez utiliser, reportez-vous à Stratégies courantes.
    • Une paire de clés utilisée lors de la signature des demandes d'API, avec la clé publique téléchargée vers Oracle. Seul l'utilisateur appelant l'API doit disposer de la clé privée.
    • Java 8
    Remarque

    Pour plus d'informations, reportez-vous à Configuration du kit SDK.
  2. Collectez l'adresse des messages et l'OCID d'un flux de données. Pour connaître les étapes d'obtention des détails d'un flux de données, reportez-vous à Obtention des détails d'un flux de données. Dans le cadre de ce démarrage rapide, le flux de données doit utiliser une adresse publique et laisser Oracle gérer le cryptage. Si vous n'avez pas de flux existant, reportez-vous à Création d'un flux de données et à Création d'un pool de flux de données.
  3. JDK version 8 ou ultérieure installé. Assurez-vous que Java se trouve dans votre PATH.
  4. Maven version 3.0 ou ultérieure installé. Assurez-vous que Maven se trouve dans votre PATH.
  5. IntelliJ (recommandé) ou tout autre environnement de développement intégré (IDE).
  6. Ajoutez la dernière version de la dépendance ou du fichier JAR Maven pour OCI Java SDK for IAM à votre fichier pom.xml, comme suit :

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-common</artifactId>
    	  <version>LATEST</version>
    	</dependency>
  7. Ajoutez la dernière version de la dépendance ou du fichier JAR Maven pour le kit SDK Java OCI pour OSS à votre fichier pom.xml, comme suit :

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-streaming</artifactId>
    	  <version>LATEST</version> 
    	</dependency>
  8. En supposant que wd est votre répertoire de travail pour le projet Java de cet exemple, votre fichier pom.xml se présente comme suit :

    <?xml version="1.0" encoding="UTF-8"?>
    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsJava</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-common</artifactId>
                <version>1.33.2</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-streaming</artifactId>
                <version>1.33.2</version>
            </dependency>
        </dependencies>
    </project>
  9. Assurez-vous que vous disposez d'un fichier de configuration de kit SDK valide. Pour les environnements de production, vous devez utiliser l'autorisation de principal d'instance.

Production de messages

  1. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Vous devez déjà disposer de dépendances oci-sdk pour Java dans le fichier pom.xml du projet Java Maven si les prérequis ont été respectés.
  2. Créez un fichier nommé Producer.java dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables configurationFilePath, profile, ociStreamOcid et ociMessageEndpoint du fragment de code suivant par les valeurs applicables à votre location.

    package oci.sdk.oss.example;
    
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.PutMessagesDetails;
    import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
    import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
    import com.oracle.bmc.streaming.requests.PutMessagesRequest;
    import com.oracle.bmc.streaming.responses.PutMessagesResponse;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // publish some messages to the stream
            publishExampleMessages(streamClient, ociStreamOcid);
    
        }
    
        private static void publishExampleMessages(StreamClient streamClient, String streamId) {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                messages.add(
                        PutMessagesDetailsEntry.builder()
                                .key(String.format("messageKey%s", i).getBytes(UTF_8))
                                .value(String.format("messageValue%s", i).getBytes(UTF_8))
                                .build());
            }
    
            System.out.println(
                    String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
            PutMessagesDetails messagesDetails =
                    PutMessagesDetails.builder().messages(messages).build();
    
            PutMessagesRequest putRequest =
                    PutMessagesRequest.builder()
                            .streamId(streamId)
                            .putMessagesDetails(messagesDetails)
                            .build();
    
            PutMessagesResponse putResponse = streamClient.putMessages(putRequest);
    
            // the putResponse can contain some useful metadata for handling failures
            for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
                if (StringUtils.isNotBlank(entry.getError())) {
                    System.out.println(
                            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
                } else {
                    System.out.println(
                            String.format(
                                    "Published message to partition %s, offset %s.",
                                    entry.getPartition(),
                                    entry.getOffset()));
                }
            }
        }
    
    
    }
  3. A partir du répertoire wd, exécutez la commande suivante :

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
  4. Afficher les derniers messages envoyés au flux de données : affichez les derniers messages envoyés au flux de données pour vérifier que la production a réussi.

Utilisation des messages

  1. Tout d'abord, assurez-vous que le flux de données dont vous souhaitez utiliser des messages en contient. Vous pouvez utiliser la console pour produire un message de test, ou vous servir du flux de données et des messages créés dans ce démarrage rapide.
  2. Ouvrez votre éditeur favori, tel que Visual Studio Code, à partir du répertoire wd. Vous devez déjà disposer de dépendances oci-sdk pour Java dans le fichier pom.xml du projet Java Maven si les prérequis ont été respectés.
  3. Créez un fichier nommé Consumer.java dans le répertoire wd avec le code suivant. Remplacez les valeurs des variables configurationFilePath, profile, ociStreamOcid et ociMessageEndpoint du fragment de code suivant par les valeurs applicables à votre location.

    package oci.sdk.oss.example;
    
    import com.google.common.util.concurrent.Uninterruptibles;
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
    import com.oracle.bmc.streaming.model.Message;
    import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
    import com.oracle.bmc.streaming.requests.GetMessagesRequest;
    import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
    import com.oracle.bmc.streaming.responses.GetMessagesResponse;
    
    import java.util.concurrent.TimeUnit;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // A cursor can be created as part of a consumer group.
            // Committed offsets are managed for the group, and partitions
            // are dynamically balanced amongst consumers in the group.
            System.out.println("Starting a simple message loop with a group cursor");
            String groupCursor =
                    getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
            simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    
        }
    
        private static void simpleMessageLoop(
                StreamClient streamClient, String streamId, String initialCursor) {
            String cursor = initialCursor;
            for (int i = 0; i < 10; i++) {
    
                GetMessagesRequest getRequest =
                        GetMessagesRequest.builder()
                                .streamId(streamId)
                                .cursor(cursor)
                                .limit(25)
                                .build();
    
                GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    
                // process the messages
                System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
                for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
                    System.out.println(
                            String.format(
                                    "%s: %s",
                                    message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
                                    new String(message.getValue(), UTF_8)));
                }
    
                // getMessages is a throttled method; clients should retrieve sufficiently large message
                // batches, as to avoid too many http requests.
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    
                // use the next-cursor for iteration
                cursor = getResponse.getOpcNextCursor();
            }
        }
    
        private static String getCursorByGroup(
                StreamClient streamClient, String streamId, String groupName, String instanceName) {
            System.out.println(
                    String.format(
                            "Creating a cursor for group %s, instance %s.", groupName, instanceName));
    
            CreateGroupCursorDetails cursorDetails =
                    CreateGroupCursorDetails.builder()
                            .groupName(groupName)
                            .instanceName(instanceName)
                            .type(CreateGroupCursorDetails.Type.TrimHorizon)
                            .commitOnGet(true)
                            .build();
    
            CreateGroupCursorRequest createCursorRequest =
                    CreateGroupCursorRequest.builder()
                            .streamId(streamId)
                            .createGroupCursorDetails(cursorDetails)
                            .build();
    
            CreateGroupCursorResponse groupCursorResponse =
                    streamClient.createGroupCursor(createCursorRequest);
            return groupCursorResponse.getCursor().getValue();
        }
    
    }
  4. A partir du répertoire wd, exécutez la commande suivante :

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
  5. Des messages semblables à celui qui suit s'affichent :

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1.
    Read 25 messages.
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Remarque

    Si vous avez utilisé la console pour produire un message de test, la clé de chaque message est Null.