NoSQLSubscriberインタフェースの使用方法

サブスクライバを実装するには、org.reactivestreams.Subscriberの拡張であるoracle.kv.pubsub.NoSQLSubscriberインタフェースを使用します。NoSQLSubscriberには、実装する必要がある次のメソッドが用意されています。

  • onSubscribe()

    これは、パブリッシャがOracle NoSQL Databaseストアとの接続を正常に確立した後に起動されるメソッドです。このメソッドに渡す引数は、org.reactivestreams.Subscriptionインスタンスです。これはその後、oracle.kv.pubsub.NoSQLSubscriptionにキャストできます。サブスクリプションの操作を参照してください。

    ...
    private NoSQLSubscription subscription;
    ...
    
    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;    
        // request 100 store operations be streamed to this 
        // subscriber.
        s.request(100);
    } 

    ノート:

    onSubscribe()内でs.request(100)メソッドをコールする必要はありません。NoSQLSubscriptionのインスタンスが使用可能になると、onSubscribe()の外部でこのメソッドをコールできます。ここでのonSubscribe()メソッドの重要な点は、パブリッシャが生成するサブスクリプション・インスタンスをユーザーに渡すことです。

  • onNext()

    次のOracle NoSQL Database操作を通知します。このメソッドにはStreamOperationクラス・インスタンスを渡します。StreamOperationクラスの使用方法を参照してください。このメソッドでは、ストリーム・イベントに対して行う処理を実行します。

    @Override
    public void onNext(StreamOperation t) {
        // perform processing on the StreamOperation
        // here. Typically you will do different
        // things depending on whether this is
        // a put,delete, or transaction event.
        switch (t.getType()) {
            case PUT:
                {
                    // Process the put operation here.
                }
                break;
            case DELETE:
                {
                    // Process the delete operation here.
                }
                break;
            case TRANSACTION:
                {
                    // Process the transaction operation here.
                }
                break;
            default:
                // Received an unknown and therefore illegal operation 
                // type.
                throw new
                IllegalStateException("... exception message ...");
        }
    }
    }

    変更ストリームの行操作にユーザー定義の行メタデータが含まれている場合、このメタデータは、onNext()によって受信される対応するStreamOperationオブジェクトで使用可能になります。サブスクライバは、各行操作にアタッチされたこのユーザー定義情報にコア・データとともにアクセスできます。これは、様々なAPIを介して実行されるすべての行変更に適用されます。ユーザー定義の行メタデータの詳細は、開発者ガイドユーザー定義行メタデータの使用を参照してください。

  • onComplete()

    サブスクリプションの完了を通知します。このメソッドを使用して、サブスクリプションが終了した後にアプリケーションが必要とするクリーン・アップを実行します。

    @Override
    public void onComplete() {
    	/* nothing to do, so make this a no-op */
    } 

    ノート:

    本来、KVStore表からのストリーミングはバインド解除されず、onComplete()がコールされることはないため、ストリーム処理アプリケーションではこのメソッドを実装する必要があります。このメソッドのno-op実装は無視されます。
  • onError()

    サブスクリプションでリカバリ不能なエラーが発生したため、終了する必要があることを通知します。このメソッドに渡される引数は、java.lang.Throwableクラス・インスタンスです。このメソッドを使用して、エラーに対して行うアクションを実行します。

    @Override
    public void onError(Throwable t) {
    	logger.severe("Error: " + t.getMessage());
    } 
  • onWarn()

    サブスクリプションでサービスの潜在的な問題が発生したことを通知します。このメソッドに渡される引数は、java.lang.Throwableクラス・インスタンスです。このメソッドを使用して、警告に対して行うアクションを実行します。

    @Override
    public void onWarn(Throwable t) {
    	logger.warning("Warning: " + t.getMessage());
    } 

    警告ではサブスクリプションは終了しません。ShardTimeoutException形式の警告は、長時間応答しない特定のシャードなど、中断のないサービスの問題をアプリケーションに通知する手段として提供されます。

  • onCheckPointComplete()

    以前にリクエストされたチェックポイントが完了したときに通知します。チェックポイントは、NoSQLSubscription.doCheckpoint()をコールすることによって実行されます。エラーが発生した場合、サブスクリプションはチェックポイントを失いますが、サブスクリプション自体は終了せず、ストリーミングを続行します。チェックポイントの使用方法を参照してください。

    このメソッドは、次の2つの引数を指定してコールします。

    • oracle.kv.pubsub.StreamPosition

      チェックポイントが実行されたストリーム内の位置を識別します。

    • java.lang.Throwable

      チェックポイントの取得中にエラーが発生した場合を除き、Nullです。

    @Override
    public void onCheckpointComplete(StreamPosition pos, 
                                     Throwable cause) {
    	if (cause == null) {
    		logger.info("Finish checkpoint at position " + pos);
    	} else {
    		logger.warning("Fail to checkpoint at position " + pos +
    					   ", cause: " + cause.getMessage());
    	}
    } 
  • onChangeResult()

    実行中のサブスクリプション・ストリームに対する表の追加および削除は、非同期コールを使用して行われます。非同期コールは、値を返さずに即座に制御を戻します。変更が有効になったら、onChangeResultコールバック・メソッドを使用して、操作の結果をフェッチできます。変更が成功した場合は、変更が有効になった最初のストリーム位置を表すnull以外のストリーム位置を使用して、このメソッドがコールされます。変更は成功しなかったが、サブスクリプションが引き続きアクティブである場合は、失敗の原因を示すnull以外の例外を使用してこのメソッドがコールされます。変更が原因でサブスクリプションが取り消された場合、このメソッドはコールされず、かわりにonErrorメソッドがコールされます。

  • getSubscriptionConfig()

    このメソッドを使用して、このサブスクリプションで使用されるoracle.kv.pubsub.NoSQLSubscriptionConfigオブジェクトを返します。このメソッドは、パブリッシャがサブスクリプションを作成する際にパブリッシャによって起動されます。