NoSQLSubscriber Example

This section provides a complete but simple NoSQLSubscriber example called GSGSubscriberExample. This implementation is used by the publisher example shown in Streams Example.

GSGSubscriberExample subscribes to a single table called Users. To see the application that defines this table and writes table rows to it, see GSGStreamsWriteTable.

To begin, we provide our imports. Notice that org.reactivestreams.Subscription is a required import. Your Java environment must have the reactive-streams.jar file in its classpath in order to both compile and run this example code.

package pubsub;

import java.util.List;

import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;

import oracle.kv.table.MapValue;
import oracle.kv.table.Row;

import org.reactivestreams.Subscription; 

Next we declare our class, and initialize our data members. As described previously, this is an implementation of NoSQLSubscriber.

class GSGSubscriberExample implements NoSQLSubscriber {

    /* subscription configuration */
    private final NoSQLSubscriptionConfig config;

    /* number of operations to stream */
    private final int numOps;

    /* number of operations seen in the stream */
    private long streamOps;

    private NoSQLSubscription subscription;

    private boolean isSubscribeSucc;

    private Throwable causeOfFailure;

    GSGSubscriberExample(NoSQLSubscriptionConfig config,
                         int numOps) {
        this.config = config;
        this.numOps = numOps;

        causeOfFailure = null;
        isSubscribeSucc = false;
        streamOps = 0;
        subscription = null;
    }

The first thing we do is implement NoSQLSubscriber.getSubscriptionConfig(). This simply returns our NoSQLSubscriptionConfig object, which is provided to the class when it is constructed by the implementing streams application. This method is how the publisher will learn how to configure the stream for this subscriber.

    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    } 

The implementation we provide for onSubscribe() does several things. First, it makes the NoSQLSubscription class instance available to this subscriber implementation. Notice that the instance is passed to this class as an object of type org.reactivestreams.Subscription, and that object must be cast to NoSQLSubscription.

This method is also where the subscriber begins requesting operations from the subscription. Without the call to NoSQLSubscription.request(), the subscriber never receives any operations to process. In this simple implementation, this is the only place where operations are requested. A more elaborate implementation would request an initial number of operations here and, once the subscriber has received that many, request more from another part of the class, usually onNext().

Finally, we signal that the subscription attempt is a success. This information is used by our streams application when we are creating the publisher and subscriber.

    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;
        subscription.request(numOps);
        isSubscribeSucc = true;
    } 
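The more elaborate pattern mentioned above, requesting an initial batch in onSubscribe() and topping up from onNext(), can be sketched as follows. This is a minimal illustration, not part of GSGSubscriberExample. To stay runnable without a store, it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams and NoSQL types. The class names BatchingSubscriberSketch, ListSubscriptionSketch, and BatchedRequestDemo are hypothetical, as is the in-memory subscription that plays the publisher's role.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/* Hypothetical subscriber: asks for more each time a batch is used up. */
class BatchingSubscriberSketch implements Flow.Subscriber<String> {

    private final int batchSize;   /* operations asked for per request() */
    private int receivedInBatch;   /* operations seen since last request() */
    private Flow.Subscription subscription;
    int totalReceived;

    BatchingSubscriberSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        subscription.request(batchSize);       /* initial request */
    }

    @Override
    public void onNext(String op) {
        totalReceived++;
        if (++receivedInBatch == batchSize) {
            receivedInBatch = 0;
            subscription.request(batchSize);   /* replenish demand */
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("Error: " + t);
    }

    @Override
    public void onComplete() {
        /* no-op */
    }
}

/* Hypothetical in-memory stand-in for the publisher side (assumption). */
class ListSubscriptionSketch implements Flow.Subscription {

    private final Iterator<String> ops;
    private final Flow.Subscriber<String> subscriber;
    private long demand;
    private boolean draining;
    private boolean cancelled;
    int requestCalls;

    ListSubscriptionSketch(List<String> ops,
                           Flow.Subscriber<String> subscriber) {
        this.ops = ops.iterator();
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        requestCalls++;
        demand += n;
        if (draining) {
            return;   /* called from inside onNext(); the loop continues */
        }
        draining = true;
        while (demand > 0 && !cancelled && ops.hasNext()) {
            demand--;
            subscriber.onNext(ops.next());
        }
        draining = false;
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

class BatchedRequestDemo {

    /* Streams totalOps dummy operations; returns {received, request() calls}. */
    static int[] run(int totalOps, int batchSize) {
        List<String> ops = IntStream.range(0, totalOps)
                                    .mapToObj(i -> "op-" + i)
                                    .collect(Collectors.toList());
        BatchingSubscriberSketch sub = new BatchingSubscriberSketch(batchSize);
        ListSubscriptionSketch subscription =
            new ListSubscriptionSketch(ops, sub);
        sub.onSubscribe(subscription);
        return new int[] { sub.totalReceived, subscription.requestCalls };
    }

    public static void main(String[] args) {
        int[] result = run(5, 2);
        System.out.println("received=" + result[0] +
                           ", request() calls=" + result[1]);
    }
}
```

Requesting in batches keeps the amount of outstanding demand bounded, so a slow subscriber is never handed more operations than it asked for. In a real subscriber, the same counter logic would sit in onNext() and call NoSQLSubscription.request().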

Next we set up our error and warning handlers. Note that by the time onError() is called, the subscription has already been canceled. Here we simply write to the console. A more robust implementation would write to the application log file and, depending on the nature of the error, might send notifications or take corrective action (such as stopping stream processing entirely).

    @Override
    public void onError(Throwable t) {
        causeOfFailure = t;
        System.out.println("Error: " + t.getMessage());
    }

    @Override
    public void onWarn(Throwable t) {
        System.out.println("Warning: " + t.getMessage());
    }
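As a rough sketch of the more robust approach described above, the handlers could write to a log through java.util.logging instead of the console. The class name StreamErrorLoggerSketch, the logger name, and the helper methods hasFailed() and describeFailure() are all hypothetical and are not part of GSGSubscriberExample. The handlers are shown without @Override so that the class compiles on its own.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/* Hypothetical helper showing log-based error and warning handling. */
class StreamErrorLoggerSketch {

    /* logger name is an assumption; use your application's convention */
    private static final Logger LOGGER =
        Logger.getLogger("pubsub.GSGSubscriberExample");

    /* volatile so that another thread can observe the failure */
    private volatile Throwable causeOfFailure;

    /* same role as NoSQLSubscriber.onError() in the example above */
    void onError(Throwable t) {
        causeOfFailure = t;
        /* passing the Throwable keeps the stack trace in the log */
        LOGGER.log(Level.SEVERE, "Stream terminated with an error", t);
    }

    /* same role as NoSQLSubscriber.onWarn() in the example above */
    void onWarn(Throwable t) {
        LOGGER.log(Level.WARNING, "Stream warning", t);
    }

    boolean hasFailed() {
        return causeOfFailure != null;
    }

    String describeFailure() {
        Throwable t = causeOfFailure;
        if (t == null) {
            return "success";
        }
        /* getMessage() may be null, so fall back to the class name */
        String msg = t.getMessage();
        return (msg != null) ? msg : t.getClass().getName();
    }
}
```

Logging the Throwable itself, rather than only getMessage(), matters because some exceptions carry no message, in which case the console version above would print "Error: null".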

The application must provide an onComplete() method because the interface requires it. The implementation need not do anything, because the publisher does not call this method.

    @Override
    public void onComplete() {
        /* no-op */
    } 

Because this example does not implement checkpoints (see Using Checkpoints for more information), there is nothing to do in onCheckpointComplete().

    /* called when publisher finishes a checkpoint */
    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
        /* no-op. This example doesn't implement checkpoints */
    } 

The onNext() method is where the subscriber receives and processes individual stream operations in the form of StreamOperation objects.

In the following method, we show how to determine what type of operation the subscription has received (either put or delete). What you would do with an individual operation is up to your application's requirements. In this case, for put operations we retrieve field information from the enclosed Row object, and write it to the console. Be aware that this code is not very robust. In particular, we expect JSON data with a specific schema. Because any valid JSON can be written to a JSON table column, some defensive code is required here for a production application to ensure that the JSON column contains the expected schema.

For delete operations, we simply write the StreamOperation object to the console.

    @Override
    public void onNext(StreamOperation t) {

        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();

                Integer uid = row.get("uid").asInteger().get();
                System.out.println("UID: " + uid);

                MapValue myjson = row.get("myJSON").asMap();
                int quantity = myjson.get("quantity")
                                     .asInteger().get();
                String array = 
                    myjson.get("myArray").asArray().toString();
                System.out.println("\tQuantity: " + quantity);
                System.out.println("\tmyArray: " + array);
                break;
            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;

            default:
                throw new 
                    IllegalStateException("Received unsupported " +
                        "stream operation from shard " +
                                t.getRepGroupId() +
                                ", seq: " + t.getSequenceId());
        }
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after " +
                    "receiving " + numOps + " operations.");
        }
    } 
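The defensive check called for above might look something like the following sketch. It validates the shape of the JSON document before any field is read. To stay runnable without a store, it uses a plain java.util.Map as a stand-in for the MapValue taken from the row; that substitution, the class name JsonShapeCheckSketch, and the method describe() are assumptions made for illustration. A real subscriber would apply equivalent type checks to the table API's own value objects.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/* Hypothetical shape check for the myJSON column used in this example. */
class JsonShapeCheckSketch {

    /*
     * Returns a printable summary when the document has an integer
     * "quantity" field and an array "myArray" field. Returns an empty
     * Optional otherwise, so that the caller can skip or log the row
     * instead of failing with an exception.
     */
    static Optional<String> describe(Map<String, Object> json) {
        if (json == null) {
            return Optional.empty();
        }
        Object quantity = json.get("quantity");
        Object myArray = json.get("myArray");
        if (!(quantity instanceof Integer) || !(myArray instanceof List)) {
            return Optional.empty();
        }
        return Optional.of("Quantity: " + quantity +
                           ", myArray: " + myArray);
    }

    public static void main(String[] args) {
        Map<String, Object> good =
            Map.of("quantity", 5, "myArray", List.of(1, 2));
        Map<String, Object> bad = Map.of("quantity", "five");

        System.out.println(describe(good).orElse("unexpected schema"));
        System.out.println(describe(bad).orElse("unexpected schema"));
    }
}
```

Returning an Optional rather than throwing lets onNext() continue with the next operation when a single row does not match the expected schema.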

Finally, we provide a series of getter methods, which are used by our stream application to retrieve information of interest from this subscriber. Using a Streams Publisher shows how these are used.

    String getCauseOfFailure() {
        if (causeOfFailure == null) {
            return "success";
        }
        return causeOfFailure.getMessage();
    }

    boolean isSubscriptionSucc() {
        return isSubscribeSucc;
    }

    long getStreamOps() {
        return streamOps;
    }

    NoSQLSubscription getSubscription() {
        return subscription;
    }

}