SDK para Início Rápido de Streaming do Java

Publique e consuma mensagens no serviço Streaming usando o OCI SDK para Java.

Esse início rápido mostra como usar o OCI (Oracle Cloud Infrastructure) SDK for Java e o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.

Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming. Para obter mais informações sobre como usar os SDKs do OCI, consulte os Guias SDK.

Pré-requisitos

  1. Para usar o SDK para Java, você deve ter o seguinte:

    • Uma conta do Oracle Cloud Infrastructure.
    • Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. O usuário pode ser você mesmo ou outra pessoa/sistema que precise chamar a API. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
    • Um par de chaves usado para assinar solicitações de API, com a chave pública carregada por upload no sistema Oracle. Somente o usuário que chama a API deve possuir a chave privada.
    • Java 8
    Observação

    Para obter mais informações, consulte Configurando o SDK.
  2. Colete o ponto final e o OCID das Mensagens de um stream. Para obter etapas para obter detalhes de um stream, consulte Obtendo Detalhes de um Stream. Para os fins deste início rápido, o stream deve usar um ponto final público e permitir que a Oracle gerencie a criptografia. Consulte Criando um Stream e Criando um Pool de Streams se você não tiver um stream existente.
  3. JDK 8 ou versão posterior instalada. Certifique-se de que o Java esteja no seu PATH.
  4. Maven 3.0 ou versão posterior instalada. Certifique-se de que o Maven esteja no seu PATH.
  5. Intellij (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
  6. Adicione a versão mais recente de dependência ou jar maven para o OCI Java SDK for IAM ao seu pom.xml da seguinte forma:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-common</artifactId>
    	  <version>LATEST</version>
    	</dependency>
  7. Adicione a versão mais recente de dependência ou jar maven para o OCI Java SDK for OSS ao seu pom.xml da seguinte forma:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-streaming</artifactId>
    	  <version>LATEST</version> 
    	</dependency>
  8. Supondo que wd seja o diretório de trabalho do projeto Java deste exemplo, seu pom.xml será semelhante ao seguinte:

    <?xml version="1.0" encoding="UTF-8"?>
    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsJava</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-common</artifactId>
                <version>1.33.2</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-streaming</artifactId>
                <version>1.33.2</version>
            </dependency>
        </dependencies>
    </project>
  9. Verifique se você tem um arquivo de configuração do SDK válido. Para ambientes de produção, você deve usar a autorização do controlador de instâncias.

Produzindo Mensagens

  1. Abra seu editor favorito, como o Visual Studio Code, no diretório wd. Você já deverá ter dependências oci-sdk para Java como parte do pom.xml do seu projeto Maven Java após atender aos pré-requisitos.
  2. Crie um arquivo chamado Producer.java no diretório wd com o código a seguir. Substitua os valores das variáveis configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    package oci.sdk.oss.example;
    
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.PutMessagesDetails;
    import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
    import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
    import com.oracle.bmc.streaming.requests.PutMessagesRequest;
    import com.oracle.bmc.streaming.responses.PutMessagesResponse;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // publish some messages to the stream
            publishExampleMessages(streamClient, ociStreamOcid);
    
        }
    
        private static void publishExampleMessages(StreamClient streamClient, String streamId) {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                messages.add(
                        PutMessagesDetailsEntry.builder()
                                .key(String.format("messageKey%s", i).getBytes(UTF_8))
                                .value(String.format("messageValue%s", i).getBytes(UTF_8))
                                .build());
            }
    
            System.out.println(
                    String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
            PutMessagesDetails messagesDetails =
                    PutMessagesDetails.builder().messages(messages).build();
    
            PutMessagesRequest putRequest =
                    PutMessagesRequest.builder()
                            .streamId(streamId)
                            .putMessagesDetails(messagesDetails)
                            .build();
    
            PutMessagesResponse putResponse = streamClient.putMessages(putRequest);
    
            // the putResponse can contain some useful metadata for handling failures
            for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
                if (StringUtils.isNotBlank(entry.getError())) {
                    System.out.println(
                            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
                } else {
                    System.out.println(
                            String.format(
                                    "Published message to partition %s, offset %s.",
                                    entry.getPartition(),
                                    entry.getOffset()));
                }
            }
        }
    
    
    }
  3. No diretório wd, execute o seguinte comando:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
  4. Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.

Consumindo Mensagens

  1. Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
  2. Abra seu editor favorito, como o Visual Studio Code, no diretório wd. Você já deverá ter dependências oci-sdk para Java como parte do pom.xml do seu projeto Maven Java após atender aos pré-requisitos.
  3. Crie um arquivo chamado Consumer.java no diretório wd com o código a seguir. Substitua os valores das variáveis configurationFilePath, profile,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    package oci.sdk.oss.example;
    
    import com.google.common.util.concurrent.Uninterruptibles;
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
    import com.oracle.bmc.streaming.model.Message;
    import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
    import com.oracle.bmc.streaming.requests.GetMessagesRequest;
    import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
    import com.oracle.bmc.streaming.responses.GetMessagesResponse;
    
    import java.util.concurrent.TimeUnit;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // A cursor can be created as part of a consumer group.
            // Committed offsets are managed for the group, and partitions
            // are dynamically balanced amongst consumers in the group.
            System.out.println("Starting a simple message loop with a group cursor");
            String groupCursor =
                    getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
            simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    
        }
    
        private static void simpleMessageLoop(
                StreamClient streamClient, String streamId, String initialCursor) {
            String cursor = initialCursor;
            for (int i = 0; i < 10; i++) {
    
                GetMessagesRequest getRequest =
                        GetMessagesRequest.builder()
                                .streamId(streamId)
                                .cursor(cursor)
                                .limit(25)
                                .build();
    
                GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    
                // process the messages
                System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
                for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
                    System.out.println(
                            String.format(
                                    "%s: %s",
                                    message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
                                    new String(message.getValue(), UTF_8)));
                }
    
                // getMessages is a throttled method; clients should retrieve sufficiently large message
                // batches, as to avoid too many http requests.
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    
                // use the next-cursor for iteration
                cursor = getResponse.getOpcNextCursor();
            }
        }
    
        private static String getCursorByGroup(
                StreamClient streamClient, String streamId, String groupName, String instanceName) {
            System.out.println(
                    String.format(
                            "Creating a cursor for group %s, instance %s.", groupName, instanceName));
    
            CreateGroupCursorDetails cursorDetails =
                    CreateGroupCursorDetails.builder()
                            .groupName(groupName)
                            .instanceName(instanceName)
                            .type(CreateGroupCursorDetails.Type.TrimHorizon)
                            .commitOnGet(true)
                            .build();
    
            CreateGroupCursorRequest createCursorRequest =
                    CreateGroupCursorRequest.builder()
                            .streamId(streamId)
                            .createGroupCursorDetails(cursorDetails)
                            .build();
    
            CreateGroupCursorResponse groupCursorResponse =
                    streamClient.createGroupCursor(createCursorRequest);
            return groupCursorResponse.getCursor().getValue();
        }
    
    }
  4. No diretório wd, execute o seguinte comando:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
  5. Você deverá ver mensagens semelhantes a esta:

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1.
    Read 25 messages.
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Observação

    Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem será Null