25 基本的なトピック・パブリッシュおよびサブスクライブ操作の実行
- トピックAPIの概要
com.tangosol.net.NamedTopic<V>インタフェースは、トピックをパブリッシュおよびサブスクライブするためにアプリケーションで使用される初期インタフェースです。PublisherおよびSubscriberを作成するファクトリ・メソッドがあります。サブスクライバ・グループを管理するメソッドもあります。 - トピック・インスタンスの取得
NamedTopicインスタンスへの参照を取得するために、アプリケーションはSessionAPIまたはCacheFactoryAPIを使用できます。 - NamedTopic型チェックの使用
Coherenceでは、明示的な型を使用してSessionまたはCacheFactoryAPIのいずれかを使用している場合、厳密に型指定されたNamedTopicインスタンスをリクエストできます。 - トピックへのパブリッシュ
アクティブなPublisher(クローズされていない)は、送信メソッドを使用してトピックに値を非同期で送信し、CompletableFutureを返します。 - トピックへの直接サブスクライブ
アクティブなサブスクライバは、トピックに配信されるすべての値を受信します。 - サブスクライバ・リソースの解放
サブスクライバは、Subscriber.close()操作を明示的に起動するか、try-with-resources文を使用してサブスクライバを作成することにより、クローズできます。 - サブスクライバ・グループのサブスクライブ
トピック値は、サブスクライバ・グループ・メンバーが受信するか期限切れになるまで保持されます。デフォルトでは、トピック値に有効期限はありません。 - NamedTopicのサブスクライバ・グループ
サブスクライバ・グループは、2つの方法で作成できます。 - トピックのサブスクライバ・グループの破棄
サブスクライバ・グループの有効期間は、そのサブスクライバ・グループ・メンバーに依存しません。 - トピックによって使用されるリソースの管理
パブリッシャとサブスクライバの両方にリソースが関連付けられており、不要になったリソースを閉じる必要があります。 - トピック・フロー制御の理解
トピックのPublisher.sendおよびSubscriber.receiveメソッドを使用すると、データ交換リクエストを送信する非同期(非ブロッキング)方法が提供されます。これらのリクエストを効率的に管理するためのデフォルトのフロー制御メカニズムが用意されています。 - トピック記憶域に上限を設定するためのパブリッシャ・フロー制御の管理
トピックにパブリッシュされる値が、その値を処理するサブスクライバの能力を大きく上回っているデータ処理パイプラインでは、トピック上の未使用の値に対して無制限の記憶域を許可するか、トピックに保持されている値に対する最大記憶域サイズの制約を設定できます。
親トピック: データ・グリッド操作の実行
トピックAPIの概要
com.tangosol.net.NamedTopic<V>インタフェースは、トピックをパブリッシュおよびサブスクライブするためにアプリケーションで使用される初期インタフェースです。PublisherおよびSubscriberを作成するファクトリ・メソッドがあります。サブスクライバ・グループを管理するメソッドもあります。
com.tangosol.net.Pubisher<V>インタフェースには、トピックに値をパブリッシュする非同期のsendメソッドがあります。このメソッドは、送信完了時または例外がスローされた場合にトラッキングを有効にするためのCompletableFuture<Void>を返します。
com.tangosol.net.Subscriber<V>インタフェースには、CompleteFuture<Element<V>>を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.Name.of (subscriberGroupName)を使用して) subscriber groupをサブスクライブするか、トピックを直接サブスクライブします。
パブリッシャ作成オプション
次に、NamedTopic.createPublisher(Publisher.Option[]とともに使用するパブリッシャ作成オプションを示します。
| パブリッシャ・オプション | 説明 |
|---|---|
OnFailure.Stop |
Default。個々のPublisher.send(V)呼出しが失敗した場合は、その後のパブリッシュを停止し、Publisherをクローズします。
|
OnFailure.Continue |
個々のPublisher.send(V)呼出しが失敗した場合は、その値をスキップし、引き続き他の値をパブリッシュします。
|
FailOnFull.enabled() |
トピックの未処理値の記憶域サイズが構成済のhigh-unitsを超える場合、Publisher.send(V)呼出しから返されるCompletableFutureは異常終了します。トピック上でスペースが使用可能になると、操作が完了するまで完了をブロックするようにデフォルトをオーバーライドします。
|
OrderBy.thread() |
Default。同じスレッドからのすべてのvalues sentが順次に格納されていることを確認します。
|
OrderBy.none() |
sent values間で特定の順序を強制しない場合は、最大レベルの並列度が許可されます。
|
OrderBy.id(int) |
同じIDを共有するすべてのスレッド間でsent valuesが順序付けされていることを確認してください。
|
OrderBy.value(ToIntFunction) |
sent valueに適用されているこのメソッドに基づいて、順序単位を計算します。
|
親トピック: トピックAPIの概要
サブスクライバ作成オプション
次に、NamedTopic.createSubscriber(Subscriber.Option[]とともに使用するサブスクライバ作成オプションを示します。
| サブスクライバ・オプション | 説明 |
|---|---|
Name.of(String) |
メンバーとして参加するサブスクライバ・グループ名を指定します。サブスクライバ・グループに配信される値ごとに、そのグループの1つのメンバーのみが値を受信します。 |
Filtered.by(Filter) |
このFilterに一致するSubscriber.receive()値のみ。サブスクリプション・グループごとに1つのFilterのみ。
|
Convert.using(Function) |
Subscriber.receive()より前に提供されたFunctionを使用して、トピック値を変換します。サブスクリプション・グループごとに1つのConverterのみ。
|
CompleteOnEmpty.enabled() |
Subscriber.receive()に残された値がない場合、返されるCompletableFuture.get()はnull要素を返します。デフォルトでは、返されるCompletableFuture.get()は次のトピック値を戻せるようになるまでブロックされます。
|
com.tangosol.net.Subscriber<V>インタフェースには、CompleteFuture<Element<V>>を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.Name.of (subscriberGroupName)を使用して) subscriber groupをサブスクライブするか、トピックを直接サブスクライブします。
親トピック: トピックAPIの概要
トピック・インスタンスの取得
NamedTopicインスタンスへの参照を取得するために、アプリケーションではSession APIまたはCacheFactory APIを使用できます。Session APIでは、非静的なCoherenceセッションの使用および注入を可能にする簡潔なメソッド・セットが提供されるため、これはトピックの取得に推奨される方法です。対照的に、CacheFactory APIでは、Coherenceの内部的な概念やサービスの知識を必要とする、多くの静的メソッドが公開されています。Session APIにより、より効率的なライフサイクルおよび他のフレームワーク、特に注入を使用するフレームワークとの統合が可能になります。
次の例では、デフォルトのセッション・プロバイダを使用してセッションを作成し、Session.getTopicメソッドを使用してNamedTopicインスタンスへの参照を取得する方法を示します。トピックの名前はパラメータとして含まれます。
import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<String> Topic = session.getTopic(“topic”,ValueTypeAssertion.withType(String.class));または
ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
NamedTopic<String> topic =
ccf.ensureTopic(“topic”, ValueTypeAssertion.withType(String.class));汎用型チェックを省略するには、topic-mapping構成でオプションの<value-type>定義を省略し、Session.getTopicまたはConfigurableCacheFactory.ensureTopicメソッドのパラメータとしてのValueTypeAssertionオプションを省略します。
NamedTopic型チェックの使用
SessionまたはCacheFactory APIのいずれかを使用している場合、厳密に型指定されたNamedTopicインスタンスをリクエストできます。デフォルトでは、NamedTopic<Object>インスタンスが返されます。これは、NamedTopicインスタンスを作成および使用するための最も柔軟なメカニズムですが、トピック・インスタンスとのやり取りに必要な値の型をアプリケーションで設定する必要があります。
次の例では、アプリケーションで使用する任意のタイプのオブジェクトを格納します。
import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<Object> topic = session.getTopic(“MyTopic”);または
ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
NamedTopic<Object> topic = ccf.ensureTopic(“MyTopic”);getTopicメソッドは、型チェックすることなく、必要に応じて特定の型のNamedTopicインスタンスをリクエストする場合に使用できます。ValueTypeAssertionインタフェースをgetTopicメソッドとともに使用して、NamedTopicインスタンスで使用される値の型の正確性をアサートします。このメソッドは、トピックがRAW型を使用する必要があることをアサートする場合に使用できます。たとえば:
NamedTopic<Object> topic = session.getTopic("MyTopic",
ValueTypeAssertion.withRawTypes());
同様に、CacheFactoryを使用している場合、
NamedTopic<String> topic =
ccf.ensureTopic(“MyTopic”, ValueTypeAssertion.withRawTypes());型の安全性を強化するために、トピックを作成して、トピックで使用される値の型を明示的にアサートできます。たとえば、トピックを作成して、値の型がStringである必要があることをアサートするには、アプリケーションで次を使用できます。
NamedTopic<String> topic = session.getTopic("MyTopic",
ValueTypeAssertion.withTypes(String.class);
NamedTopicインスタンスは、アサートされた型に準拠する必要はなく、無視することもできますが、コンパイル時に警告メッセージが表示されます。たとえば:
NamedTopic topic = session.getTopic("MyTopic",
ValueTypeAssertion.withTypes(String.class);
同様に、RAW型を使用し、ネーム・キャッシュ・インスタンスで特定の型を使用できることをアサートすることを、アプリケーションで選択することもできます。ただし、どちらの場合も、型がチェックされないままでは、エラーが発生することがあります。たとえば:
NamedTopic<String> topic = session.getTopic("MyTopic",
ValueTypeAssertion.withRawTypes());
型の安全性を最も強くするには、特定の型をトピック定義の一部として、キャッシュ構成ファイルに宣言することもできます。トピック定義の部分として構成された型と異なる型をアプリケーションが使用すると、ランタイム・エラーが発生します。次の例では、String型の値のみをサポートするトピックを構成します。
<topic-mapping> <topic-name>MyTopic</topic-name> <scheme-name>distributed-topic</scheme-name> <value-type>String</value-type> </topic-mapping>
最後に、明示的な型チェックの無効化をアプリケーションで選択できます。型チェックが無効の場合、型の安全性はアプリケーションで確保する必要があります。
NamedTopic<Object> topic = session.getTopic("MyTopic",
ValueTypeAssertion.withoutTypeChecking());トピックへのパブリッシュ
Publisher (クローズされていない)は、送信メソッドを使用してトピックに値を非同期で送信し、CompletableFutureを返します。たとえば、CompletableFutureでjoinを呼び出すことにより、非同期呼出しが同期呼出しに変わります。
多数の未処理の非同期操作の完了を処理するために使用可能な多数のオプションについては、「CompletableFuture」を参照してください。
NamedTopic<String> topic = ….; Publisher<String> publisher = topic.createPublisher(); CompletableFuture future = publisher.send(“someValue”); // Completes when asynchronous send completes. // Throws an exception if send operation completed exceptionally. future.join();
トピック値の要件
トピック値はシリアライズ可能である必要があります(たとえば、java.io.SerializableまたはCoherence Portable Object Formatのシリアライズ)。Portable Object Formatの使用を参照してください
親トピック: トピックへのパブリッシュ
未処理のパブリッシャ送信の完了の待機
パブリッシャが、各送信の完了を待機せずにトピックに複数の値を送信した場合、Publisher.flush().join()呼出しは、保留中のすべてのPublisher.send()の完了を同期的に待機します。1つ以上の未処理の操作が異常終了しても、この呼出しは常に正常に完了します。トピックに値が正常にパブリッシュされなかったときにそれを検出する必要がある場合は、CompletableFutureメソッドを使用して異常終了を検出します。
親トピック: トピックへのパブリッシュ
パブリッシャ・リソースの解放
Publisher.close()を明示的に呼び出すか、例25-1に示すようにtry-with-resources文を使用してパブリッシャを作成することにより、パブリッシャ・リソースを解放します。Publisher.close()はブロック操作であり、クローズが完了するまで戻りません。パブリッシャは、クローズされた後はアクティブではなくなります(「Publisher.isActive()」を参照)。そのPublisherでのその後のすべてのsend操作は、パブリッシャがアクティブでなくなったことを示すIllegalStateExceptionにより失敗します。ただし、close呼出しの前に行われている未処理のsend操作については、すべて未処理のCompletableFuturesを完了できます。Publisher.flush.join()と同様、クローズは正常に終了する場合も異常終了する場合もあり、異常終了する未処理のCompletableFuturesに対してはクローズから例外がスローされません。
親トピック: トピックへのパブリッシュ
トピックへの直接サブスクライブ
NamedTopic<String> topic = ….; Subscriber<String> subscriber = topic.createSubscriber(); CompletableFuture<Element<String>> future = receive(); String received = future.get().getValue();
サブスクライバ・リソースの解放
Subscriber.close()操作を明示的に起動するか、try-with-resources文を使用してサブスクライバを作成することにより、クローズできます。詳細は、例25-2を参照してください。
サブスクライバのクローズはブロック操作であり、操作が完了するまで戻りません。Subscriberに対してcloseが呼び出された後、非アクティブなSubscriberに対するその後のすべてのreceive呼出しはIllegalStateExceptionで失敗します。receive呼出しからの未処理のCompletableFutureの大部分は異常終了しますが、一部は完了する可能性があります。
サブスクライバ・グループのサブスクライブ
トピック値は、サブスクライバ・グループ・メンバーが受信するか期限切れになるまで保持されます。デフォルトでは、トピック値に有効期限はありません。
「<paged-topic-scheme>」の構成の「<expiry-delay>」サブ要素を参照してください。
NamedTopic<String> topic = ….;
Subscriber<String> subscriber =
topic.createSubscriber(Subscriber.Name.of(“subscriberGroup”));
CompletableFuture<Element<String>> future = receive();
String value = future.get().getValue();NamedTopicのサブスクライバ・グループ
サブスクライバ・グループは、2つの方法で作成できます。
coherence-cache-configファイルの<topic-mapping>要素に恒久サブスクライバ・グループを静的に構成します。例20-3を参照してください。- サブスクライバ・グループを結合する
Subscriber.Name.of(aSubscriberGroupName)オプションを使用してサブスクライバを作成し、そのトピックにsubscriber groupがまだ存在しない場合に、サブスクライバ・グループを動的に作成します。
NamedTopic.getSubscriberGroupsは、構成済のサブスクライバ・グループと動的に作成されたサブスクライバ・グループの両方を戻します。
NamedTopic<String> topic = ….; Set<String> subscriberGroups = topic.getSubscriberGroups();
トピックのサブスクライバ・グループの破棄
既存のすべてのサブスクライバ・グループ・メンバーはこの操作の影響を受け、サブスクライバ・グループ・メンバーに対する未処理の非同期受信操作はすべて取り消されます。
NamedTopic<String> topic = ….; topic.destroySubscriberGroup(“subscriberGroup”);
トピックで使用されるリソースの管理
パブリッシャとサブスクライバの両方にリソースが関連付けられており、不要になったリソースを閉じる必要があります。
PublisherとSubscriberはどちらもtry-with-resourceパターンとともに使用できます。この使用パターンにより、これらのトピック・リソースの両方に対してcloseが確実に呼び出されます。パブリッシャの自動クローズされたリソースの詳細は、例25-1を参照してください。サブスクライバの自動クローズされたリソースの詳細は、例25-2を参照してください。
例20-3に示すように、サブスクライバ・グループdurableSubscriberがトピックaTopicに対して静的に構成されるとします。
例25-1 パブリッシャ用のtry-with-resource
Session session = Session.create();
NamedTopic topic = session.createTopic(“aTopic”,
ValueTypeAssertion.withType(String.class));
try (Publisher<String> publisher = topic.createPublisher(…)) {
int MAX = …;
for (int i = 0; I < MAX; i++) {
publisher.send(“value” + i);
}
// wait for all outstanding asynchronous operations to complete before publisher is implicitly closed by exiting try-with-resource block.
publisher.flush().join();
}
例25-2 try-with-resourceによるサブスクライバの処理
//Process outstanding values on a subscriber group
try (Subscriber<String> subscriber =
topic.createSubscriber(Subscriber.Name.of(“durableSubscriber”)),
Subscriber.CompleteOnEmpy.enabled())) {
while (true) {
// asynchronous receive from direct subscriber to topic.
CompletableFuture<Element<String>> result= subscriber.receive();
Element<String> topicElement = result.get();
if (topicElement == null) {
// no more elements available
break;
} else {
String value = topicElement.getValue();
// process value received from subscriber.
...
}
// auto close subscriber and its resources. Outstanding CompletableFutures for
// receive will mostly complete exceptionally, though some may complete.
// a subscriber group accumulates future delivered values even if there
// are no active subscriber group members.
}
NamedTopic.destroy()は、すべてのトピック・リソースを解放し、インスタンスを破棄します。未処理のPublishersおよびSubscribersはすべてクローズされます。PublishersおよびSubscribersに対する未処理の操作はすべて異常終了します。Subscriber groupsは破棄され、永続記憶域を含むすべてのトピック記憶域が解放されます。
トピック・フロー制御の理解
トピックのPublisher.sendおよびSubscriber.receiveメソッドを使用すると、データ交換リクエストを送信する非同期(非ブロッキング)方法が提供されます。これらのリクエストを効率的に管理するためのデフォルトのフロー制御メカニズムが用意されています。
自動フロー制御では、未処理の送信または受信操作(あるいはその両方)が多すぎることが検出され、未処理の値を消費するためのトピックの記憶域が不足している場合、システムがすべての未処理のリクエストを遅れずに処理できるように、非同期のsendまたはreceive (あるいはその両方)に対する未処理のCompletableFutureが制限されることがあります。PublisherとSubscriberの両方のクラスは、アプリケーションがFlowControlインスタンスを取得するための方法を提供します。このインスタンスにより、自動フロー制御のオプトアウトが可能になり、リクエスト・フローの速度を手動で制御できます。詳細は、com.tangosol.net.FlowControl Javadocを参照してください。
トピック記憶域に上限を設定するためのパブリッシャ・フロー制御の管理
トピックにパブリッシュされる値が、その値を処理するサブスクライバの能力を大きく上回っているデータ処理パイプラインでは、トピック上の未使用の値に対して無制限の記憶域を許可するか、トピックに保持されている値に対する最大記憶域サイズの制約を設定できます。
デフォルトでは、トピックに保存される値に対して記憶域サイズの制約はありません。例25-3に示すように、トピックの記憶域制限が構成されている場合、トピックの記憶域使用量の制限に使用できるオプションがいくつかあります。
トピックの記憶域制限が構成された後、次の送信がトピックの記憶域制限を超える場合のデフォルトの動作として、トピックに対するパブリッシャが制限、ブロックされます。目的は、パブリッシャをブロックすることで、トピックで使用される記憶域の量を制限しながら、遅れずに処理するための時間をサブスクライバに与えることです。サブスクライバが遅れずに処理し、トピックに対するそれらの値がすべてのサブスクライバとサブスクライバのグループによって消費された後、パブリッシャは送信を再開できます。
パブリッシャの作成時に、デフォルトの動作を変更するためのオプションがあります。Publisher.FailOnFull.enabled()オプションは、パブリッシャsendがブロックされずに異常終了することを示します。デフォルトのPublisherオプションPublisher.OnFailure.Stopでは、この例外が発生した場合にパブリッシャが自動的にクローズされます。このデフォルトでは、パブリッシャは値を配信できなかった最初の例外ケースでクローズされているため、パブリッシャからパブリッシュされた値の順序が保持されます。
例25-3 FailOnFullのパブリッシュ
Session session = Session.create();
NamedTopic topic = session.createTopic(“aTopic”,
ValueTypeAssertion.withType(String.class));
try (Publisher<String> publisher = topic.createPublisher(Publisher.FailOnFull.enabled())) {
int MAX = …;
int cSent = 0;
for (int i = 0; I < MAX; i++) {
try {
// synchronous send that throws exception if publisher is full.
publisher.send(“value” + i).join();
cSent++;
} catch (Exception e) {
// when full throws CompletionException with a cause of IllegalStateException: the topic is at capacity.
break;
}
}
if (cSent != MAX) {
// all values were not published to topic.
}
}
この順序を維持する必要がない場合は、Publisher.OnFailure.Continueオプションを指定してパブリッシャを作成し、トピックが一杯になった場合に発生する送信のみが失敗するようにできます。
トピック全体への送信に対するデフォルトのブロックをオーバーライドするには、次のパターンを使用してフロー制御の一時停止からスレッドを除外します。この手法では、フロー制御が無効になり、このNonBlockingスレッドのトピックに対する記憶域の制限がなくなります。
例25-4 トピック記憶域制限に達したときのフロー制御の一時停止からのスレッドの除外
import com.tangosol.net.*;
import com.oracle.coherence.common.base.NonBlocking;
Session session = Session.create():
NamedTopic topic = Session.createTopic(“aTopic”,
ValueTypeAssertion.withType(String.class));
try (NonBlocking nb = NonBlocking();
Publisher<String> publisher = topic.createPublisher(…)) {
int MAX = …;
for (int i = 0; I < MAX; i++) {
publisher.send(“value” + i);
}
}