Streamsの例

この項では、Streamsアプリケーションの例を示します。この例は可能なかぎり簡略化していますが、その全体像は、このような性質のアプリケーションにおいては一般的です。

このアプリケーションの例では、NoSQLSubscriberの例で説明したサブスクライバの例を利用します。

このアプリケーションでは、まず、アプリケーションに必要な情報を定義します。それは、アプリケーションが監視する表を示します。1つのサブスクライバが複数の表から操作を受け取ることもありますが、この例では表Usersのみをサブスクライブします。また、numは、サブスクライバがUsers表に対してリクエストする操作の数です。

ノート:

ストリーム操作ではネームスペースがサポートされています。ネームスペース内の表をサブスクライブする場合は、ns1:Usersのように、表名の前にネームスペースとコロン(:)を付けます。

次に、Oracle NoSQL Database接続情報を提供します。これは、説明のみを目的して存在する簡単な例であるため、非セキュアなストアを使用して認証の問題を回避します。ただし、本番環境では、セキュアなストアへの認証再認証で説明しているように、認証資格証明を入力することが必要になる可能性があります。

最後に、ストリーミング・アプリケーションに固有の情報をいくつか提供します。MAX_SUBSCRIPTION_TIME_MSは、アプリケーションがタイムアウトするまでに待機できる時間を指定するために使用します。CKPT_TABLE_NAMEは、チェックポイント表の名前です。この情報は、NoSQLPublisherの構築時に必要ですが、それ以外の場合はこの特定のアプリケーションでは使用されません。チェックポイントの詳細は、チェックポイントの使用方法を参照してください。

package pubsub;

import oracle.kv.KVStoreConfig;
import oracle.kv.pubsub.NoSQLPublisher;
import oracle.kv.pubsub.NoSQLPublisherConfig;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;

public class GSGStreamExample {

    /* table to subscribe */
    private static final String TABLE_NAME = "Users";
    /* Number of operations to stream */
    private static final int num = 100;

    private static final String storeName = "kvstore";
    private static final String[] hhosts = {"localhost:5000"};

    /* max subscription allowed time before forced termination */
    private static final long MAX_SUBSCRIPTION_TIME_MS =
        Long.MAX_VALUE;

    private static final String rootPath = ".";
    private static final String CKPT_TABLE_NAME = "CheckpointTable";

    public static void main(final String args[]) throws Exception {

        final GSGStreamExample gte = new GSGStreamExample(args);
        gte.run();
    }

    private GSGStreamExample(final String[] argv) {
    } 

最初に、NoSQLPublisherオブジェクトを構築します。NoSQLPublisherConfigは、Oracle NoSQL Database接続情報を指定するために使用します。

