NoSQLSubscriberインタフェースの使用方法

サブスクライバを実装するには、org.reactivestreams.Subscriberの拡張であるoracle.kv.pubsub.NoSQLSubscriberインタフェースを使用します。NoSQLSubscriberには、実装する必要がある次のメソッドが用意されています。

  • onSubscribe()

    これは、パブリッシャがOracle NoSQL Databaseストアとの接続を正常に確立した後に起動されるメソッドです。このメソッドに渡す引数は、org.reactivestreams.Subscriptionインスタンスです。これはその後、oracle.kv.pubsub.NoSQLSubscriptionにキャストできます。サブスクリプションの操作を参照してください。

    ...
    private NoSQLSubscription subscription;
    ...
    
    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;    
        // request 100 store operations be streamed to this 
        // subscriber.
        s.request(100);
    } 

    ノート:

    onSubscribe()内でs.request(100)メソッドをコールする必要はありません。NoSQLSubscriptionのインスタンスが使用可能になると、onSubscribe()の外部でこのメソッドをコールできます。ここでのonSubscribe()メソッドの重要な点は、パブリッシャが生成するサブスクリプション・インスタンスをユーザーに渡すことです。

  • onNext()

    次のOracle NoSQL Database操作を通知します。このメソッドにはStreamOperationクラス・インスタンスを渡します。StreamOperationクラスの使用方法を参照してください。このメソッドでは、ストリーム・イベントに対して行う処理を実行します。

    @Override
    public void onNext(StreamOperation t) {
        // perform processing on the StreamOperation
        // here. Typically you will do different
        // things depending on whether this is
        // a put or delete event.
        switch (t.getType()) {
    		case PUT:
    			{
    				// Process the put operation here.
    			}
    			break;
    		case DELETE:
    			{
    				// Process the delete operation here.
    			}
    			break;
    		default:
                // Received an unknown and therefore illegal operation 
                // type.
                throw new 
                    IllegalStateException("... exception message ...");
    		}
    	}
    } 
  • onComplete()

    サブスクリプションの完了を通知します。このメソッドを使用して、サブスクリプションが終了した後にアプリケーションが必要とするクリーン・アップを実行します。

    @Override
    public void onComplete() {
    	/* nothing to do, so make this a no-op */
    } 

    ノート:

    本来、KVStore表からのストリーミングはバインド解除されず、onComplete()がコールされることはないため、ストリーム処理アプリケーションではこのメソッドを実装する必要があります。このメソッドのno-op実装は無視されます。
  • onError()

    サブスクリプションでリカバリ不能なエラーが発生したため、終了する必要があることを通知します。このメソッドに渡される引数は、java.lang.Throwableクラス・インスタンスです。このメソッドを使用して、エラーに対して行うアクションを実行します。

    @Override
    public void onError(Throwable t) {
    	logger.severe("Error: " + t.getMessage());
    } 
  • onWarn()

    サブスクリプションでリカバリ不能なエラーが発生したため、終了する必要があることを通知します。このメソッドに渡される引数は、java.lang.Throwableクラス・インスタンスです。このメソッドを使用して、警告に対して行うアクションを実行します。

    @Override
    public void onWarn(Throwable t) {
    	logger.warning("Warning: " + t.getMessage());
    } 

    警告ではサブスクリプションは終了しません。ShardTimeoutException形式の警告は、特定のシャードが応答していないことをアプリケーションに通知する手段として提供されます。

  • onCheckPointComplete()

    以前にリクエストされたチェックポイントが完了したときに通知します。チェックポイントは、NoSQLSubscription.doCheckpoint()をコールすることによって実行されます。エラーが発生した場合、サブスクリプションはチェックポイントを失いますが、サブスクリプション自体は終了せず、ストリーミングを続行します。チェックポイントの使用方法を参照してください。

    このメソッドは、次の2つの引数を指定してコールします。

    • oracle.kv.pubsub.StreamPosition

      チェックポイントが実行されたストリーム内の位置を識別します。

    • java.lang.Throwable

      チェックポイントの取得中にエラーが発生した場合を除き、Nullです。

    @Override
    public void onCheckpointComplete(StreamPosition pos, 
                                     Throwable cause) {
    	if (cause == null) {
    		logger.info("Finish checkpoint at position " + pos);
    	} else {
    		logger.warning("Fail to checkpoint at position " + pos +
    					   ", cause: " + cause.getMessage());
    	}
    } 
  • onChangeResult()

    実行中のサブスクリプション・ストリームに対する表の追加および削除は、非同期コールを使用して行われます。非同期コールは、値を返さずに即座に制御を戻します。変更が有効になったら、onChangeResultコールバック・メソッドを使用して、操作の結果をフェッチできます。変更が成功した場合は、変更が有効になった最初のストリーム位置を表すnull以外のストリーム位置を使用して、このメソッドがコールされます。変更は成功しなかったが、サブスクリプションが引き続きアクティブである場合は、失敗の原因を示すnull以外の例外を使用してこのメソッドがコールされます。変更が原因でサブスクリプションが取り消された場合、このメソッドはコールされず、かわりにonErrorメソッドがコールされます。

  • getSubscriptionConfig()

    このメソッドを使用して、このサブスクリプションで使用されるoracle.kv.pubsub.NoSQLSubscriptionConfigオブジェクトを返します。このメソッドは、パブリッシャがサブスクリプションを作成する際にパブリッシャによって起動されます。