GSGSubscriberExampleへのチェックポイントの実装

この項では、NoSQLSubscriberの例で提供されている例に機能を追加して、チェックポイントを実装する方法を示します。

また、Streamsの例で示されているStreamsアプリケーションの例にも機能を追加する必要があります。それらの更新については、前の項のGSGStreamExampleへのチェックポイントの実装を参照してください。

元のサンプル・コードへの新しい追加は太字で示されます。

GSGSubscriberExample.javaには、やや大幅な変更を加えます。まず、チェックポイント実装に必要なプライベート・データ・メンバーをいくつか追加します。

  • chkptInvは、チェックポイントについてGSGStreamsExampleを更新したときに定義したチェックポイント間隔です。この変数は、このサブスクライバがチェックポイントを実行するまでの操作の数を示します。

  • ckptSuccは、チェックポイントが成功したかどうかを示すフラグです。

  • CHECKPOINT_TIMEOUT_MSは、失敗と宣言されるまでにチェックポイントを実行できる時間(ミリ秒)です。

package pubsub;

import java.util.List;

import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;

import oracle.kv.table.MapValue;
import oracle.kv.table.Row;

import org.reactivestreams.Subscription;

class GSGSubscriberExample implements NoSQLSubscriber {

    /* subscription configuration */
    private final NoSQLSubscriptionConfig config;

    /* number of operations to stream */
    private final int numOps;

    /* number of operations seen in the stream */
    private long streamOps;

    private NoSQLSubscription subscription;

    private boolean isSubscribeSucc;

    private Throwable causeOfFailure;

/* checkpoint interval in number of ops */
    private final long ckptInv;

    /* 
     * true if checkpoint is successful. 
     * because this value can technically be changed by
     * different threads, we declare it as volatile
     */
    private volatile boolean ckptSucc;
    /*
     * amount of time in milliseconds that the checkpoint
     * has to run before the operation times out.
     */
    private final static long CHECKPOINT_TIMEOUT_MS = 60 * 1000;
        

次に、このクラスが構築されるときにチェックポイント間隔を指定できるように、クラスのシグネチャを変更します。また、ckptInvプライベート・データ・メンバーを初期化します。

    GSGSubscriberExample(NoSQLSubscriptionConfig config,
                                 int numOps, long ckptIntv) {
        this.config = config;
        this.numOps = numOps;

        causeOfFailure = null;
        isSubscribeSucc = false;
        streamOps = 0;
        subscription = null;

        this.ckptInv = ckptIntv;
    } 

    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;
        subscription.request(numOps);
        isSubscribeSucc = true;
    }

    @Override
    public void onError(Throwable t) {
        causeOfFailure = t;
        System.out.println("Error: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        /* shall be no-op */
    }

    @Override
    public void onWarn(Throwable t) {
        System.out.println("Warning: " + t.getMessage());
    } 

次に、これまで実装されていなかったonCheckpointComplete()を実装します。この簡単な例では、チェックポイントの成功ステータスを示すためにのみ、これを使用します。causeメソッド・パラメータがnullの場合、チェックポイントが成功したかどうかがわかります。

NoSQLSubscription.doCheckpoint()の戻りステータスを調べることはできません。このメソッドは、別個のスレッドで非同期に実行されます。その理由は、doCheckpoint()がチェックポイントの完了を待たずに即座に制御を戻すことができるようにするためです。

    /* called when publisher finishes a checkpoint */
    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
    if (cause == null) {
            ckptSucc = true;
            System.out.println("Finish checkpoint at position " +
                    pos);
        } else {
            ckptSucc = false;
            System.out.println("Fail to checkpoint at position " +
                    pos + ", cause: " + cause.getMessage());
        }        

    } 

次に、新しい内部メソッドperformCheckpoint() (次に説明します)を常にコールするようにonNext()メソッドを更新します。

これまでは、チェックポイントを実行する時期かどうかを判断するためのロジックをここで追加できました。かわりに、その機能を新しいdoCheckpoint()メソッドに含めます。

    @Override
    public void onNext(StreamOperation t) {

        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();

                Integer uid = row.get("uid").asInteger().get();
                System.out.println("UID: " + uid);

                MapValue myjson = row.get("myJSON").asMap();
                int quantity = myjson.get("quantity")
                                     .asInteger().get();
                String array = 
                    myjson.get("myArray").asArray().toString();
                System.out.println("\tQuantity: " + quantity);
                System.out.println("\tmyArray: " + array);
                break;
            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;

            default:
                throw new 
                    IllegalStateException("Receive unsupported " +
                        "stream operation from shard " +
                                t.getRepGroupId() +
                                ", seq: " + t.getSequenceId());
        }

        performCheckpoint();
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after " +
                    "receiving " + numOps + " operations.");
        }
    }

最後に、新しいプライベート・メソッドperformCheckpoint()を実装します。このメソッドは、チェックポイント機能の大部分を実装します。

このメソッドでは、最初にchkptInv0かどうかをチェックします。該当する場合は、次のように返します。

    private void performCheckpoint() {

        /* If 0, turn off checkpointing */
        if (ckptInv == 0) {
            return;
        }

チェックポイントは、streamOpsの数がゼロより大きく、かつstreamOpsの数がckptInvで割り切れる場合に実行されます。これらの条件が満たされると、NoSQLSubscription.getCurrentPosition()を使用して現在のStreamPositionが取得され、その後、NoSQLSubscription.doCheckpoint()を使用して実際にチェックポイントが実行されます。

最後に、チェックポイントが完了すると、その成功ステータスがチェックされます。成功ステータスがどのようなものであれ、それをコンソールに報告して完了です。本番コードでは、特に、チェックポイントが成功しなかった場合は、より複雑なアクションをここで実行することを検討するようお薦めします。

            if (ckptSucc) {
                System.out.println("\nCheckpoint succeeded after "
                        + streamOps +
                        " operations at position " + ckptPos +
                        ", elapsed time in ms " +
                        (System.currentTimeMillis() - start));
                /* reset for next checkpoint */
                ckptSucc = false;
            } else {
                System.out.println("\nCheckpoint timeout " +
                                   "at position " + ckptPos +
                                   ", elapsed time in ms " +
                                   (System.currentTimeMillis() - start));
            }
        }
    }

    private boolean isCkptTimeout(long start) {
        return (System.currentTimeMillis() - start) >
            CHECKPOINT_TIMEOUT_MS;
    }
 
    String getCauseOfFailure() {
        if (causeOfFailure == null) {
            return "success";
        }
        return causeOfFailure.getMessage();
    }

    boolean isSubscriptionSucc() {
        return isSubscribeSucc;
    }

    long getStreamOps() {
        return streamOps;
    }

    NoSQLSubscription getSubscription() {
        return subscription;
    }

}