    /* 
     * Subscribes a table. The work flow is ReactiveStream 
     * compatible 
     */
    private void run() throws Exception {

        NoSQLPublisher publisher = null;
        try {
            /* step 1 : create a publisher configuration */
            final NoSQLPublisherConfig publisherConfig =
                new NoSQLPublisherConfig.Builder(
                    new KVStoreConfig(storeName, hhosts), rootPath)
                    .build();

            /* step 2 : create a publisher */
            publisher = NoSQLPublisher.get(publisherConfig);

次に、NoSQLSubscriptionConfigを構築します。ここでは、サブスクライブする表を指定します。

            /* step 3: create a subscription configuration */
            final NoSQLSubscriptionConfig subscriptionConfig =
                /* stream with specified mode */
                new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
                .setSubscribedTables(TABLE_NAME)
                .build(); 

次に、サブスクライバを構築します。ここでは、NoSQLSubscriberの例で説明しているNoSQLSubscriber実装を使用します。

            /* step 4: create a subscriber */
            final GSGSubscriberExample subscriber =
                new GSGSubscriberExample(subscriptionConfig, num);
            System.out.println("Subscriber created to stream " +
                    num + " operations."); 

上の例では、ストリームするイベントの数を100として指定しています。ただし、連続的なストリーミングを行う場合は、新しいGSGSubscriberExample(subscriptionConfig, Long.MAX_VALUE)を使用する必要があります。

次に、サブスクリプションを作成します。GSGSubscriberExampleから(getSubscriptionSucc()メソッドを使用して)サブスクリプションの作成に関するエラーが報告された場合は、例外をスローし、エラーの性質を識別するメッセージ(getCauseOfFailure())とサブスクライバの一意のID (getSubscriberId())を表示してアプリケーションを終了します。

            /* step 5: create a subscription and start stream */
            publisher.subscribe(subscriber);
            if (!subscriber.isSubscriptionSucc()) {
                System.out.println("Subscription failed for " +
                              subscriber.getSubscriptionConfig()
                                        .getSubscriberId() +
                              ", reason " + 
                              subscriber.getCauseOfFailure());

                throw new RuntimeException("fail to subscribe");
            }
            System.out.println("Start stream " + num +
                    " operations from table " + TABLE_NAME); 

この時点で、アプリケーション・スレッドをスリープ状態にします。これにより、サブスクライバは、親アプリケーションの影響を受けることなく稼働できます。場合によっては、このスレッドをウェイクアップし、サブスクライバによって使用されたストリーム操作の数をチェックして、最大実行時間を超えていないことを確認することもできます。タイムアウトしきい値を超えた場合は、例外をスローしてアプリケーションを終了します。それ以外の場合は、必要な操作がすべて使用されるまで実行を続行します。

            /*
             * Wait for the stream to finish. Throw exception if it
             * cannot finish within max allowed elapsed time
             */
            final long s = System.currentTimeMillis();
            while (subscriber.getStreamOps() < num) {
                final long elapsed = System.currentTimeMillis() - s;
                if (elapsed >= MAX_SUBSCRIPTION_TIME_MS) {
                    throw new
                        RuntimeException("Not done within max " +
                                "allowed elapsed time");
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted!");
                }
            } 

最後に、アプリケーションをクリーン・アップして終了します。この時点でsubscriber.getSubscription.cancel()を使用してサブスクリプションを取り消すこともできますが、GSGSubscriberExampleクラスはすでにonNext()メソッドでそれをコールしています。特に、エラー状況への対応の一部として、ストリーム・アプリケーション自体からcancel()をコールすることで、アプリケーションをより堅牢にできます。

            /* step 6: clean up */
            subscriber.getSubscription().cancel();
            publisher.close(true);
            System.out.println("Publisher closed normally.");

        } catch (Exception exp) {
            String msg = "Error: " + exp.getMessage();
            System.out.println(msg);
            if (publisher != null) {
                publisher.close(exp, false);
                System.out.println("Publisher closed with error.");
            }
            throw exp;
        } finally {
           System.out.println("All done.");
        }
    }
} 

Streamsのサンプル出力

Users表がサンプル・データとともにロードされると(GSGStreamsWriteTableを参照)、このサンプル・プログラムからの出力は次のようになります(簡潔にするために出力は12番目の操作で切り捨てられています)。

> java pubsub.GSGStreamExample
Subscriber created to stream 100 operations.
Start stream 100 operations from table Users

Found a put. Row is:
UID: 0
	Quantity: 10
	myArray: [1,14,3,9,12,12,13,13,4,6]

Found a put. Row is:
UID: 1
	Quantity: 4
	myArray: [3,14,1,13]

Found a put. Row is:
UID: 2
	Quantity: 5
	myArray: [5,7,15,1,5]

Found a put. Row is:
UID: 3
	Quantity: 2
	myArray: [10,7]

Found a put. Row is:
UID: 4
	Quantity: 7
	myArray: [2,17,5,9,1,10,5]

Found a put. Row is:
UID: 5
	Quantity: 5
	myArray: [13,1,2,3,11]

Found a delete. Deleted row is:
Del OP [seq: 6304, shard id: 1, primary key: {
  "uid" : 2
}]

Found a put. Row is:
UID: 6
	Quantity: 9
	myArray: [16,7,11,13,13,10,11,15,5]

Found a put. Row is:
UID: 7
	Quantity: 2
	myArray: [11,3]

Found a put. Row is:
UID: 8
	Quantity: 6
	myArray: [12,12,5,11,11,3]

Found a put. Row is:
UID: 9
	Quantity: 4
	myArray: [10,7,6,4]

Found a put. Row is:
UID: 10
	Quantity: 8
	myArray: [3,9,18,11,16,12,6,2]

    ...

この例が実行されるたびに、常に、Users表について検出された最初の表操作(つまり、表の最初の行(UID 0)を作成した書込み操作)からストリーミングが開始されます。毎回、最初からストリームするのではなく、N番目の操作からストリームするには、チェックポイントを実装する必要があります。これらについては、次の章のチェックポイントの使用方法で説明します。