26 基本的なトピック・パブリッシュおよびサブスクライブ操作の実行
- トピックAPIの概要
com.tangosol.net.NamedTopic<V>
インタフェースは、トピックをパブリッシュおよびサブスクライブするためにアプリケーションで使用される初期インタフェースです。Publisher
およびSubscriber
を作成するファクトリ・メソッドがあります。サブスクライバ・グループを管理するメソッドもあります。 - トピック・インスタンスの取得
NamedTopic
インスタンスへの参照を取得するために、アプリケーションはSession
APIまたはCacheFactory
APIを使用できます。 - NamedTopic型チェックの使用
Coherenceでは、明示的な型を使用してSession
またはCacheFactory
APIのいずれかを使用している場合、厳密に型指定されたNamedTopic
インスタンスをリクエストできます。 - トピックへのパブリッシュ
アクティブなPublisher
(クローズされていない)は、送信メソッドを使用してトピックに値を非同期で送信し、CompletableFuture
を返します。 - トピックへの直接サブスクライブ
アクティブなサブスクライバは、トピックに配信されるすべての値を受信します。 - サブスクライバ・リソースの解放
サブスクライバは、Subscriber.close()
操作を明示的に起動するか、try-with-resources
文を使用してサブスクライバを作成することにより、クローズできます。 - サブスクライバ・グループのサブスクライブ
トピック値は、サブスクライバ・グループ・メンバーが受信するか期限切れになるまで保持されます。デフォルトでは、トピック値に有効期限はありません。 - NamedTopicのサブスクライバ・グループ
サブスクライバ・グループは、2つの方法で作成できます。 - トピックのサブスクライバ・グループの破棄
サブスクライバ・グループの有効期間は、そのサブスクライバ・グループ・メンバーに依存しません。 - メッセージのコミット
メッセージが処理され、再配信しないように指定するには、サブスクライバがメッセージをコミットする必要があります。サブスクライバは、受信したすべてのメッセージをコミットする必要はありません。メッセージをコミットすると、コミットされたメッセージの前にサブスクライバが受信したメッセージが自動的にコミットされます。 - トピックによって使用されるリソースの管理
パブリッシャとサブスクライバの両方にリソースが関連付けられており、不要になったリソースを閉じる必要があります。 - トピック・フロー制御の理解
トピックのPublisher.send
およびSubscriber.receive
メソッドを使用すると、データ交換リクエストを送信する非同期(非ブロッキング)方法が提供されます。これらのリクエストを効率的に管理するためのデフォルトのフロー制御メカニズムが用意されています。 - トピック記憶域に上限を設定するためのパブリッシャ・フロー制御の管理
トピックにパブリッシュされる値が、その値を処理するサブスクライバの能力を大きく上回っているデータ処理パイプラインでは、トピック上の未使用の値に対して無制限の記憶域を許可するか、トピックに保持されている値に対する最大記憶域サイズの制約を設定できます。
親トピック: データ・グリッド操作の実行
トピックAPIの概要
com.tangosol.net.NamedTopic<V>
インタフェースは、トピックをパブリッシュおよびサブスクライブするためにアプリケーションで使用される初期インタフェースです。Publisher
およびSubscriber
を作成するファクトリ・メソッドがあります。サブスクライバ・グループを管理するメソッドもあります。
com.tangosol.net.Pubisher<V>
インタフェースには、トピックに値をパブリッシュする非同期のpublish
メソッドがあります。このメソッドは、送信完了時または例外がスローされた場合にトラッキングを有効にするためのCompletableFuture<Publisher.Status>
を返します。返されるPublisher.Status
には、パブリッシュされたメッセージの詳細が含まれます
com.tangosol.net.Subscriber<V>
インタフェースには、CompleteFuture<Element<V>>
を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.inGroup
(subscriberGroupName
)オプションを使用して) subscriber group
をサブスクライブするか、トピックを匿名のサブスクライバとして直接サブスクライブします。
パブリッシャ作成オプション
次の表に、NamedTopic.createPublisher(Publisher.Option... option)
とともに使用するパブリッシャ作成オプションを示します。
パブリッシャ・オプション | 説明 |
---|---|
OnFailure.Stop |
Default 。個々のPublisher.send(V) 呼出しが失敗した場合は、その後のパブリッシュを停止し、Publisher をクローズします。
|
OnFailure.Continue |
個々のPublisher.send(V) 呼出しが失敗した場合は、その値をスキップし、引き続き他の値をパブリッシュします。
|
FailOnFull.enabled() |
トピックの未処理値の記憶域サイズが構成済のhigh-units 値を超える場合、Publisher.send(V) 呼出しから返されるCompletableFuture は異常終了します。トピック上でスペースが使用可能になると、操作が完了するまで完了をブロックするようにデフォルトをオーバーライドします。
|
OrderBy.thread() |
Default 。同じスレッドからのすべてのvalues sent が順次に格納されていることを確認します。
|
OrderBy.none() |
sent values 間で特定の順序を強制しない場合は、最大レベルの並列度が許可されます。
|
OrderBy.id(int) |
同じIDを共有するすべてのスレッド間でsent values が順序付けされていることを確認してください。
|
OrderBy.value(ToIntFunction) |
sent value に対するこのメソッドの適用に基づいて、順序単位を計算します。
|
OrderBy.roundRobin() |
トピック内の各チャネルにメッセージを順次パブリッシュし、「チャネル数」メッセージがパブリッシュされた後に、チャネル0にループバックします。 |
親トピック: トピックAPIの概要
サブスクライバ作成オプション
次の表に、NamedTopic.createSubscriber(Subscriber.Option... option)
とともに使用するサブスクライバ作成オプションを示します。
サブスクライバ・オプション | 説明 |
---|---|
Name.of(String) |
メンバーとして参加するサブスクライバ・グループ名を指定します。サブスクライバ・グループに配信される値ごとに、そのグループの1つのメンバーのみが値を受信します。 |
Filtered.by(Filter) |
このFilter に一致するSubscriber.receive() 値のみ。サブスクリプション・グループごとに1つのFilter のみ。
|
Convert.using(Function) |
Subscriber.receive() より前に提供されたFunction を使用して、トピック値を変換します。サブスクリプション・グループごとに1つのConverter のみ。
|
CompleteOnEmpty.enabled() |
Subscriber.receive() に残された値がない場合、返されるCompletableFuture.get() はnull 要素を返します。デフォルトでは、返されるCompletableFuture.get() は次のトピック値を戻せるようになるまでブロックされます。
|
ChannelOwnershipListeners.of(ChannelOwnershipListener) |
サブスクライバに割り当てられたチャネルが変更されたときにイベントを受信するChannelOwnershipListener を追加します。
|
com.tangosol.net.Subscriber<V>
インタフェースには、CompleteFuture<Element<V>>
を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.Name.of
(subscriberGroupName
)を使用して) subscriber group
をサブスクライブするか、トピックを直接サブスクライブします。
親トピック: トピックAPIの概要
トピック・インスタンスの取得
NamedTopic
インスタンスへの参照を取得するために、アプリケーションではSession
APIまたはCacheFactory
APIを使用できます。Session
APIでは、非静的なCoherenceセッションの使用および注入を可能にする簡潔なメソッド・セットが提供されるため、これはトピックの取得に推奨される方法です。対照的に、CacheFactory
APIでは、Coherenceの内部的な概念やサービスの知識を必要とする、多くの静的メソッドが公開されています。Session
APIにより、より効率的なライフサイクルおよび他のフレームワーク、特に注入を使用するフレームワークとの統合が可能になります。
次の例では、デフォルトのセッション・プロバイダを使用してセッションを作成し、Session.getTopic
メソッドを使用してNamedTopic
インスタンスへの参照を取得する方法を示します。トピックの名前はパラメータとして含まれます。
import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<String> Topic = session.getTopic(“topic”,ValueTypeAssertion.withType(String.class));
または
ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
NamedTopic<String> topic =
ccf.ensureTopic(“topic”, ValueTypeAssertion.withType(String.class));
汎用型チェックを省略するには、topic-mapping構成でオプションの<value-type>
定義を省略し、Session.getTopic
またはConfigurableCacheFactory.ensureTopic
メソッドのパラメータとしてのValueTypeAssertion
オプションを省略します。
NamedTopic型チェックの使用
Session
またはCacheFactory
APIのいずれかを使用している場合、厳密に型指定されたNamedTopic
インスタンスをリクエストできます。デフォルトでは、NamedTopic<Object>
インスタンスが返されます。これは、NamedTopic
インスタンスを作成および使用するための最も柔軟なメカニズムですが、トピック・インスタンスとのやり取りに必要な値の型をアプリケーションで設定する必要があります。
次の例では、アプリケーションで使用する任意のタイプのオブジェクトを格納します。
import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<Object> topic = session.getTopic(“MyTopic”);
または
ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
NamedTopic<Object> topic = ccf.ensureTopic(“MyTopic”);
getTopic
メソッドは、型チェックすることなく、必要に応じて特定の型のNamedTopic
インスタンスをリクエストする場合に使用できます。ValueTypeAssertion
インタフェースをgetTopic
メソッドとともに使用して、NamedTopic
インスタンスで使用される値の型の正確性をアサートします。このメソッドは、トピックがRAW型を使用する必要があることをアサートする場合に使用できます。たとえば:
NamedTopic<Object> topic = session.getTopic("MyTopic", ValueTypeAssertion.withRawTypes());
同様に、CacheFactory
を使用している場合、
NamedTopic<String> topic =
ccf.ensureTopic(“MyTopic”, ValueTypeAssertion.withRawTypes());
型の安全性を強化するために、トピックを作成して、トピックで使用される値の型を明示的にアサートできます。たとえば、トピックを作成して、値の型がString
である必要があることをアサートするには、アプリケーションで次を使用できます。
NamedTopic<String> topic = session.getTopic("MyTopic", ValueTypeAssertion.withTypes(String.class);
NamedTopic
インスタンスは、アサートされた型に準拠する必要はなく、無視することもできますが、コンパイル時に警告メッセージが表示されます。たとえば:
NamedTopic topic = session.getTopic("MyTopic", ValueTypeAssertion.withTypes(String.class);
同様に、RAW型を使用し、ネーム・キャッシュ・インスタンスで特定の型を使用できることをアサートすることを、アプリケーションで選択することもできます。ただし、どちらの場合も、型がチェックされないままでは、エラーが発生することがあります。たとえば:
NamedTopic<String> topic = session.getTopic("MyTopic", ValueTypeAssertion.withRawTypes());
型の安全性を最も強くするには、特定の型をトピック定義の一部として、キャッシュ構成ファイルに宣言することもできます。トピック定義の部分として構成された型と異なる型をアプリケーションが使用すると、ランタイム・エラーが発生します。次の例では、String
型の値のみをサポートするトピックを構成します。
<topic-mapping> <topic-name>MyTopic</topic-name> <scheme-name>distributed-topic</scheme-name> <value-type>String</value-type> </topic-mapping>
最後に、明示的な型チェックの無効化をアプリケーションで選択できます。型チェックが無効の場合、型の安全性はアプリケーションで確保する必要があります。
NamedTopic<Object> topic = session.getTopic("MyTopic", ValueTypeAssertion.withoutTypeChecking());
トピックへのパブリッシュ
Publisher
(クローズされていない)は、送信メソッドを使用してトピックに値を非同期で送信し、CompletableFuture
を返します。たとえば、CompletableFuture
でjoinを呼び出すことにより、非同期呼出しが同期呼出しに変わります。
多数の未処理の非同期操作の完了を処理するために使用可能な多数のオプションについては、「CompletableFuture
」を参照してください。
NamedTopic<String> topic = ….; Publisher<String> publisher = topic.createPublisher(); CompletableFuture future = publisher.send(“someValue”); // Completes when asynchronous send completes. // Throws an exception if send operation completed exceptionally. future.join();
トピック値の要件
トピック値はシリアライズ可能である必要があります(たとえば、java.io.Serializable
またはCoherence Portable Object Formatのシリアライズ)。Portable Object Formatの使用を参照してください
親トピック: トピックへのパブリッシュ
未処理のパブリッシャ送信の完了の待機
パブリッシャが、各送信の完了を待機せずにトピックに複数の値を送信した場合、Publisher.flush().join()
呼出しは、保留中のすべてのPublisher.send()
の完了を同期的に待機します。1つ以上の未処理の操作が異常終了しても、この呼出しは常に正常に完了します。トピックに値が正常にパブリッシュされなかったときにそれを検出する必要がある場合は、CompletableFuture
メソッドを使用して異常終了を検出します。
親トピック: トピックへのパブリッシュ
パブリッシャ・リソースの解放
Publisher.close()
を明示的に呼び出すか、例26-1に示すようにtry-with-resources
文を使用してパブリッシャを作成することにより、パブリッシャ・リソースを解放します。Publisher.close()
はブロック操作であり、クローズが完了するまで戻りません。パブリッシャは、クローズされた後はアクティブではなくなります(「Publisher.isActive()
」を参照)。そのPublisher
でのその後のすべてのsend
操作は、パブリッシャがアクティブでなくなったことを示すIllegalStateException
により失敗します。ただし、close
呼出しの前に行われている未処理のsend
操作については、すべて未処理のCompletableFutures
を完了できます。Publisher.flush.join()
と同様、クローズは正常に終了する場合も異常終了する場合もあり、異常終了する未処理のCompletableFutures
に対してはクローズから例外がスローされません。
親トピック: トピックへのパブリッシュ
トピックへの直接サブスクライブ
NamedTopic<String> topic = ….; Subscriber<String> subscriber = topic.createSubscriber(); CompletableFuture<Element<String>> future = receive(); String received = future.get().getValue();
サブスクライバ・リソースの解放
Subscriber.close()
操作を明示的に起動するか、try-with-resources
文を使用してサブスクライバを作成することにより、クローズできます。詳細は、例26-2を参照してください。
サブスクライバのクローズはブロック操作であり、操作が完了するまで戻りません。Subscriber
に対してclose
が呼び出された後、非アクティブなSubscriber
に対するその後のすべてのreceive
呼出しはIllegalStateException
で失敗します。receive
呼出しからの未処理のCompletableFuture
の大部分は異常終了しますが、一部は完了する可能性があります。
サブスクライバ・グループのサブスクライブ
トピック値は、サブスクライバ・グループ・メンバーが受信するか期限切れになるまで保持されます。デフォルトでは、トピック値に有効期限はありません。
「<paged-topic-scheme>」の構成の「<expiry-delay>
」サブ要素を参照してください。
NamedTopic<String> topic = ….; Subscriber<String> subscriber = topic.createSubscriber(Subscriber.Name.of(“subscriberGroup”)); CompletableFuture<Element<String>> future = receive(); String value = future.get().getValue();
NamedTopicのサブスクライバ・グループ
サブスクライバ・グループは、2つの方法で作成できます。
coherence-cache-config
ファイルの<topic-mapping>
要素に恒久サブスクライバ・グループを静的に構成します。例21-3を参照してください。- サブスクライバ・グループを結合する
Subscriber.Name.of(aSubscriberGroupName)
オプションを使用してサブスクライバを作成し、そのトピックにsubscriber group
がまだ存在しない場合に、サブスクライバ・グループを動的に作成します。
NamedTopic.getSubscriberGroups
は、構成済のサブスクライバ・グループと動的に作成されたサブスクライバ・グループの両方を戻します。
NamedTopic<String> topic = ….; Set<String> subscriberGroups = topic.getSubscriberGroups();
トピックのサブスクライバ・グループの破棄
既存のすべてのサブスクライバ・グループ・メンバーはこの操作の影響を受け、サブスクライバ・グループ・メンバーに対する未処理の非同期受信操作はすべて取り消されます。
NamedTopic<String> topic = ….; topic.destroySubscriberGroup(“subscriberGroup”);
メッセージのコミット
メッセージを同期的にコミットするには、サブスクライバのreceive()
メソッドによって返されるCompletableFuture
が提供するElement
でcommit()
メソッドを使用します。
Subscriber.CommitResult
を使用して、成功や失敗などのコミット操作に関する情報を判断できます。Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();
String message = element.getValue();
// process message...
Subscriber.CommitResult result = element.commit();
メッセージを非同期的にコミットするには、要素のcommitAsync()
メソッドを使用します。
commitAsync()
メソッドをコールします。返されるCompletableFuture
は、コミット操作の完了時に完了します。CompletableFuture<Subscriber.CommitResult> future = element.commitAsync();
メッセージは、サブスクライバのcommitメソッドを使用して、チャネルとポジションのみを使用してコミットすることもできます。
Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();
int channel = element.getChannel();
Position position = element.getPosition();
String message = element.getValue();
// process message...
Subscriber.CommitResult result = subscriber.commit(channel, position);
トピックで使用されるリソースの管理
パブリッシャとサブスクライバの両方にリソースが関連付けられており、不要になったリソースを閉じる必要があります。
Publisher
とSubscriber
はどちらもtry-with-resource
パターンとともに使用できます。この使用パターンにより、これらのトピック・リソースの両方に対してclose
が確実に呼び出されます。パブリッシャの自動クローズされたリソースの詳細は、例26-1を参照してください。サブスクライバの自動クローズされたリソースの詳細は、例26-2を参照してください。
例21-3に示すように、サブスクライバ・グループdurableSubscriber
がトピックaTopic
に対して静的に構成されるとします。
例26-1 パブリッシャ用のtry-with-resource
Session session = Session.create(); NamedTopic topic = session.createTopic(“aTopic”, ValueTypeAssertion.withType(String.class)); try (Publisher<String> publisher = topic.createPublisher(…)) { int MAX = …; for (int i = 0; I < MAX; i++) { publisher.send(“value” + i); } // wait for all outstanding asynchronous operations to complete before publisher is implicitly closed by exiting try-with-resource block. publisher.flush().join(); }
例26-2 try-with-resourceによるサブスクライバの処理
//Process outstanding values on a subscriber group try (Subscriber<String> subscriber = topic.createSubscriber(Subscriber.Name.of(“durableSubscriber”)), Subscriber.CompleteOnEmpy.enabled())) { while (true) { // asynchronous receive from direct subscriber to topic. CompletableFuture<Element<String>> result= subscriber.receive(); Element<String> topicElement = result.get(); if (topicElement == null) { // no more elements available break; } else { String value = topicElement.getValue(); // process value received from subscriber. ... } // auto close subscriber and its resources. Outstanding CompletableFutures for // receive will mostly complete exceptionally, though some may complete. // a subscriber group accumulates future delivered values even if there // are no active subscriber group members. }
NamedTopic.destroy()
は、すべてのトピック・リソースを解放し、インスタンスを破棄します。未処理のPublishers
およびSubscribers
はすべてクローズされます。Publishers
およびSubscribers
に対する未処理の操作はすべて異常終了します。Subscriber groups
は破棄され、永続記憶域を含むすべてのトピック記憶域が解放されます。
トピック・フロー制御の理解
トピックのPublisher.send
およびSubscriber.receive
メソッドを使用すると、データ交換リクエストを送信する非同期(非ブロッキング)方法が提供されます。これらのリクエストを効率的に管理するためのデフォルトのフロー制御メカニズムが用意されています。
自動フロー制御では、未処理の送信または受信操作(あるいはその両方)が多すぎることが検出され、未処理の値を消費するためのトピックの記憶域が不足している場合、システムがすべての未処理のリクエストを遅れずに処理できるように、非同期のsend
またはreceive
(あるいはその両方)に対する未処理のCompletableFuture
が制限されることがあります。Publisher
とSubscriber
の両方のクラスは、アプリケーションがFlowControl
インスタンスを取得するための方法を提供します。このインスタンスにより、自動フロー制御のオプトアウトが可能になり、リクエスト・フローの速度を手動で制御できます。詳細は、com.tangosol.net.FlowControl
Javadocを参照してください。
トピック記憶域に上限を設定するためのパブリッシャ・フロー制御の管理
トピックにパブリッシュされる値が、その値を処理するサブスクライバの能力を大きく上回っているデータ処理パイプラインでは、トピック上の未使用の値に対して無制限の記憶域を許可するか、トピックに保持されている値に対する最大記憶域サイズの制約を設定できます。
デフォルトでは、トピックに保存される値に対して記憶域サイズの制約はありません。例26-3に示すように、トピックの記憶域制限が構成されている場合、トピックの記憶域使用量の制限に使用できるオプションがいくつかあります。
トピックの記憶域制限が構成された後、次の送信がトピックの記憶域制限を超える場合のデフォルトの動作として、トピックに対するパブリッシャが制限、ブロックされます。目的は、パブリッシャをブロックすることで、トピックで使用される記憶域の量を制限しながら、遅れずに処理するための時間をサブスクライバに与えることです。サブスクライバが遅れずに処理し、トピックに対するそれらの値がすべてのサブスクライバとサブスクライバのグループによって消費された後、パブリッシャは送信を再開できます。
パブリッシャの作成時に、デフォルトの動作を変更するためのオプションがあります。Publisher.FailOnFull.enabled()
オプションは、パブリッシャsend
がブロックされずに異常終了することを示します。デフォルトのPublisher
オプションPublisher.OnFailure.Stop
では、この例外が発生した場合にパブリッシャが自動的にクローズされます。このデフォルトでは、パブリッシャは値を配信できなかった最初の例外ケースでクローズされているため、パブリッシャからパブリッシュされた値の順序が保持されます。
例26-3 FailOnFullのパブリッシュ
Session session = Session.create(); NamedTopic topic = session.createTopic(“aTopic”, ValueTypeAssertion.withType(String.class)); try (Publisher<String> publisher = topic.createPublisher(Publisher.FailOnFull.enabled())) { int MAX = …; int cSent = 0; for (int i = 0; I < MAX; i++) { try { // synchronous send that throws exception if publisher is full. publisher.send(“value” + i).join(); cSent++; } catch (Exception e) { // when full throws CompletionException with a cause of IllegalStateException: the topic is at capacity. break; } } if (cSent != MAX) { // all values were not published to topic. } }
この順序を維持する必要がない場合は、Publisher.OnFailure.Continue
オプションを指定してパブリッシャを作成し、トピックが一杯になった場合に発生する送信のみが失敗するようにできます。
トピック全体への送信に対するデフォルトのブロックをオーバーライドするには、次のパターンを使用してフロー制御の一時停止からスレッドを除外します。この手法では、フロー制御が無効になり、このNonBlocking
スレッドのトピックに対する記憶域の制限がなくなります。
例26-4 トピック記憶域制限に達したときのフロー制御の一時停止からのスレッドの除外
import com.tangosol.net.*; import com.oracle.coherence.common.base.NonBlocking; Session session = Session.create(): NamedTopic topic = Session.createTopic(“aTopic”, ValueTypeAssertion.withType(String.class)); try (NonBlocking nb = NonBlocking(); Publisher<String> publisher = topic.createPublisher(…)) { int MAX = …; for (int i = 0; I < MAX; i++) { publisher.send(“value” + i); } }