Implementing Checkpoints in GSGSubscriberExample

In this section, we illustrate how to implement checkpoints by adding functionality to the examples provided in NoSQLSubscriber Example.

Be aware that you must also add functionality to the example streams application shown in Streams Example. For those updates, see the previous section, Implementing Checkpoints in GSGStreamExample.

New additions to the original example code are indicated by bold text.

The changes to GSGSubscriberExample.java are moderately extensive. To begin, we add some private data members necessary for our checkpoint implementation.

  • chkptInv is the checkpoint interval that we defined when we updated GSGStreamsExample for checkpoints. This variable indicates the number of operations that this subscriber sees before running a checkpoint.

  • ckptSucc is a flag to indicate whether a checkpoint is successful.

  • CHECKPOINT_TIMEOUT_MS is the time in milliseconds a checkpoint can run before it is declared a failure.

package pubsub;

import java.util.List;

import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;

import oracle.kv.table.MapValue;
import oracle.kv.table.Row;

import org.reactivestreams.Subscription;

class GSGSubscriberExample implements NoSQLSubscriber {

    /* subscription configuration */
    private final NoSQLSubscriptionConfig config;

    /* number of operations to stream */
    private final int numOps;

    /* number of operations seen in the stream */
    private long streamOps;

    private NoSQLSubscription subscription;

    private boolean isSubscribeSucc;

    private Throwable causeOfFailure;

/* checkpoint interval in number of ops */
    private final long ckptInv;

    /* 
     * true if checkpoint is successful. 
     * because this value can technically be changed by
     * different threads, we declare it as volatile
     */
    private volatile boolean ckptSucc;
    /*
     * amount of time in milliseconds that the checkpoint
     * has to run before the operation times out.
     */
    private final static long CHECKPOINT_TIMEOUT_MS = 60 * 1000;
        

Next we change our class signature to allow specification of the checkpoint interval when this class is constructed. We also initialize our ckptInv private data member.

    GSGSubscriberExample(NoSQLSubscriptionConfig config,
                                 int numOps, long ckptIntv) {
        this.config = config;
        this.numOps = numOps;

        causeOfFailure = null;
        isSubscribeSucc = false;
        streamOps = 0;
        subscription = null;

        this.ckptInv = ckptIntv;
    } 

    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;
        subscription.request(numOps);
        isSubscribeSucc = true;
    }

    @Override
    public void onError(Throwable t) {
        causeOfFailure = t;
        System.out.println("Error: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        /* shall be no-op */
    }

    @Override
    public void onWarn(Throwable t) {
        System.out.println("Warning: " + t.getMessage());
    } 

Next, we implement onCheckpointComplete(), which was not implemented earlier. In this simple example, we use it only to indicate the checkpoint's success status. You can tell if the checkpoint is successful if the cause method parameter is null.

Notice that we cannot examine the return status of NoSQLSubscription.doCheckpoint() because that method runs asynchronously, in a separate thread. The reason is so that doCheckpoint() is free to return immediately without waiting for the checkpoint to complete.

    /* called when publisher finishes a checkpoint */
    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
    if (cause == null) {
            ckptSucc = true;
            System.out.println("Finish checkpoint at position " +
                    pos);
        } else {
            ckptSucc = false;
            System.out.println("Fail to checkpoint at position " +
                    pos + ", cause: " + cause.getMessage());
        }        

    } 

Next, we update the onNext() method to always call a new internal method, performCheckpoint() (described next).

We could have added logic here to determine if it is time to run a checkpoint. Instead, we push that functionality into the new doCheckpoint() method.

    @Override
    public void onNext(StreamOperation t) {

        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();

                Integer uid = row.get("uid").asInteger().get();
                System.out.println("UID: " + uid);

                MapValue myjson = row.get("myJSON").asMap();
                int quantity = myjson.get("quantity")
                                     .asInteger().get();
                String array = 
                    myjson.get("myArray").asArray().toString();
                System.out.println("\tQuantity: " + quantity);
                System.out.println("\tmyArray: " + array);
                break;
            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;

            default:
                throw new 
                    IllegalStateException("Receive unsupported " +
                        "stream operation from shard " +
                                t.getRepGroupId() +
                                ", seq: " + t.getSequenceId());
        }

        performCheckpoint();
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after " +
                    "receiving " + numOps + " operations.");
        }
    }

Finally, we implement a new private method, performCheckpoint(). This method implements the bulk of the checkpoint functionality.

In this method, we first check if chkptInv is 0. If it is, we return:

    private void performCheckpoint() {

        /* If 0, turn off checkpointing */
        if (ckptInv == 0) {
            return;
        }

A checkpoint is run if the number of streamOps is greater than zero, and if the number of streamOps is evenly divisible by ckptInv. If these conditions are met, NoSQLSubscription.getCurrentPosition() is used to get the current StreamPosition, and then NoSQLSubscription.doCheckpoint() is used to actually perform the checkpoint.

Finally, once the checkpoint concludes, we check its success status. Regardless of the success status, we report it to the console, and then we are done. For production code, we recommend that you consider taking more elaborate actions here, especially if the checkpoint was not successful.

            if (ckptSucc) {
                System.out.println("\nCheckpoint succeeded after "
                        + streamOps +
                        " operations at position " + ckptPos +
                        ", elapsed time in ms " +
                        (System.currentTimeMillis() - start));
                /* reset for next checkpoint */
                ckptSucc = false;
            } else {
                System.out.println("\nCheckpoint timeout " +
                                   "at position " + ckptPos +
                                   ", elapsed time in ms " +
                                   (System.currentTimeMillis() - start));
            }
        }
    }

    private boolean isCkptTimeout(long start) {
        return (System.currentTimeMillis() - start) >
            CHECKPOINT_TIMEOUT_MS;
    }
 
    String getCauseOfFailure() {
        if (causeOfFailure == null) {
            return "success";
        }
        return causeOfFailure.getMessage();
    }

    boolean isSubscriptionSucc() {
        return isSubscribeSucc;
    }

    long getStreamOps() {
        return streamOps;
    }

    NoSQLSubscription getSubscription() {
        return subscription;
    }

}