SDK for Javaストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for JavaおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。

主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。

前提条件

  1. SDK for Javaを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
    • APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。
    • Java 8
    ノート

    詳細は、SDKの構成を参照してください。
  2. ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。
  3. JDK 8以上がインストール済。JavaがPATH内にあることを確認します。
  4. Maven 3.0がインストール済。MavenがPATH内にあることを確認します。
  5. Intellij (推奨)またはその他の統合開発環境(IDE)。
  6. OCI Java SDK for IAMの最新バージョンのMaven依存性またはjarをpom.xmlに次のように追加します:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-common</artifactId>
    	  <version>LATEST</version>
    	</dependency>
  7. OCI Java SDK for OSSの最新バージョンのMaven依存性またはjarをpom.xmlに次のように追加します:

    	<dependency>
    	  <groupId>com.oracle.oci.sdk</groupId>
    	  <artifactId>oci-java-sdk-streaming</artifactId>
    	  <version>LATEST</version> 
    	</dependency>
  8. この例のJavaプロジェクトの作業ディレクトリとしてwdを使用すると、pom.xmlは次のようになります:

    <?xml version="1.0" encoding="UTF-8"?>
    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsJava</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-common</artifactId>
                <version>1.33.2</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.oci.sdk</groupId>
                <artifactId>oci-java-sdk-streaming</artifactId>
                <version>1.33.2</version>
            </dependency>
        </dependencies>
    </project>
  9. 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。

メッセージの生成

  1. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xmlの一部として、Javaのoci-sdk依存性がすでに存在する必要があります。
  2. 次のコードを使用して、ディレクトリwdProducer.javaという名前のファイルを作成します。次のコード・スニペットの変数configurationFilePathprofileociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    package oci.sdk.oss.example;
    
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.PutMessagesDetails;
    import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
    import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
    import com.oracle.bmc.streaming.requests.PutMessagesRequest;
    import com.oracle.bmc.streaming.responses.PutMessagesResponse;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // publish some messages to the stream
            publishExampleMessages(streamClient, ociStreamOcid);
    
        }
    
        private static void publishExampleMessages(StreamClient streamClient, String streamId) {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                messages.add(
                        PutMessagesDetailsEntry.builder()
                                .key(String.format("messageKey%s", i).getBytes(UTF_8))
                                .value(String.format("messageValue%s", i).getBytes(UTF_8))
                                .build());
            }
    
            System.out.println(
                    String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
            PutMessagesDetails messagesDetails =
                    PutMessagesDetails.builder().messages(messages).build();
    
            PutMessagesRequest putRequest =
                    PutMessagesRequest.builder()
                            .streamId(streamId)
                            .putMessagesDetails(messagesDetails)
                            .build();
    
            PutMessagesResponse putResponse = streamClient.putMessages(putRequest);
    
            // the putResponse can contain some useful metadata for handling failures
            for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
                if (StringUtils.isNotBlank(entry.getError())) {
                    System.out.println(
                            String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
                } else {
                    System.out.println(
                            String.format(
                                    "Published message to partition %s, offset %s.",
                                    entry.getPartition(),
                                    entry.getOffset()));
                }
            }
        }
    
    
    }
  3. wdディレクトリから、次のコマンドを実行します:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Producer
  4. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xmlの一部として、Javaのoci-sdk依存性がすでに存在する必要があります。
  3. 次のコードを使用して、ディレクトリwdConsumer.javaという名前のファイルを作成します。次のコード・スニペットの変数configurationFilePathprofileociStreamOcidおよびociMessageEndpointの値は、テナンシに適用可能な値で置き換えてください。

    package oci.sdk.oss.example;
    
    import com.google.common.util.concurrent.Uninterruptibles;
    import com.oracle.bmc.ConfigFileReader;
    import com.oracle.bmc.auth.AuthenticationDetailsProvider;
    import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
    import com.oracle.bmc.streaming.StreamClient;
    import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
    import com.oracle.bmc.streaming.model.Message;
    import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
    import com.oracle.bmc.streaming.requests.GetMessagesRequest;
    import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
    import com.oracle.bmc.streaming.responses.GetMessagesResponse;
    
    import java.util.concurrent.TimeUnit;
    
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            final String configurationFilePath = "<config_file_path>";
            final String profile = "<config_file_profile_name>";
            final String ociStreamOcid = "<stream_OCID>";
            final String ociMessageEndpoint = "<stream_message_endpoint>";
    
            final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
            final AuthenticationDetailsProvider provider =
                    new ConfigFileAuthenticationDetailsProvider(configFile);
    
            // Streams are assigned a specific endpoint url based on where they are provisioned.
            // Create a stream client using the provided message endpoint.
            StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
    
            // A cursor can be created as part of a consumer group.
            // Committed offsets are managed for the group, and partitions
            // are dynamically balanced amongst consumers in the group.
            System.out.println("Starting a simple message loop with a group cursor");
            String groupCursor =
                    getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
            simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    
        }
    
        private static void simpleMessageLoop(
                StreamClient streamClient, String streamId, String initialCursor) {
            String cursor = initialCursor;
            for (int i = 0; i < 10; i++) {
    
                GetMessagesRequest getRequest =
                        GetMessagesRequest.builder()
                                .streamId(streamId)
                                .cursor(cursor)
                                .limit(25)
                                .build();
    
                GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    
                // process the messages
                System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
                for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
                    System.out.println(
                            String.format(
                                    "%s: %s",
                                    message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
                                    new String(message.getValue(), UTF_8)));
                }
    
                // getMessages is a throttled method; clients should retrieve sufficiently large message
                // batches, as to avoid too many http requests.
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    
                // use the next-cursor for iteration
                cursor = getResponse.getOpcNextCursor();
            }
        }
    
        private static String getCursorByGroup(
                StreamClient streamClient, String streamId, String groupName, String instanceName) {
            System.out.println(
                    String.format(
                            "Creating a cursor for group %s, instance %s.", groupName, instanceName));
    
            CreateGroupCursorDetails cursorDetails =
                    CreateGroupCursorDetails.builder()
                            .groupName(groupName)
                            .instanceName(instanceName)
                            .type(CreateGroupCursorDetails.Type.TrimHorizon)
                            .commitOnGet(true)
                            .build();
    
            CreateGroupCursorRequest createCursorRequest =
                    CreateGroupCursorRequest.builder()
                            .streamId(streamId)
                            .createGroupCursorDetails(cursorDetails)
                            .build();
    
            CreateGroupCursorResponse groupCursorResponse =
                    streamClient.createGroupCursor(createCursorRequest);
            return groupCursorResponse.getCursor().getValue();
        }
    
    }
  4. wdディレクトリから、次のコマンドを実行します:

    mvn install exec:java -Dexec.mainClass=oci.sdk.oss.example.Consumer
  5. 次のようなメッセージが表示されます:

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup, instance exampleInstance-1.
    Read 25 messages.
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです