Streams Example

This section presents an example of a streams application. While this example is simplified as much as possible, its broad outline is typical for applications of this nature.

This example application makes use of the example Subscriber that we described in NoSQLSubscriber Example.

The application begins by defining the information it requires. TABLE_NAME identifies the table the application will watch. A single subscriber can receive operations from multiple tables, but this example subscribes only to the Users table. num is the number of operations the subscriber will request for the Users table.

Note:

Stream operations support namespaces. If you want to subscribe to a table in a namespace, prefix the table name with a namespace and a colon (:), as follows: ns1:Users.
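
As a small illustration of the naming convention in this note, the following sketch builds a namespace-qualified table name. The class QualifiedTableName and the helper qualifiedTableName() are hypothetical and are not part of the Oracle NoSQL Database API. Only the namespace:table format comes from the note above.

```java
public class QualifiedTableName {

    /*
     * Hypothetical helper (not part of the Oracle NoSQL Database API).
     * Joins a namespace and a table name with a colon. A null or empty
     * namespace yields the bare table name.
     */
    static String qualifiedTableName(String namespace, String table) {
        if (namespace == null || namespace.isEmpty()) {
            return table;
        }
        return namespace + ":" + table;
    }

    public static void main(String[] args) {
        /* Table Users in namespace ns1 */
        System.out.println(qualifiedTableName("ns1", "Users"));
        /* Table Users with no namespace */
        System.out.println(qualifiedTableName(null, "Users"));
    }
}
```

The resulting string is what you would use as the table name when subscribing to a table that lives in a namespace.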

We then provide Oracle NoSQL Database connection information. Because this is a simple example that exists purely for illustration purposes, we avoid authentication issues by using a non-secure store. However, in a production environment you will probably be required to provide authentication credentials as described in Authenticating to a Secure Store and Reauthentication.

Finally, we provide some information that is specific to a streaming application. MAX_SUBSCRIPTION_TIME_MS identifies how long the application can wait before it times out. CKPT_TABLE_NAME is the name of the checkpoint table. This name is required when constructing a NoSQLSubscriptionConfig, but is not otherwise used by this particular application. For information about checkpoints, see Using Checkpoints.

package pubsub;

import oracle.kv.KVStoreConfig;
import oracle.kv.pubsub.NoSQLPublisher;
import oracle.kv.pubsub.NoSQLPublisherConfig;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;

public class GSGStreamExample {

    /* table to subscribe */
    private static final String TABLE_NAME = "Users";
    /* Number of operations to stream */
    private static final int num = 100;

    private static final String storeName = "kvstore";
    private static final String[] hhosts = {"localhost:5000"};

    /* max subscription allowed time before forced termination */
    private static final long MAX_SUBSCRIPTION_TIME_MS =
        Long.MAX_VALUE;

    private static final String rootPath = ".";
    private static final String CKPT_TABLE_NAME = "CheckpointTable";

    public static void main(final String args[]) throws Exception {

        final GSGStreamExample gte = new GSGStreamExample(args);
        gte.run();
    }

    private GSGStreamExample(final String[] argv) {
    } 

First we construct a NoSQLPublisher object. NoSQLPublisherConfig is used to specify the Oracle NoSQL Database connection information.

    /* 
     * Subscribes to a table. The workflow is compatible with
     * Reactive Streams.
     */
    private void run() throws Exception {

        NoSQLPublisher publisher = null;
        try {
            /* step 1 : create a publisher configuration */
            final NoSQLPublisherConfig publisherConfig =
                new NoSQLPublisherConfig.Builder(
                    new KVStoreConfig(storeName, hhosts), rootPath)
                    .build();

            /* step 2 : create a publisher */
            publisher = NoSQLPublisher.get(publisherConfig);

Next we construct a NoSQLSubscriptionConfig. This is where we identify the table(s) to which we are subscribing.

            /* step 3: create a subscription configuration */
            final NoSQLSubscriptionConfig subscriptionConfig =
                /* stream with specified mode */
                new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
                .setSubscribedTables(TABLE_NAME)
                .build(); 

Now we construct our subscriber. Here, we use the NoSQLSubscriber implementation that we describe in NoSQLSubscriber Example.

            /* step 4: create a subscriber */
            final GSGSubscriberExample subscriber =
                new GSGSubscriberExample(subscriptionConfig, num);
            System.out.println("Subscriber created to stream " +
                    num + " operations."); 

The above example specifies the number of events to be streamed as 100. However, if you want to do continuous streaming, you must use new GSGSubscriberExample(subscriptionConfig, Long.MAX_VALUE).

Next we create a subscription. If GSGSubscriberExample reports an error on creating the subscription (using the isSubscriptionSucc() method), we print a message that identifies the subscriber's unique ID (getSubscriberId()) and the nature of the error (getCauseOfFailure()), and then throw a RuntimeException to quit the application.

            /* step 5: create a subscription and start stream */
            publisher.subscribe(subscriber);
            if (!subscriber.isSubscriptionSucc()) {
                System.out.println("Subscription failed for " +
                              subscriber.getSubscriptionConfig()
                                        .getSubscriberId() +
                              ", reason " + 
                              subscriber.getCauseOfFailure());

                throw new RuntimeException("fail to subscribe");
            }
            System.out.println("Start stream " + num +
                    " operations from table " + TABLE_NAME); 

At this point we put the application thread to sleep, which allows the subscriber to run unimpeded by the parent application. Occasionally we allow this thread to wake up, check how many stream operations have been consumed by the subscriber, and make sure we have not exceeded our maximum amount of run time. If we have exceeded our timeout threshold, we throw an exception and quit the application. Otherwise, we continue to run until all of the required operations have been consumed.

            /*
             * Wait for the stream to finish. Throw exception if it
             * cannot finish within max allowed elapsed time
             */
            final long s = System.currentTimeMillis();
            while (subscriber.getStreamOps() < num) {
                final long elapsed = System.currentTimeMillis() - s;
                if (elapsed >= MAX_SUBSCRIPTION_TIME_MS) {
                    throw new
                        RuntimeException("Not done within max " +
                                "allowed elapsed time");
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    /* restore the interrupt flag and keep the cause */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted!", e);
                }
            } 

Finally, we clean up and close the application. We cancel the subscription using subscriber.getSubscription().cancel(), although this call is redundant here because our GSGSubscriberExample class already calls it in its onNext() method. In a more robust application, you would call cancel() from the stream application itself, particularly as part of responding to error situations.

            /* step 6: clean up */
            subscriber.getSubscription().cancel();
            publisher.close(true);
            System.out.println("Publisher closed normally.");

        } catch (Exception exp) {
            String msg = "Error: " + exp.getMessage();
            System.out.println(msg);
            if (publisher != null) {
                publisher.close(exp, false);
                System.out.println("Publisher closed with error.");
            }
            throw exp;
        } finally {
            System.out.println("All done.");
        }
    }
} 
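
The control flow of the listing above (subscribe, let the subscriber consume a fixed number of operations, poll from the main thread with a time limit, then clean up) can be tried without a running store. The following is a minimal sketch that uses the JDK's java.util.concurrent.Flow API (Java 9 or later) as a stand-in for the Oracle NoSQL Database streams classes. PollingSketch, CountingSubscriber, and awaitOps() are hypothetical names chosen for illustration. The sketch does not show how NoSQLPublisher or GSGSubscriberExample are implemented.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;

public class PollingSketch {

    /* Stand-in subscriber: counts items and cancels after 'limit'. */
    static final class CountingSubscriber
        implements Flow.Subscriber<Integer> {

        private final long limit;
        private final AtomicLong ops = new AtomicLong();
        private volatile Flow.Subscription subscription;

        CountingSubscriber(long limit) {
            this.limit = limit;
        }

        long getStreamOps() {
            return ops.get();
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            /* ask for exactly 'limit' items */
            s.request(limit);
        }

        @Override
        public void onNext(Integer item) {
            /* cancel once the requested number has been consumed */
            if (ops.incrementAndGet() >= limit) {
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    /*
     * Polls until 'target' items are consumed. Returns false if
     * 'maxWaitMs' elapses first or the thread is interrupted.
     */
    static boolean awaitOps(CountingSubscriber sub, long target,
                            long maxWaitMs) {
        final long start = System.currentTimeMillis();
        while (sub.getStreamOps() < target) {
            if (System.currentTimeMillis() - start >= maxWaitMs) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        final int num = 5;
        final CountingSubscriber sub = new CountingSubscriber(num);
        /* try-with-resources closes the publisher on every path */
        try (SubmissionPublisher<Integer> pub =
                 new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            /* publish more items than the subscriber asked for */
            for (int i = 0; i < 10; i++) {
                pub.submit(i);
            }
            final boolean done = awaitOps(sub, num, 5000);
            System.out.println("done=" + done +
                               " ops=" + sub.getStreamOps());
        }
    }
}
```

Unlike the listing above, awaitOps() reports a timeout through its return value instead of throwing an exception. That is a design choice made for this sketch only, so that the caller decides how to react.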

Sample Streams Output

Once the Users table is loaded with sample data (see GSGStreamsWriteTable), the output from this example program is as follows (the output is truncated at the 12th operation for brevity):

> java pubsub.GSGStreamExample
Subscriber created to stream 100 operations.
Start stream 100 operations from table Users

Found a put. Row is:
UID: 0
	Quantity: 10
	myArray: [1,14,3,9,12,12,13,13,4,6]

Found a put. Row is:
UID: 1
	Quantity: 4
	myArray: [3,14,1,13]

Found a put. Row is:
UID: 2
	Quantity: 5
	myArray: [5,7,15,1,5]

Found a put. Row is:
UID: 3
	Quantity: 2
	myArray: [10,7]

Found a put. Row is:
UID: 4
	Quantity: 7
	myArray: [2,17,5,9,1,10,5]

Found a put. Row is:
UID: 5
	Quantity: 5
	myArray: [13,1,2,3,11]

Found a delete. Deleted row is:
Del OP [seq: 6304, shard id: 1, primary key: {
  "uid" : 2
}]

Found a put. Row is:
UID: 6
	Quantity: 9
	myArray: [16,7,11,13,13,10,11,15,5]

Found a put. Row is:
UID: 7
	Quantity: 2
	myArray: [11,3]

Found a put. Row is:
UID: 8
	Quantity: 6
	myArray: [12,12,5,11,11,3]

Found a put. Row is:
UID: 9
	Quantity: 4
	myArray: [10,7,6,4]

Found a put. Row is:
UID: 10
	Quantity: 8
	myArray: [3,9,18,11,16,12,6,2]

    ...

Every time this example runs, it starts streaming from the first table operation seen for the Users table, that is, the write operation that created the first row in the table (UID 0). To start streaming from the Nth operation instead of from the beginning, you need to implement checkpoints, which are described in the next chapter, Using Checkpoints.