Using the NoSQLSubscriber Interface

You implement subscribers using the oracle.kv.pubsub.NoSQLSubscriber interface, an extension of org.reactivestreams.Subscriber. NoSQLSubscriber provides the following methods, which you must implement:

  • onSubscribe()

    This method is invoked after the publisher has successfully established contact with the Oracle NoSQL Database store. The argument passed to this method is an org.reactivestreams.Subscription instance, which you can then cast to oracle.kv.pubsub.NoSQLSubscription. See Working with Subscriptions.

    ...
    private NoSQLSubscription subscription;
    ...
    
    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;    
        // request 100 store operations be streamed to this 
        // subscriber.
        s.request(100);
    } 

    Note:

    You do not have to call s.request(100) inside onSubscribe(). Once an instance of NoSQLSubscription is available, you can call request() from outside onSubscribe(). The main purpose of onSubscribe() here is to hand the application the subscription instance that the publisher generates.

  • onNext()

    Signals the next Oracle NoSQL Database operation. The argument passed to this method is a StreamOperation instance. See Using the StreamOperation Class. This method is where you perform whatever processing your application requires on the stream events.

    @Override
    public void onNext(StreamOperation t) {
        // perform processing on the StreamOperation
        // here. Typically you will do different
        // things depending on whether this is
        // a put or delete event.
        switch (t.getType()) {
            case PUT:
                // Process the put operation here.
                break;
            case DELETE:
                // Process the delete operation here.
                break;
            default:
                // Received an unknown and therefore illegal operation
                // type.
                throw new
                    IllegalStateException("... exception message ...");
        }
    }

  • onComplete()

    Signals the completion of a subscription. Use this method to perform whatever cleanup your application requires once a subscription has ended.

    @Override
    public void onComplete() {
        /* nothing to do, so make this a no-op */
    } 

    Note:

    You must provide an implementation of this method because the interface requires it. However, streaming from a KVStore table is unbounded by nature, so onComplete() is never called. A no-op implementation is therefore sufficient.

  • onError()

    Signals that the subscription encountered an irrecoverable error and has to be terminated. The argument passed to this method is a java.lang.Throwable class instance. Use this method to perform whatever actions you want to take in response to the error.

    @Override
    public void onError(Throwable t) {
        logger.severe("Error: " + t.getMessage());
    }

  • onWarn()

    Signals that the subscription encountered a problem that warrants a warning but does not require the subscription to be terminated. The argument passed to this method is a java.lang.Throwable class instance. Use this method to perform whatever actions you want to take in response to the warning.

    @Override
    public void onWarn(Throwable t) {
        logger.warning("Warning: " + t.getMessage());
    } 

    A warning does not end the subscription. Warnings in the form of ShardTimeoutException are provided as a way to inform the application that a particular shard is not responding.

  • onCheckpointComplete()

    Signals that a previously requested checkpoint has completed. Checkpoints are performed by calling NoSQLSubscription.doCheckpoint(). If an error occurs while taking the checkpoint, that checkpoint is lost, but the subscription itself does not terminate and continues streaming. See Using Checkpoints.

    This method is called with two arguments:

    • oracle.kv.pubsub.StreamPosition

      Identifies the location in the stream where the checkpoint was performed.

    • java.lang.Throwable

      Null, unless an error occurred while taking the checkpoint.

    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
        if (cause == null) {
            logger.info("Finish checkpoint at position " + pos);
        } else {
            logger.warning("Fail to checkpoint at position " + pos +
                           ", cause: " + cause.getMessage());
        }
    }

  • onChangeResult()

    Tables are added to and removed from a running subscription stream using asynchronous calls, which return immediately without a return value. The result of the operation is delivered through the onChangeResult() callback once the change is effective. If the change was successful, this method is called with a non-null stream position that represents the first stream position for which the change has taken effect. If the change was unsuccessful but the subscription is still active, this method is called with a non-null exception that describes the cause of the failure. If the change caused the subscription to be canceled, this method is not called, and onError() is called instead.

  • getSubscriptionConfig()

    Use this method to return the oracle.kv.pubsub.NoSQLSubscriptionConfig object used by this subscription. This method is invoked by the publisher when it is creating a subscription.
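
The note under onSubscribe() says that request() does not have to be called inside onSubscribe(). The following is a minimal sketch of that pattern, written under stated assumptions rather than against the real API. To stay runnable with only the JDK, java.util.concurrent.Flow stands in for org.reactivestreams, and String stands in for StreamOperation. The names DeferredRequestSubscriber, requestMore(), ListSubscription, and DeferredRequestDemo are hypothetical and are not part of Oracle NoSQL Database.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

/**
 * Sketch only. Flow.Subscriber stands in for
 * org.reactivestreams.Subscriber, and String stands in for
 * StreamOperation.
 */
class DeferredRequestSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;
    final List<String> received = new ArrayList<>();
    boolean completed;
    Throwable failure;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // Only keep the handle; no demand is signalled here.
        subscription = s;
    }

    /** Hypothetical helper: signal demand after onSubscribe(). */
    void requestMore(long n) {
        subscription.request(n);
    }

    @Override public void onNext(String op) { received.add(op); }
    @Override public void onError(Throwable t) { failure = t; }
    @Override public void onComplete() { completed = true; }
}

/** Hypothetical synchronous source that honours request(n). */
class ListSubscription implements Flow.Subscription {
    private final Iterator<String> items;
    private final Flow.Subscriber<? super String> subscriber;
    private boolean cancelled;

    ListSubscription(List<String> items,
                     Flow.Subscriber<? super String> subscriber) {
        this.items = items.iterator();
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        // Deliver at most n items per call, never more than requested.
        for (long i = 0; i < n && !cancelled && items.hasNext(); i++) {
            subscriber.onNext(items.next());
        }
    }

    @Override
    public void cancel() { cancelled = true; }
}

class DeferredRequestDemo {
    public static void main(String[] args) {
        DeferredRequestSubscriber sub = new DeferredRequestSubscriber();
        sub.onSubscribe(
            new ListSubscription(List.of("put-1", "put-2", "delete-1"), sub));
        // Nothing has been delivered yet, because nothing was requested.
        sub.requestMore(2);
        System.out.println(sub.received); // prints [put-1, put-2]
    }
}
```

In this sketch nothing is delivered until requestMore() is called, which lets an application defer or throttle streaming until it is ready to process events. A real subscriber would implement NoSQLSubscriber and store the NoSQLSubscription handle in the same way.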