サブスクライバのスケーリング
異なるノードで実行されるサブスクライバを追加または削除するには、次の追加ビルダーAPIを使用して
NoSQLSubscriptionConfigを作成する必要があります。 /* step 3: create a subscription configuration */
final NoSQLSubscriptionConfig subscriptionConfig =
// Scalable subscriber should set Subscriber Id
// with 2 as total number of subscribers and
// 0 as its own SubscriberId within the group of 2 subscribers
new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
.setSubscribedTables("usertable")
.setSubscriberId(new NoSQLSubscriberId(2,0))
.setStreamMode(streamMode)
.build(); APIのsetSubscriberId()では、1つの引数NoSQLSubscriberIDを使用します。NoSQLSubscriberIdは、サブスクライバの合計数とサブスクライバ索引の両方を含むオブジェクトです。したがって、NoSQLubscriberIdオブジェクトを構築するには、次の2つの引数が必要です。
-
サブスクライバの数
異なるノードで実行されるサブスクライバの合計数。たとえば、前述のコード例では、
NoSQLSubscriberIdが作成した.setSubscriberId(new NoSQLSubscriberId(2,0))には、合計で2つのサブスクライバがあります。 -
サブスクライバ索引
サブスクライバの合計数の中における現在のサブスクライバの数値索引。数値索引は0から始まります。たとえば、2つのサブスクライバ・クライアントは0および1として識別できます。
サブスクライバ・グループの一部として複数のシャード・サブスクライバを構成します
サブスクライバ・グループのシャード・サブスクライバに対して
NoSQLSubscriptionConfigオブジェクトを構築するには、マッパー関数の単一パラメータを持つNoSQLSubscriptionConfig.Builderクラスのコンストラクタを使用します。この関数は、グループ内の各サブスクライバIDから計算された個別のチェックポイント表名を返します。シャード・サブスクライバを構成するためのBuilderクラスのサンプル・コンストラクタを次に示します。public void Builder(Function<NoSQLSubscriberId, String> ckptTableMapper);シャード・サブスクライバのNoSQLSubscriptionConfigオブジェクトは、次のように構築できます。/** * Configures a sharded subscriber
* @param subscriberId subscriber id
* @param tables subscribed tables
*/
void configureShardedSubscribers(NoSQLSubscriberId subscriberId, Set<String> tables) {
finalNoSQLSubscriptionConfig conf =
newNoSQLSubscriptionConfig.Builder( id -> buildCkptTableName(id))
.setSubscribedTables(tables)
.setSubscriberId(subscriberId)
.build();
}buildCkptTableNameがサブスクライバIDをStreamCheckpointTableに連結してサブスクライバIDからチェックポイント名を構築するコード・スニペットの例を次に示します。これは、マッパー関数を使用する1つの方法です。アプリケーションは、サブスクライバごとに一意のチェックポイント表名を返すかぎり、独自のマッパー関数を作成できます。/**
* Builds stream checkpoint table from given subscriber id
* @param subscriberId subscriber id
* @return stream checkpoint table name for that subscriber
*/
private String buildCkptTableName(NoSQLSubscriberId subscriberId) {
return "StreamCheckpointTable" + subscriberId.toString();
}