NoSQLSubscriberの例
この項では、GSGSubscriberExample
と呼ばれる、完全でありながら簡単なNoSQLSubscriber
の例を提供します。この実装は、Streamsの例に示されているパブリッシャの例で使用されます。
GSGSubscriberExample
は、Users
という1つの表をサブスクライブします。この表を定義し、表の行をそれに書き込むアプリケーションを確認するには、GSGStreamsWriteTableを参照してください。
まず、インポートを提供します。org.reactivestreams.Subscription
は必須のインポートです。このサンプル・コードのコンパイルと実行の両方を行うには、Java環境のクラスパスにreactive-streams.jar
ファイルが必要です。
package pubsub;
import java.util.List;
import oracle.kv.pubsub.NoSQLSubscriber;
import oracle.kv.pubsub.NoSQLSubscription;
import oracle.kv.pubsub.NoSQLSubscriptionConfig;
import oracle.kv.pubsub.StreamOperation;
import oracle.kv.pubsub.StreamPosition;
import oracle.kv.table.MapValue;
import oracle.kv.table.Row;
import org.reactivestreams.Subscription;
次に、クラスを宣言し、データ・メンバーを初期化します。前述のように、これはNoSQLSubscriber
の実装です。
class GSGSubscriberExample implements NoSQLSubscriber {
/* subscription configuration */
private final NoSQLSubscriptionConfig config;
/* number of operations to stream */
private final int numOps;
/* number of operations seen in the stream */
private long streamOps;
private NoSQLSubscription subscription;
private boolean isSubscribeSucc;
private Throwable causeOfFailure;
GSGSubscriberExample(NoSQLSubscriptionConfig config,
int numOps) {
this.config = config;
this.numOps = numOps;
causeOfFailure = null;
isSubscribeSucc = false;
streamOps = 0;
subscription = null;
}
最初に行うのは、NoSQLSubscriber.getSubscriptionConfig()
を実装することです。これは単に、実装Streamsアプリケーションによってクラスが構築されるときにクラスに提供されるNoSQLSubscriptionConfig
オブジェクトを返します。このメソッドにより、パブリッシャは、このサブスクライバのストリームを構成する方法を理解します。
@Override
public NoSQLSubscriptionConfig getSubscriptionConfig() {
return config;
}
onSubscribe()
に指定する実装によって、いくつかのことが行われます。まず、NoSQLSubscription
クラス・インスタンスをこのサブスクライバ実装で使用できるようにします。インスタンスはこのクラスにorg.reactivestreams.Subscription
型のオブジェクトとして渡され、そのオブジェクトはNoSQLSubscription
にキャストされる必要があります。
このメソッドは、このサブスクライバがサブスクリプションに対して操作のリクエストを開始する場所でもあります。NoSQLSubscription.request()
へのそのコールがない場合、このサブスクライバは、処理する操作を受け取りません。この簡単な実装では、操作はこの場所でのみリクエストされます。より複雑な実装では、操作が最初にここで要求され、サブスクライバがその数の操作を受け取った後、クラスの別の部分(通常はonNext()
)でより多くの操作が要求されることがあります。
最後に、サブスクリプションの試行が成功したことを通知します。この情報は、パブリッシャおよびサブスクライバを作成するときに、Streamsアプリケーションによって使用されます。
@Override
public void onSubscribe(Subscription s) {
subscription = (NoSQLSubscription) s;
subscription.request(numOps);
isSubscribeSucc = true;
}
次に、エラー・ハンドラと警告ハンドラを設定します。onError
がコールされるときには、サブスクリプションはすでに取り消されています。ここでは、簡単な処理を行って、単にコンソールに書き込みます。ただし、より堅牢な実装では、アプリケーション・ログ・ファイルへの書込みが行われ、エラーの性質によっては、他の通知や修正アクション(ストリーム全体の処理の終了など)が行われる可能性もあります。
@Override
public void onError(Throwable t) {
causeOfFailure = t;
System.out.println("Error: " + t.getMessage());
}
@Override
public void onWarn(Throwable t) {
System.out.println("Warning: " + t.getMessage());
}
アプリケーションはonComplete()
メソッドを提供する必要がありますが、このメソッドはコールされないため、実装では何も実行する必要はありません。
@Override
public void onComplete() {
/* no-op */
}
この例ではチェックポイントを実装しないため(詳細は、チェックポイントの使用方法を参照)、このメソッドで行うことは何もありません。
/* called when publisher finishes a checkpoint */
@Override
public void onCheckpointComplete(StreamPosition pos,
Throwable cause) {
/* no-op. This example doesn't implement checkpoints */
}
onNext()
メソッドは、サブスクライバが個々のストリーム操作をStreamOperation
オブジェクトの形式で受け取って処理する場所です。
次のメソッドでは、サブスクリプションが受け取った操作のタイプ(put
またはdelete
)を確認する方法を示します。個々の操作で何を行うかは、アプリケーションの要件によって決まります。この場合、put
操作については、囲まれたRow
オブジェクトからフィールド情報を取得し、それをコンソールに書き込みます。このコードはあまり堅牢ではありません。具体的には、特定のスキーマを持つJSONデータを想定しています。有効なJSONであればJSON表の列に書き込むことができるため、本番アプリケーションではJSON列に予期したスキーマが含まれていることを確認できるように、なんらかの防御コードがここに必要になります。
delete
操作については、単にStreamOperation
オブジェクトをコンソールに書き込みます。
@Override
public void onNext(StreamOperation t) {
switch (t.getType()) {
case PUT:
streamOps++;
System.out.println("\nFound a put. Row is:");
StreamOperation.PutEvent pe = t.asPut();
Row row = pe.getRow();
Integer uid = row.get("uid").asInteger().get();
System.out.println("UID: " + uid);
MapValue myjson = row.get("myJSON").asMap();
int quantity = myjson.get("quantity")
.asInteger().get();
String array =
myjson.get("myArray").asArray().toString();
System.out.println("\tQuantity: " + quantity);
System.out.println("\tmyArray: " + array);
break;
case DELETE:
streamOps++;
System.out.println("\nFound a delete. Row is:");
System.out.println(t);
break;
default:
throw new
IllegalStateException("Receive unsupported " +
"stream operation from shard " +
t.getRepGroupId() +
", seq: " + t.getSequenceId());
}
if (streamOps == numOps) {
getSubscription().cancel();
System.out.println("Subscription cancelled after " +
"receiving " + numOps + " operations.");
}
}
最後に、このサブスクライバから目的の情報を取得するためにストリーム・アプリケーションで使用される一連のgetterメソッドを提供します。Streamsパブリッシャの使用方法に、これらの使用方法が示されています。
String getCauseOfFailure() {
if (causeOfFailure == null) {
return "success";
}
return causeOfFailure.getMessage();
}
boolean isSubscriptionSucc() {
return isSubscribeSucc;
}
long getStreamOps() {
return streamOps;
}
NoSQLSubscription getSubscription() {
return subscription;
}
}