Implementing Checkpoints in GSGStreamExample

This section shows how to implement checkpoints by adding the functionality to the examples provided in Streams Example. You must also add functionality to the Subscriber implementation shown in NoSQLSubscriber Example. For those updates, see the next section, Implementing Checkpoints in GSGSubscriberExample.

New additions to the original example code are indicated by bold text.

The changes to GSGStreamsExample.java are fairly minor. To begin, we need to import NoSQLStreamMode:

package pubsub;

import oracle.kv.KVStoreConfig;
import oracle.kv.pubsub.NoSQLPublisher;
import oracle.kv.pubsub.NoSQLPublisherConfig;
import oracle.kv.pubsub.NoSQLStreamMode;
import oracle.kv.pubsub.NoSQLSubscriptionConfig; 

Next, we add several new private data members.

The first of these is chkptIntv, indicating how many operations this application will see before it runs a checkpoint. In this case, for illustration purposes, we are running a checkpoint for every ten operations. If this were production code, this would probably prove to be too frequent. Also, you are not required to take a checkpoint on a number of operations interval. You can perform them for any reason whatsoever. You could, for example, take checkpoints on a clock interval. Or you could take them whenever you see a delete operation, or whenever you see a table row written that conforms to some meaningful criteria.

Beyond the checkpoint interval, we indicate our stream mode will be FROM_CHECKPOINT.

public class GSGStreamExample {

    /* table to subscribe */
    private static final String TABLE_NAME = "Users";
    /* Number of operations to stream */
    private static final int num = 100;

    private static final String storeName = "kvstore";
    private static final String[] hhosts = {"localhost:5000"};

    /* max subscription allowed time before forced termination */
    private static final long MAX_SUBSCRIPTION_TIME_MS =
        Long.MAX_VALUE;

    private static final String rootPath = ".";
    private static final String CKPT_TABLE_NAME = "CheckpointTable";
    /* number of ops before a checkpoint is performed */
    private long ckptIntv = 10;
    private NoSQLStreamMode streamMode =
                            NoSQLStreamMode.FROM_CHECKPOINT;
    

Next we add the desired stream mode when we configure the subscription.

    public static void main(final String args[]) throws Exception {

        final GSGStreamExample gte = new GSGStreamExample(args);
        gte.run();
    }

    private GSGStreamExample(final String[] argv) {
    }

    /* 
     * Subscribes a table. The work flow is ReactiveStream 
     * compatible 
     */
    private void run() throws Exception {

        NoSQLPublisher publisher = null;
        try {
            /* step 1 : create a publisher configuration */
            final NoSQLPublisherConfig publisherConfig =
                new NoSQLPublisherConfig.Builder(
                    new KVStoreConfig(storeName, hhosts), rootPath)
                    .build();

            /* step 2 : create a publisher */
            publisher = NoSQLPublisher.get(publisherConfig);

            /* step 3: create a subscription configuration */
            final NoSQLSubscriptionConfig subscriptionConfig =
                /* stream with specified mode */
                new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
                .setSubscribedTables(TABLE_NAME)
                .setStreamMode(streamMode)
                .build();

The only other change to this application is to provide our checkpoint interval to our NoSQLSubscriber implementation. Again, this change is driven purely by how we choose to know when to take a checkpoint in this example. Your production code can, and probably will, do something entirely different.

            /* step 4: create a subscriber */
            final GSGSubscriberExample subscriber =
                new GSGSubscriberExample(subscriptionConfig, num,
                        ckptIntv);
            System.out.println("Subscriber created to stream " +
                    num + " operations.");

            /* step 5: create a subscription and start stream */
            publisher.subscribe(subscriber);
            if (!subscriber.isSubscriptionSucc()) {
                System.out.println("Subscription failed for " +
                              subscriber.getSubscriptionConfig()
                                        .getSubscriberId() +
                              ", reason " +
                              subscriber.getCauseOfFailure());

                throw new RuntimeException("fail to subscribe");
            }
            System.out.println("Start stream " + num +
                    " operations from table " + TABLE_NAME);


            /*
             * Wait for the stream to finish. Throw exception if it 
             * cannot finish within max allowed elapsed time
             */
            final long s = System.currentTimeMillis();
            while (subscriber.getStreamOps() < num) {
                final long elapsed = System.currentTimeMillis() - s;
                if (elapsed >= MAX_SUBSCRIPTION_TIME_MS) {
                    throw new
                        RuntimeException("Not done within max " +
                                "allowed elapsed time");
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted!");
                }
            }

            /* step 6: clean up */
            publisher.close(true);
            System.out.println("Publisher closed normally.");

        } catch (Exception exp) {
            String msg = "Error: " + exp.getMessage();
            System.out.println(msg);
            if (publisher != null) {
                publisher.close(exp, false);
                System.out.println("Publisher closed with error.");
            }
            throw exp;
        } finally {
           System.out.println("All done.");
        }
    }
}