NoSQLSubscriberの例

この項では、GSGSubscriberExampleと呼ばれる、完全でありながら簡単なNoSQLSubscriberの例を提供します。この実装は、Streamsの例に示されているパブリッシャの例で使用されます。

GSGSubscriberExampleは、Usersという1つの表をサブスクライブします。この表を定義し、表の行をそれに書き込むアプリケーションを確認するには、GSGStreamsWriteTableを参照してください。

まず、インポートを提供します。org.reactivestreams.Subscriptionは必須のインポートです。このサンプル・コードのコンパイルと実行の両方を行うには、Java環境のクラスパスにreactive-streams.jarファイルが必要です。

package pubsub;

import java.util.List;

import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;

import oracle.kv.table.MapValue;
import oracle.kv.table.Row;

import org.reactivestreams.Subscription; 

次に、クラスを宣言し、データ・メンバーを初期化します。前述のように、これはNoSQLSubscriberの実装です。

class GSGSubscriberExample implements NoSQLSubscriber {

    /* subscription configuration */
    private final NoSQLSubscriptionConfig config;

    /* number of operations to stream */
    private final int numOps;

    /* number of operations seen in the stream */
    private long streamOps;

    private NoSQLSubscription subscription;

    private boolean isSubscribeSucc;

    private Throwable causeOfFailure;

    GSGSubscriberExample(NoSQLSubscriptionConfig config,
                                 int numOps) {
        this.config = config;
        this.numOps = numOps;

        causeOfFailure = null;
        isSubscribeSucc = false;
        streamOps = 0;
        subscription = null;
    }

最初に行うのは、NoSQLSubscriber.getSubscriptionConfig()を実装することです。これは単に、実装Streamsアプリケーションによってクラスが構築されるときにクラスに提供されるNoSQLSubscriptionConfigオブジェクトを返します。このメソッドにより、パブリッシャは、このサブスクライバのストリームを構成する方法を理解します。

    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    } 

onSubscribe()に指定する実装によって、いくつかのことが行われます。まず、NoSQLSubscriptionクラス・インスタンスをこのサブスクライバ実装で使用できるようにします。インスタンスはこのクラスにorg.reactivestreams.Subscription型のオブジェクトとして渡され、そのオブジェクトはNoSQLSubscriptionにキャストされる必要があります。

このメソッドは、このサブスクライバがサブスクリプションに対して操作のリクエストを開始する場所でもあります。NoSQLSubscription.request()へのそのコールがない場合、このサブスクライバは、処理する操作を受け取りません。この簡単な実装では、操作はこの場所でのみリクエストされます。より複雑な実装では、操作が最初にここで要求され、サブスクライバがその数の操作を受け取った後、クラスの別の部分(通常はonNext())でより多くの操作が要求されることがあります。

最後に、サブスクリプションの試行が成功したことを通知します。この情報は、パブリッシャおよびサブスクライバを作成するときに、Streamsアプリケーションによって使用されます。

    @Override
    public void onSubscribe(Subscription s) {
        subscription = (NoSQLSubscription) s;
        subscription.request(numOps);
        isSubscribeSucc = true;
    } 

次に、エラー・ハンドラと警告ハンドラを設定します。onErrorがコールされるときには、サブスクリプションはすでに取り消されています。ここでは、簡単な処理を行って、単にコンソールに書き込みます。ただし、より堅牢な実装では、アプリケーション・ログ・ファイルへの書込みが行われ、エラーの性質によっては、他の通知や修正アクション(ストリーム全体の処理の終了など)が行われる可能性もあります。

    @Override
    public void onError(Throwable t) {
        causeOfFailure = t;
        System.out.println("Error: " + t.getMessage());
    }

    @Override
    public void onWarn(Throwable t) {
        System.out.println("Warning: " + t.getMessage());
    }

アプリケーションはonComplete()メソッドを提供する必要がありますが、このメソッドはコールされないため、実装では何も実行する必要はありません。

    @Override
    public void onComplete() {
        /* no-op */
    } 

この例ではチェックポイントを実装しないため(詳細は、チェックポイントの使用方法を参照)、このメソッドで行うことは何もありません。

    /* called when publisher finishes a checkpoint */
    @Override
    public void onCheckpointComplete(StreamPosition pos,
                                     Throwable cause) {
        /* no-op. This example doesn't implement checkpoints */
    } 

onNext()メソッドは、サブスクライバが個々のストリーム操作をStreamOperationオブジェクトの形式で受け取って処理する場所です。

次のメソッドでは、サブスクリプションが受け取った操作のタイプ(putまたはdelete)を確認する方法を示します。個々の操作で何を行うかは、アプリケーションの要件によって決まります。この場合、put操作については、囲まれたRowオブジェクトからフィールド情報を取得し、それをコンソールに書き込みます。このコードはあまり堅牢ではありません。具体的には、特定のスキーマを持つJSONデータを想定しています。有効なJSONであればJSON表の列に書き込むことができるため、本番アプリケーションではJSON列に予期したスキーマが含まれていることを確認できるように、なんらかの防御コードがここに必要になります。

delete操作については、単にStreamOperationオブジェクトをコンソールに書き込みます。

    @Override
    public void onNext(StreamOperation t) {

        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();

                Integer uid = row.get("uid").asInteger().get();
                System.out.println("UID: " + uid);

                MapValue myjson = row.get("myJSON").asMap();
                int quantity = myjson.get("quantity")
                                     .asInteger().get();
                String array = 
                    myjson.get("myArray").asArray().toString();
                System.out.println("\tQuantity: " + quantity);
                System.out.println("\tmyArray: " + array);
                break;
            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;

            default:
                throw new 
                    IllegalStateException("Receive unsupported " +
                        "stream operation from shard " +
                                t.getRepGroupId() +
                                ", seq: " + t.getSequenceId());
        }
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after " +
                    "receiving " + numOps + " operations.");
        }
    } 

最後に、このサブスクライバから目的の情報を取得するためにストリーム・アプリケーションで使用される一連のgetterメソッドを提供します。Streamsパブリッシャの使用方法に、これらの使用方法が示されています。

    String getCauseOfFailure() {
        if (causeOfFailure == null) {
            return "success";
        }
        return causeOfFailure.getMessage();
    }

    boolean isSubscriptionSucc() {
        return isSubscribeSucc;
    }

    long getStreamOps() {
        return streamOps;
    }

    NoSQLSubscription getSubscription() {
        return subscription;
    }

}