NoSQLSubscriber Example
This section provides a complete but simple NoSQLSubscriber example called GSGSubscriberExample. This implementation is used by the publisher example shown in Streams Example.
GSGSubscriberExample subscribes to a single table called Users. To see the application that defines this table and writes table rows to it, see GSGStreamsWriteTable.
To begin, we provide our imports. Notice that org.reactivestreams.Subscription is a required import. Your Java environment must have the reactive-streams.jar file in its classpath in order to both compile and run this example code.
package pubsub;
import java.util.List;
import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;
import oracle.kv.table.MapValue;
import oracle.kv.table.Row;
import org.reactivestreams.Subscription;
Next we declare our class and initialize our data members. As described previously, this is an implementation of NoSQLSubscriber.
class GSGSubscriberExample implements NoSQLSubscriber {

    /* subscription configuration */
    private final NoSQLSubscriptionConfig config;

    /* number of operations to stream */
    private final int numOps;

    /* number of operations seen in the stream */
    private long streamOps;

    private NoSQLSubscription subscription;
    private boolean isSubscribeSucc;
    private Throwable causeOfFailure;

    GSGSubscriberExample(NoSQLSubscriptionConfig config,
                         int numOps) {
        this.config = config;
        this.numOps = numOps;
        causeOfFailure = null;
        isSubscribeSucc = false;
        streamOps = 0;
        subscription = null;
    }
The first thing we do is implement NoSQLSubscriber.getSubscriptionConfig(). This simply returns our NoSQLSubscriptionConfig object, which is provided to the class when it is constructed by the implementing streams application. This method is how the publisher learns how to configure the stream for this subscriber.
    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    }
The implementation we provide for onSubscribe() does several things. First, it makes the NoSQLSubscription class instance available to this subscriber implementation. Notice that the instance is passed to this class as an object of type org.reactivestreams.Subscription, and that object must be cast to NoSQLSubscription.
This method is also where this subscriber begins requesting operations from the subscription. Without that call to NoSQLSubscription.request(), this subscriber will never receive any operations to process. For this simple implementation, this is the only place operations are requested. In a more elaborate implementation, an initial number of operations is requested here, and once that number of operations has been received by the subscriber, more can be requested in another part of the class, usually in onNext().
Finally, we signal that the subscription attempt is a success. This information is used by our streams application when we are creating the publisher and subscriber.
    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;
        subscription.request(numOps);
        isSubscribeSucc = true;
    }
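The more elaborate pattern mentioned above, where further operations are requested from onNext() once a batch has been received, could look roughly like the following sketch. It is not part of the example application. To keep it runnable without the Oracle NoSQL and reactive-streams jars, it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams.Subscription and NoSQLSubscription. BatchingSubscriber, ListSubscription, and the batch size are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

/* Hypothetical subscriber that requests items in fixed-size batches. */
class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final int batchSize;
    private Flow.Subscription subscription;
    private int receivedInBatch = 0;
    final List<T> received = new ArrayList<>();
    int requestCount = 0;

    BatchingSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        requestBatch();              /* initial demand */
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        if (++receivedInBatch == batchSize) {
            receivedInBatch = 0;
            requestBatch();          /* batch consumed, ask for the next one */
        }
    }

    @Override
    public void onError(Throwable t) { /* no-op in this sketch */ }

    @Override
    public void onComplete() { /* no-op in this sketch */ }

    private void requestBatch() {
        requestCount++;
        subscription.request(batchSize);
    }
}

/* Hypothetical synchronous subscription that serves items from a list. */
class ListSubscription<T> implements Flow.Subscription {
    private final Iterator<T> items;
    private final Flow.Subscriber<? super T> subscriber;
    private long demand = 0;
    private boolean draining = false;
    private boolean cancelled = false;

    ListSubscription(List<T> items, Flow.Subscriber<? super T> subscriber) {
        this.items = items.iterator();
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        demand += n;
        if (draining) {
            return;                  /* reentrant call from onNext(); the outer loop continues */
        }
        draining = true;
        while (demand > 0 && !cancelled && items.hasNext()) {
            demand--;
            subscriber.onNext(items.next());
        }
        draining = false;
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
```

Requesting in batches bounds how many operations can be outstanding at once, which lets a slow subscriber apply back pressure. In a real NoSQLSubscriber the same idea would apply to the NoSQLSubscription obtained by the cast in onSubscribe().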
Next we set up our error and warning handlers. Note that when onError() is called, the subscription has already been canceled. Here, we simply write to the console. A more robust implementation would write to the application log file and potentially send notifications or take corrective action (such as quitting stream processing entirely), depending on the nature of the error.
    @Override
    public void onError(Throwable t) {
        causeOfFailure = t;
        System.out.println("Error: " + t.getMessage());
    }

    @Override
    public void onWarn(Throwable t) {
        System.out.println("Warning: " + t.getMessage());
    }
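As an illustration of the more robust approach, the following sketch routes errors and warnings through java.util.logging instead of the console. It is a standalone class rather than a NoSQLSubscriber so that it runs without the Oracle NoSQL jars. The class name LoggingHandlers and the log messages are assumptions made for this sketch, and a real application might prefer its own logging framework.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/* Hypothetical handlers that log instead of printing to the console. */
class LoggingHandlers {
    private static final Logger LOG =
        Logger.getLogger(LoggingHandlers.class.getName());

    /* volatile so that a thread polling for failure sees the update */
    private volatile Throwable causeOfFailure;

    /* Mirrors onError(): record the cause and log it with its stack trace. */
    public void onError(Throwable t) {
        causeOfFailure = t;
        LOG.log(Level.SEVERE, "Stream terminated with an error", t);
    }

    /* Mirrors onWarn(): log the warning; the stream keeps running. */
    public void onWarn(Throwable t) {
        LOG.log(Level.WARNING, "Stream warning", t);
    }

    Throwable getCauseOfFailure() {
        return causeOfFailure;
    }
}
```

Passing the Throwable to the logger preserves the stack trace, which t.getMessage() alone discards.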
The application must provide an onComplete() method, but the implementation is not required to do anything because this method is not called.
    @Override
    public void onComplete() {
        /* no-op */
    }
Because this example does not implement checkpoints (see Using Checkpoints for more information), there is nothing to do in onCheckpointComplete().
    /* called when publisher finishes a checkpoint */
    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
        /* no-op. This example doesn't implement checkpoints */
    }
The onNext() method is where the subscriber receives and processes individual stream operations in the form of StreamOperation objects.
In the following method, we show how to determine what type of operation the subscription has received (either put or delete). What you do with an individual operation is up to your application's requirements. In this case, for put operations we retrieve field information from the enclosed Row object and write it to the console. Be aware that this code is not very robust. In particular, we expect JSON data with a specific schema. Because any valid JSON can be written to a JSON table column, a production application needs defensive code here to ensure that the JSON column contains the expected schema.
For delete operations, we simply write the StreamOperation object to the console.
    @Override
    public void onNext(StreamOperation t) {

        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();

                Integer uid = row.get("uid").asInteger().get();
                System.out.println("UID: " + uid);

                /* assumes the myJSON column holds the expected schema */
                MapValue myjson = row.get("myJSON").asMap();
                int quantity = myjson.get("quantity")
                                     .asInteger().get();
                String array =
                    myjson.get("myArray").asArray().toString();
                System.out.println("\tQuantity: " + quantity);
                System.out.println("\tmyArray: " + array);
                break;
            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;
            default:
                throw new
                    IllegalStateException("Received unsupported " +
                        "stream operation from shard " +
                        t.getRepGroupId() +
                        ", seq: " + t.getSequenceId());
        }

        /* stop streaming once the requested number of operations is seen */
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after " +
                               "receiving " + numOps + " operations.");
        }
    }
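The defensive check that a production application would need before reading the JSON column could be sketched as follows. To stay runnable without the Oracle NoSQL jars, this sketch validates a plain java.util.Map rather than an oracle.kv.table.MapValue, so it only illustrates the shape of the check. JsonShapeCheck and Parsed are hypothetical names. The field names quantity and myArray come from the example above. Against the real API, the equivalent tests would be made on the field values themselves, and the exact methods should be confirmed in the API reference.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/* Hypothetical helper: checks the shape of a decoded JSON value before use. */
class JsonShapeCheck {

    /* The two fields the example reads from the myJSON column. */
    static final class Parsed {
        final int quantity;
        final List<?> myArray;

        Parsed(int quantity, List<?> myArray) {
            this.quantity = quantity;
            this.myArray = myArray;
        }
    }

    /* Returns the fields, or empty if the value lacks the expected schema. */
    static Optional<Parsed> parse(Object json) {
        if (!(json instanceof Map)) {
            return Optional.empty();
        }
        Map<?, ?> map = (Map<?, ?>) json;
        Object quantity = map.get("quantity");
        Object myArray = map.get("myArray");
        if (!(quantity instanceof Integer) || !(myArray instanceof List)) {
            return Optional.empty();
        }
        return Optional.of(new Parsed((Integer) quantity, (List<?>) myArray));
    }
}
```

Returning an empty Optional lets the caller decide whether to skip the row, log it, or stop the stream, instead of failing inside onNext() with a cast or null pointer exception.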
Finally, we provide a series of getter methods, which are used by our stream application to retrieve information of interest from this subscriber. Using a Streams Publisher shows how these are used.
    String getCauseOfFailure() {
        if (causeOfFailure == null) {
            return "success";
        }
        return causeOfFailure.getMessage();
    }

    boolean isSubscriptionSucc() {
        return isSubscribeSucc;
    }

    long getStreamOps() {
        return streamOps;
    }

    NoSQLSubscription getSubscription() {
        return subscription;
    }
}
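One way a calling application might use getters like these is to poll until the subscription succeeds or a timeout expires. The sketch below is an assumption about such usage, not the code from the publisher example. SubscriptionWait, waitFor, and the timeout values are hypothetical names introduced here.

```java
import java.util.function.BooleanSupplier;

/* Hypothetical helper that polls a condition until it holds or time runs out. */
class SubscriptionWait {

    /*
     * Returns true if the condition became true within timeoutMs,
     * false on timeout or if the waiting thread was interrupted.
     */
    static boolean waitFor(BooleanSupplier condition,
                           long timeoutMs,
                           long pollMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;                        /* timed out */
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  /* preserve interrupt status */
                return false;
            }
        }
        return true;
    }
}
```

With the subscriber above, a call such as SubscriptionWait.waitFor(subscriber::isSubscriptionSucc, 5000, 100) could be followed by a check of getCauseOfFailure() when it returns false. Because the subscriber's fields are written by the stream's thread and read by the application's thread, fields polled this way would need to be declared volatile (or otherwise synchronized) for the update to be reliably visible.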