SDK for TypeScriptストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for TypeScript and JavaScriptおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。

主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。

前提条件

  1. SDK for TypeScript and JavaScriptを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
    • APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。詳細は、開始を参照してください。
  2. ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。
  3. Node.jsバージョン8.x以降。最新の長期サポート(LTS)バージョンをダウンロードします。
  4. NodeJSのTypeScriptインタプリタをグローバルにインストールします:

    npm install -g typescript
  5. Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
  6. パスにnpmを含むコマンド・プロンプトを開き、このクイックスタートのコードを保持するディレクトリ(wdなど)に移動し、次のコマンドを実行してOCI SDK for TypeScriptをインストールします:

    npm install oci-sdk

    または、認証用のOCI TypeScript SDKパッケージとストリーミングのみをインストールすると、依存性に関してより効率的になります:

    npm install oci-common
    npm install oci-streaming
  7. 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。

メッセージの生成

  1. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、このディレクトリにTypeScript用のoci-sdkパッケージがすでにインストールされている必要があります。
  2. 次のコードを使用して、wdディレクトリにProducer.tsという名前のファイルを作成します。次のコード・スニペットの変数ociConfigFileociProfileNameociStreamOcidおよびociMessageEndpointForStreamの値は、テナンシに適用可能な値で置き換えてください。

    const common = require("oci-common");
    const st = require("oci-streaming"); // OCI SDK package for OSS
    
    const ociConfigFile = "<config_file_path>";
    const ociProfileName = "<config_file_profile_name>";
    const ociMessageEndpointForStream = "<stream_message_endpoint>"; // example value "https://cell-1.streaming.region.oci.oraclecloud.com"
    const ociStreamOcid = "<stream_OCID>";
    
    // provide authentication for OCI and OSS
    const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName);
      
    async function main() {
      // OSS client to produce and consume messages from a Stream in OSS
      const client = new st.StreamClient({ authenticationDetailsProvider: provider });
    
      client.endpoint = ociMessageEndpointForStream;
    
      // build up a putRequest and publish some messages to the stream
      let messages = [];
      for (let i = 1; i <= 3; i++) {
        let entry = {
          key: Buffer.from("messageKey" + i).toString("base64"),
          value: Buffer.from("messageValue" + i).toString("base64")
        };
        messages.push(entry);
      }
    
      console.log("Publishing %s messages to stream %s.", messages.length, ociStreamOcid);
      const putMessageDetails = { messages: messages };
      const putMessagesRequest = {
        putMessagesDetails: putMessageDetails,
        streamId: ociStreamOcid
      };
      const putMessageResponse = await client.putMessages(putMessagesRequest);
      for (var entry of putMessageResponse.putMessagesResult.entries)
        console.log("Published messages to parition %s, offset %s", entry.partition, entry.offset);
    
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
    
  3. wdディレクトリのターミナルから、次のコマンドを実行し、Producer.tsをコンパイルしてProducer.jsを生成します:

    tsc Producer.ts
  4. 同じディレクトリから、次のコマンドを実行します:

    node run Producer.js

    次のようなターミナル出力が表示されます:

    $:/path/to/directory/wd>node Producer.js
      Publishing 3 messages to stream ocid1.stream.oc1.exampleuniqueID.
      Published messages to parition 0, offset 1314
      Published messages to parition 0, offset 1315
      Published messages to parition 0, offset 1316
  5. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、このディレクトリにTypeScript用のoci-sdkパッケージがすでにインストールされている必要があります。
  3. 次のコードを使用して、ディレクトリwdConsumer.tsという名前のファイルを作成します。次のコード・スニペットの変数ociConfigFileociProfileNameociStreamOcidおよびociMessageEndpointForStreamの値は、テナンシに適用可能な値で置き換えてください。

    const common = require("oci-common");
    const st = require("oci-streaming"); // OCI SDK package for OSS
    
    const ociConfigFile = "<config_file_path>";
    const ociProfileName = "<config_file_profile_name>";
    const ociMessageEndpointForStream = "<stream_message_endpoint>"; // example value "https://cell-1.streaming.region.oci.oraclecloud.com"
    const ociStreamOcid = "<stream_OCID>";
    
    // provide authentication for OCI and OSS
    const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName);
      
    async function main() {
      // OSS client to produce and consume messages from a Stream in OSS
      const client = new st.StreamClient({ authenticationDetailsProvider: provider });
    
      client.endpoint = ociMessageEndpointForStream;
    
      // Use a cursor for getting messages; each getMessages call will return a next-cursor for iteration.
      // There are a couple kinds of cursors, we will use group cursors
    
      // Committed offsets are managed for the group, and partitions
      // are dynamically balanced amongst consumers in the group.
    
      console.log("Starting a simple message loop with a group cursor");
      const groupCursor = await getCursorByGroup(client, ociStreamOcid, "exampleGroup01000", "exampleInstance-1");
      await simpleMessageLoop(client, ociStreamOcid, groupCursor);
    
    }
    
    async function getCursorByGroup(client, streamId, groupName, instanceName) {
        console.log("Creating a cursor for group %s, instance %s.", groupName, instanceName);
        const cursorDetails = {
          groupName: groupName,
          instanceName: instanceName,
          type: st.models.CreateGroupCursorDetails.Type.TrimHorizon,
          commitOnGet: true
        };
        const createCursorRequest = {
          createGroupCursorDetails: cursorDetails,
          streamId: streamId
        };
        const response = await client.createGroupCursor(createCursorRequest);
        return response.cursor.value;
      }
    
    async function simpleMessageLoop(client, streamId, initialCursor) {
        let cursor = initialCursor;
        for (var i = 0; i < 5; i++) {
          const getRequest = {
            streamId: streamId,
            cursor: cursor,
            limit: 100
          };
          const response = await client.getMessages(getRequest);
          console.log("Read %s messages.", response.items.length);
          for (var message of response.items) { 
            if (message.key !== null)  {         
                console.log("Key: %s, Value: %s, Partition: %s",
                Buffer.from(message.key, "base64").toString(),
                Buffer.from(message.value, "base64").toString(),
                Buffer.from(message.partition, "utf8").toString());
            }
           else{
                console.log("Key: Null, Value: %s, Partition: %s",
                    Buffer.from(message.value, "base64").toString(),
                    Buffer.from(message.partition, "utf8").toString());
           }
          }
          
          // getMessages is a throttled method; clients should retrieve sufficiently large message
          // batches, as to avoid too many http requests.
          await delay(2);
          cursor = response.opcNextCursor;
        }
      }
    
      async function delay(s) {
        return new Promise(resolve => setTimeout(resolve, s * 1000));
      }
    
    main().catch((err) => {
        console.log("Error occurred: ", err);
    });
    
  4. wdディレクトリのターミナルから、次のコマンドを実行し、Consumer.tsをコンパイルしてConsumer.jsを生成します:

    tsc Consumer.ts
  5. wdディレクトリから、次のコマンドを実行します:

    node run Consumer.js
  6. 次のようなメッセージが表示されます:

    Starting a simple message loop with a group cursor
    Creating a cursor for group exampleGroup01000, instance exampleInstance-1.
    Read 6 messages.
    Key: messageKey1, Value: messageValue1, Partition: 0
    Key: messageKey2, Value: messageValue2, Partition: 0
    Key: messageKey3, Value: messageValue3, Partition: 0
    Key: Null, Value: message value and key null, Partition: 0
    Key: Null, Value: message value and key null, Partition: 0
    Key: Null, Value: message value and key null, Partition: 0
    Read 0 messages.
    Read 0 messages.
    Read 0 messages.
    Read 0 messages.
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです