GSGStreamExampleへのチェックポイントの実装

この項では、Streamsの例で提供されている例に機能を追加して、チェックポイントを実装する方法を示します。また、NoSQLSubscriberの例で示されているサブスクライバ実装にも機能を追加する必要があります。それらの更新については、次の項のGSGSubscriberExampleへのチェックポイントの実装を参照してください。

元のサンプル・コードへの新しい追加は太字で示されます。

GSGStreamsExample.javaに対する変更は、ごくわずかです。まず、NoSQLStreamModeをインポートする必要があります。

package pubsub;

import oracle.kv.KVStoreConfig;
import oracle.kv.pubsub.NoSQLPublisher;
import oracle.kv.pubsub.NoSQLPublisherConfig;
import oracle.kv.pubsub.NoSQLStreamMode;
import oracle.kv.pubsub.NoSQLSubscriptionConfig; 

次に、新しいプライベート・データ・メンバーをいくつか追加します。

これらの最初のものは、このアプリケーションがチェックポイントを実行するまでの操作の数を示すchkptIntvです。ここでは、説明のために、10個の操作ごとにチェックポイントを実行します。本番コードの場合には、これは過度に頻繁である可能性があります。また、操作数による間隔でチェックポイントを取得する必要はありません。実行する理由はどのようなものでもかまいません。たとえば、クロック間隔でチェックポイントを取得できます。また、delete操作が検出されたり、なんらかの意味のある基準に準拠する表の行が書き込まれるたびに、それらを取得することもできます。

チェックポイント間隔を超えると、ストリーム・モードがFROM_CHECKPOINTになることを示します。

public class GSGStreamExample {

    /* table to subscribe */
    private static final String TABLE_NAME = "Users";
    /* Number of operations to stream */
    private static final int num = 100;

    private static final String storeName = "kvstore";
    private static final String[] hhosts = {"localhost:5000"};

    /* max subscription allowed time before forced termination */
    private static final long MAX_SUBSCRIPTION_TIME_MS =
        Long.MAX_VALUE;

    private static final String rootPath = ".";
    private static final String CKPT_TABLE_NAME = "CheckpointTable";
    /* number of ops before a checkpoint is performed */
    private long ckptIntv = 10;
    private NoSQLStreamMode streamMode =
                            NoSQLStreamMode.FROM_CHECKPOINT;
    

次に、サブスクリプションを構成するとき、目的のストリーム・モードを追加します。

    public static void main(final String args[]) throws Exception {

        final GSGStreamExample gte = new GSGStreamExample(args);
        gte.run();
    }

    private GSGStreamExample(final String[] argv) {
    }

    /* 
     * Subscribes a table. The work flow is ReactiveStream 
     * compatible 
     */
    private void run() throws Exception {

        NoSQLPublisher publisher = null;
        try {
            /* step 1 : create a publisher configuration */
            final NoSQLPublisherConfig publisherConfig =
                new NoSQLPublisherConfig.Builder(
                    new KVStoreConfig(storeName, hhosts), rootPath)
                    .build();

            /* step 2 : create a publisher */
            publisher = NoSQLPublisher.get(publisherConfig);

            /* step 3: create a subscription configuration */
            final NoSQLSubscriptionConfig subscriptionConfig =
                /* stream with specified mode */
                new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
                .setSubscribedTables(TABLE_NAME)
                .setStreamMode(streamMode)
                .build();

このアプリケーションに対する他の変更は、チェックポイント間隔をNoSQLSubscriber実装に設定することのみです。ここでも、この例でチェックポイントをいつ取得するかをどう判断したかにのみ基づいて、この変更を加えます。本番コードの処理は、一般的に、まったく異なるものになります。

            /* step 4: create a subscriber */
            final GSGSubscriberExample subscriber =
                new GSGSubscriberExample(subscriptionConfig, num,
                        ckptIntv);
            System.out.println("Subscriber created to stream " +
                    num + " operations.");

            /* step 5: create a subscription and start stream */
            publisher.subscribe(subscriber);
            if (!subscriber.isSubscriptionSucc()) {
                System.out.println("Subscription failed for " +
                              subscriber.getSubscriptionConfig()
                                        .getSubscriberId() +
                              ", reason " +
                              subscriber.getCauseOfFailure());

                throw new RuntimeException("fail to subscribe");
            }
            System.out.println("Start stream " + num +
                    " operations from table " + TABLE_NAME);


            /*
             * Wait for the stream to finish. Throw exception if it 
             * cannot finish within max allowed elapsed time
             */
            final long s = System.currentTimeMillis();
            while (subscriber.getStreamOps() < num) {
                final long elapsed = System.currentTimeMillis() - s;
                if (elapsed >= MAX_SUBSCRIPTION_TIME_MS) {
                    throw new
                        RuntimeException("Not done within max " +
                                "allowed elapsed time");
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted!");
                }
            }

            /* step 6: clean up */
            publisher.close(true);
            System.out.println("Publisher closed normally.");

        } catch (Exception exp) {
            String msg = "Error: " + exp.getMessage();
            System.out.println(msg);
            if (publisher != null) {
                publisher.close(exp, false);
                System.out.println("Publisher closed with error.");
            }
            throw exp;
        } finally {
           System.out.println("All done.");
        }
    }
}