26 基本的なトピック・パブリッシュおよびサブスクライブ操作の実行

Coherence APIを使用して、基本的なトピック・パブリッシュおよびサブスクライブ操作を実行できます。この章には次の項があります。

トピックAPIの概要

com.tangosol.net.NamedTopic<V>インタフェースは、トピックをパブリッシュおよびサブスクライブするためにアプリケーションで使用される初期インタフェースです。PublisherおよびSubscriberを作成するファクトリ・メソッドがあります。サブスクライバ・グループを管理するメソッドもあります。

com.tangosol.net.Pubisher<V>インタフェースには、トピックに値をパブリッシュする非同期のpublishメソッドがあります。このメソッドは、送信完了時または例外がスローされた場合にトラッキングを有効にするためのCompletableFuture<Publisher.Status>を返します。返されるPublisher.Statusには、パブリッシュされたメッセージの詳細が含まれます

com.tangosol.net.Subscriber<V>インタフェースには、CompleteFuture<Element<V>>を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.inGroup (subscriberGroupName)オプションを使用して) subscriber groupをサブスクライブするか、トピックを匿名のサブスクライバとして直接サブスクライブします。

パブリッシャ作成オプション

次の表に、NamedTopic.createPublisher(Publisher.Option... option)とともに使用するパブリッシャ作成オプションを示します。

パブリッシャ・オプション 説明
OnFailure.Stop Default。個々のPublisher.send(V)呼出しが失敗した場合は、その後のパブリッシュを停止し、Publisherをクローズします。
OnFailure.Continue 個々のPublisher.send(V)呼出しが失敗した場合は、その値をスキップし、引き続き他の値をパブリッシュします。
FailOnFull.enabled() トピックの未処理値の記憶域サイズが構成済のhigh-units値を超える場合、Publisher.send(V)呼出しから返されるCompletableFutureは異常終了します。トピック上でスペースが使用可能になると、操作が完了するまで完了をブロックするようにデフォルトをオーバーライドします。
OrderBy.thread() Default。同じスレッドからのすべてのvalues sentが順次に格納されていることを確認します。
OrderBy.none() sent values間で特定の順序を強制しない場合は、最大レベルの並列度が許可されます。
OrderBy.id(int) 同じIDを共有するすべてのスレッド間でsent valuesが順序付けされていることを確認してください。
OrderBy.value(ToIntFunction) sent valueに対するこのメソッドの適用に基づいて、順序単位を計算します。
OrderBy.roundRobin() トピック内の各チャネルにメッセージを順次パブリッシュし、「チャネル数」メッセージがパブリッシュされた後に、チャネル0にループバックします。

サブスクライバ作成オプション

次の表に、NamedTopic.createSubscriber(Subscriber.Option... option)とともに使用するサブスクライバ作成オプションを示します。

サブスクライバ・オプション 説明
Name.of(String) メンバーとして参加するサブスクライバ・グループ名を指定します。サブスクライバ・グループに配信される値ごとに、そのグループの1つのメンバーのみが値を受信します。
Filtered.by(Filter) このFilterに一致するSubscriber.receive()値のみ。サブスクリプション・グループごとに1つのFilterのみ。
Convert.using(Function) Subscriber.receive()より前に提供されたFunctionを使用して、トピック値を変換します。サブスクリプション・グループごとに1つのConverterのみ。
CompleteOnEmpty.enabled() Subscriber.receive()に残された値がない場合、返されるCompletableFuture.get()null要素を返します。デフォルトでは、返されるCompletableFuture.get()は次のトピック値を戻せるようになるまでブロックされます。
ChannelOwnershipListeners.of(ChannelOwnershipListener) サブスクライバに割り当てられたチャネルが変更されたときにイベントを受信するChannelOwnershipListenerを追加します。

com.tangosol.net.Subscriber<V>インタフェースには、CompleteFuture<Element<V>>を返すreceiveメソッドがあります。サブスクライバは、(Subscriber.Name.of (subscriberGroupName)を使用して) subscriber groupをサブスクライブするか、トピックを直接サブスクライブします。

トピック・インスタンスの取得

NamedTopicインスタンスへの参照を取得するために、アプリケーションではSession APIまたはCacheFactory APIを使用できます。Session APIでは、非静的なCoherenceセッションの使用および注入を可能にする簡潔なメソッド・セットが提供されるため、これはトピックの取得に推奨される方法です。対照的に、CacheFactory APIでは、Coherenceの内部的な概念やサービスの知識を必要とする、多くの静的メソッドが公開されています。Session APIにより、より効率的なライフサイクルおよび他のフレームワーク、特に注入を使用するフレームワークとの統合が可能になります。

次の例では、デフォルトのセッション・プロバイダを使用してセッションを作成し、Session.getTopicメソッドを使用してNamedTopicインスタンスへの参照を取得する方法を示します。トピックの名前はパラメータとして含まれます。

import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<String> Topic = session.getTopic(“topic”,ValueTypeAssertion.withType(String.class));

または


ConfigurableCacheFactory ccf   = CacheFactory.getConfigurableCacheFactory();
NamedTopic<String> topic       = 
                 ccf.ensureTopic(“topic”, ValueTypeAssertion.withType(String.class));

汎用型チェックを省略するには、topic-mapping構成でオプションの<value-type>定義を省略し、Session.getTopicまたはConfigurableCacheFactory.ensureTopicメソッドのパラメータとしてのValueTypeAssertionオプションを省略します。

NamedTopic型チェックの使用

Coherenceでは、明示的な型を使用してSessionまたはCacheFactory APIのいずれかを使用している場合、厳密に型指定されたNamedTopicインスタンスをリクエストできます。デフォルトでは、NamedTopic<Object>インスタンスが返されます。これは、NamedTopicインスタンスを作成および使用するための最も柔軟なメカニズムですが、トピック・インスタンスとのやり取りに必要な値の型をアプリケーションで設定する必要があります。

次の例では、アプリケーションで使用する任意のタイプのオブジェクトを格納します。

import com.tangosol.net.*;
...
Session session = Session.create();
NamedTopic<Object> topic = session.getTopic(“MyTopic”);

または

ConfigurableCacheFactory ccf   = CacheFactory.getConfigurableCacheFactory();
NamedTopic<Object> topic       = ccf.ensureTopic(“MyTopic”);

getTopicメソッドは、型チェックすることなく、必要に応じて特定の型のNamedTopicインスタンスをリクエストする場合に使用できます。ValueTypeAssertionインタフェースをgetTopicメソッドとともに使用して、NamedTopicインスタンスで使用される値の型の正確性をアサートします。このメソッドは、トピックがRAW型を使用する必要があることをアサートする場合に使用できます。たとえば:

NamedTopic<Object> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withRawTypes());

同様に、CacheFactoryを使用している場合、


NamedTopic<String> topic       = 
                 ccf.ensureTopic(“MyTopic”, ValueTypeAssertion.withRawTypes());

型の安全性を強化するために、トピックを作成して、トピックで使用される値の型を明示的にアサートできます。たとえば、トピックを作成して、値の型がStringである必要があることをアサートするには、アプリケーションで次を使用できます。

NamedTopic<String> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withTypes(String.class);

NamedTopicインスタンスは、アサートされた型に準拠する必要はなく、無視することもできますが、コンパイル時に警告メッセージが表示されます。たとえば:

NamedTopic topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withTypes(String.class);

同様に、RAW型を使用し、ネーム・キャッシュ・インスタンスで特定の型を使用できることをアサートすることを、アプリケーションで選択することもできます。ただし、どちらの場合も、型がチェックされないままでは、エラーが発生することがあります。たとえば:

NamedTopic<String> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withRawTypes());

型の安全性を最も強くするには、特定の型をトピック定義の一部として、キャッシュ構成ファイルに宣言することもできます。トピック定義の部分として構成された型と異なる型をアプリケーションが使用すると、ランタイム・エラーが発生します。次の例では、String型の値のみをサポートするトピックを構成します。

<topic-mapping>
   <topic-name>MyTopic</topic-name>
   <scheme-name>distributed-topic</scheme-name>
   <value-type>String</value-type>
</topic-mapping>

最後に、明示的な型チェックの無効化をアプリケーションで選択できます。型チェックが無効の場合、型の安全性はアプリケーションで確保する必要があります。

NamedTopic<Object> topic = session.getTopic("MyTopic",
   ValueTypeAssertion.withoutTypeChecking());

トピックへのパブリッシュ

アクティブなPublisher (クローズされていない)は、送信メソッドを使用してトピックに値を非同期で送信し、CompletableFutureを返します。たとえば、CompletableFutureでjoinを呼び出すことにより、非同期呼出しが同期呼出しに変わります。

多数の未処理の非同期操作の完了を処理するために使用可能な多数のオプションについては、「CompletableFuture」を参照してください。

NamedTopic<String> topic    = ….;
Publisher<String> publisher = topic.createPublisher();
CompletableFuture future    = publisher.send(“someValue”);

// Completes when asynchronous send completes.
// Throws an exception if send operation completed exceptionally. 
future.join();

トピック値の要件

トピック値はシリアライズ可能である必要があります(たとえば、java.io.SerializableまたはCoherence Portable Object Formatのシリアライズ)。Portable Object Formatの使用を参照してください

未処理のパブリッシャ送信の完了の待機

パブリッシャが、各送信の完了を待機せずにトピックに複数の値を送信した場合、Publisher.flush().join()呼出しは、保留中のすべてのPublisher.send()の完了を同期的に待機します。1つ以上の未処理の操作が異常終了しても、この呼出しは常に正常に完了します。トピックに値が正常にパブリッシュされなかったときにそれを検出する必要がある場合は、CompletableFutureメソッドを使用して異常終了を検出します。

パブリッシャ・リソースの解放

Publisher.close()を明示的に呼び出すか、例26-1に示すようにtry-with-resources文を使用してパブリッシャを作成することにより、パブリッシャ・リソースを解放します。Publisher.close()はブロック操作であり、クローズが完了するまで戻りません。パブリッシャは、クローズされた後はアクティブではなくなります(「Publisher.isActive()」を参照)。そのPublisherでのその後のすべてのsend操作は、パブリッシャがアクティブでなくなったことを示すIllegalStateExceptionにより失敗します。ただし、close呼出しの前に行われている未処理のsend操作については、すべて未処理のCompletableFuturesを完了できます。Publisher.flush.join()と同様、クローズは正常に終了する場合も異常終了する場合もあり、異常終了する未処理のCompletableFuturesに対してはクローズから例外がスローされません。

トピックへの直接サブスクライブ

アクティブなサブスクライバは、トピックに配信されるすべての値を受信します。サブスクライバは、作成されてからクローズされるまでアクティブです。
NamedTopic<String> topic      = ….;
Subscriber<String> subscriber = topic.createSubscriber();

CompletableFuture<Element<String>> future = receive();
String received  = future.get().getValue();

サブスクライバ・リソースの解放

サブスクライバは、Subscriber.close()操作を明示的に起動するか、try-with-resources文を使用してサブスクライバを作成することにより、クローズできます。詳細は、例26-2を参照してください。

サブスクライバのクローズはブロック操作であり、操作が完了するまで戻りません。Subscriberに対してcloseが呼び出された後、非アクティブなSubscriberに対するその後のすべてのreceive呼出しはIllegalStateExceptionで失敗します。receive呼出しからの未処理のCompletableFutureの大部分は異常終了しますが、一部は完了する可能性があります。

サブスクライバ・グループのサブスクライブ

トピック値は、サブスクライバ・グループ・メンバーが受信するか期限切れになるまで保持されます。デフォルトでは、トピック値に有効期限はありません。

「<paged-topic-scheme>」の構成の「<expiry-delay>」サブ要素を参照してください。

NamedTopic<String> topic      = ….;
Subscriber<String> subscriber =    
         topic.createSubscriber(Subscriber.Name.of(“subscriberGroup”));
CompletableFuture<Element<String>> future = receive();
String value  = future.get().getValue();

NamedTopicのサブスクライバ・グループ

サブスクライバ・グループは、2つの方法で作成できます。

次の方法があります:
  • coherence-cache-configファイルの<topic-mapping>要素に恒久サブスクライバ・グループを静的に構成します。例21-3を参照してください。
  • サブスクライバ・グループを結合するSubscriber.Name.of(aSubscriberGroupName)オプションを使用してサブスクライバを作成し、そのトピックにsubscriber groupがまだ存在しない場合に、サブスクライバ・グループを動的に作成します。
メソッドNamedTopic.getSubscriberGroupsは、構成済のサブスクライバ・グループと動的に作成されたサブスクライバ・グループの両方を戻します。
NamedTopic<String> topic            = ….;
Set<String>        subscriberGroups = topic.getSubscriberGroups();

トピックのサブスクライバ・グループの破棄

サブスクライバ・グループの有効期間は、そのサブスクライバ・グループ・メンバーに依存しません。次のコード・フラグメントは、サブスクライバ・グループの値の累積を停止して、未使用の値をすべて解放します。

既存のすべてのサブスクライバ・グループ・メンバーはこの操作の影響を受け、サブスクライバ・グループ・メンバーに対する未処理の非同期受信操作はすべて取り消されます。

NamedTopic<String> topic            = ….;
topic.destroySubscriberGroup(“subscriberGroup”);

メッセージのコミット

サブスクライバ・グループを使用する場合、メッセージは恒久的であり、「1回以上」の保証付きで配信されます。つまり、なんらかの理由でサブスクライバが切断した場合、またはサブスクライバのチャネルが別のサブスクライバに再割当てされた場合に、メッセージが再配信されます。メッセージが処理され、再配信しないように指定するには、サブスクライバがメッセージをコミットする必要があります。サブスクライバは、受信したすべてのメッセージをコミットする必要はありません。メッセージをコミットすると、コミットされたメッセージの前にサブスクライバが受信したメッセージが自動的にコミットされます。メッセージは、同期的または非同期的にコミットできます。

メッセージを同期的にコミットするには、サブスクライバのreceive()メソッドによって返されるCompletableFutureが提供するElementcommit()メソッドを使用します。

たとえば、次のコードでは、"test-topic"という名前のトピックのサブスクライバを"test"という名前のサブスクライバ・グループに作成します。メッセージは、受信され、処理されてからコミットされます。返されたSubscriber.CommitResultを使用して、成功や失敗などのコミット操作に関する情報を判断できます。
Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();
String message = element.getValue();
// process message...
Subscriber.CommitResult result = element.commit();

メッセージを非同期的にコミットするには、要素のcommitAsync()メソッドを使用します。

たとえば、次のコードは、要素のcommitAsync()メソッドをコールします。返されるCompletableFutureは、コミット操作の完了時に完了します。
CompletableFuture<Subscriber.CommitResult> future = element.commitAsync();

メッセージは、サブスクライバのcommitメソッドを使用して、チャネルとポジションのみを使用してコミットすることもできます。

たとえば、次のコードでは、サブスクライバが要素を受け取り、その要素から取得されたメッセージのチャネルおよびポジションを受け取ります。処理後、メッセージはチャネルおよびポジションを使用してコミットされます。
Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();

int channel = element.getChannel();
Position position = element.getPosition();

String message = element.getValue();

// process message...

Subscriber.CommitResult result = subscriber.commit(channel, position);

トピックで使用されるリソースの管理

パブリッシャとサブスクライバの両方にリソースが関連付けられており、不要になったリソースを閉じる必要があります。

PublisherSubscriberはどちらもtry-with-resourceパターンとともに使用できます。この使用パターンにより、これらのトピック・リソースの両方に対してcloseが確実に呼び出されます。パブリッシャの自動クローズされたリソースの詳細は、例26-1を参照してください。サブスクライバの自動クローズされたリソースの詳細は、例26-2を参照してください。

例21-3に示すように、サブスクライバ・グループdurableSubscriberがトピックaTopicに対して静的に構成されるとします。

例26-1 パブリッシャ用のtry-with-resource

Session    session = Session.create();
NamedTopic topic   = session.createTopic(“aTopic”,     
                                        ValueTypeAssertion.withType(String.class));
try (Publisher<String> publisher = topic.createPublisher(…)) {
   int MAX = …;
   for (int i = 0; I < MAX; i++) {
       publisher.send(“value” + i);
   }

   // wait for all outstanding asynchronous operations to complete before publisher is implicitly closed by exiting try-with-resource block. 
   publisher.flush().join(); 
}

例26-2 try-with-resourceによるサブスクライバの処理

//Process outstanding values on a subscriber group
try (Subscriber<String> subscriber =        
         topic.createSubscriber(Subscriber.Name.of(“durableSubscriber”)),
                                Subscriber.CompleteOnEmpy.enabled())) {
    while (true) {
        // asynchronous receive from direct subscriber to topic.
        CompletableFuture<Element<String>> result= subscriber.receive();
        Element<String>                    topicElement = result.get();
        if (topicElement == null) {
            // no more elements available
            break;
        } else {
 	     String value = topicElement.getValue();
            // process value received from subscriber.
            ...
        }
// auto close subscriber and its resources. Outstanding CompletableFutures for 
// receive will mostly complete exceptionally, though some may complete.
// a subscriber group accumulates future delivered values even if there
// are no active subscriber group members.
}

NamedTopic.destroy()は、すべてのトピック・リソースを解放し、インスタンスを破棄します。未処理のPublishersおよびSubscribersはすべてクローズされます。PublishersおよびSubscribersに対する未処理の操作はすべて異常終了します。Subscriber groupsは破棄され、永続記憶域を含むすべてのトピック記憶域が解放されます。

トピック・フロー制御の理解

トピックのPublisher.sendおよびSubscriber.receiveメソッドを使用すると、データ交換リクエストを送信する非同期(非ブロッキング)方法が提供されます。これらのリクエストを効率的に管理するためのデフォルトのフロー制御メカニズムが用意されています。

自動フロー制御では、未処理の送信または受信操作(あるいはその両方)が多すぎることが検出され、未処理の値を消費するためのトピックの記憶域が不足している場合、システムがすべての未処理のリクエストを遅れずに処理できるように、非同期のsendまたはreceive (あるいはその両方)に対する未処理のCompletableFutureが制限されることがあります。PublisherSubscriberの両方のクラスは、アプリケーションがFlowControlインスタンスを取得するための方法を提供します。このインスタンスにより、自動フロー制御のオプトアウトが可能になり、リクエスト・フローの速度を手動で制御できます。詳細は、com.tangosol.net.FlowControl Javadocを参照してください。

トピック記憶域に上限を設定するためのパブリッシャ・フロー制御の管理

トピックにパブリッシュされる値が、その値を処理するサブスクライバの能力を大きく上回っているデータ処理パイプラインでは、トピック上の未使用の値に対して無制限の記憶域を許可するか、トピックに保持されている値に対する最大記憶域サイズの制約を設定できます。

デフォルトでは、トピックに保存される値に対して記憶域サイズの制約はありません。例26-3に示すように、トピックの記憶域制限が構成されている場合、トピックの記憶域使用量の制限に使用できるオプションがいくつかあります。

トピックの記憶域制限が構成された後、次の送信がトピックの記憶域制限を超える場合のデフォルトの動作として、トピックに対するパブリッシャが制限、ブロックされます。目的は、パブリッシャをブロックすることで、トピックで使用される記憶域の量を制限しながら、遅れずに処理するための時間をサブスクライバに与えることです。サブスクライバが遅れずに処理し、トピックに対するそれらの値がすべてのサブスクライバとサブスクライバのグループによって消費された後、パブリッシャは送信を再開できます。

パブリッシャの作成時に、デフォルトの動作を変更するためのオプションがあります。Publisher.FailOnFull.enabled()オプションは、パブリッシャsendがブロックされずに異常終了することを示します。デフォルトのPublisherオプションPublisher.OnFailure.Stopでは、この例外が発生した場合にパブリッシャが自動的にクローズされます。このデフォルトでは、パブリッシャは値を配信できなかった最初の例外ケースでクローズされているため、パブリッシャからパブリッシュされた値の順序が保持されます。

例26-3 FailOnFullのパブリッシュ

Session    session = Session.create();
NamedTopic topic   = session.createTopic(“aTopic”,     
                                        ValueTypeAssertion.withType(String.class));

try (Publisher<String> publisher = topic.createPublisher(Publisher.FailOnFull.enabled())) {
   int MAX   = …;
   int cSent = 0;
   for (int i = 0; I < MAX; i++) {
       try {
       	// synchronous send that throws exception if publisher is full. 
       	publisher.send(“value” + i).join();
        cSent++;
      } catch (Exception e) {
        // when full throws CompletionException with a cause of IllegalStateException: the topic is at capacity.
        break;
      }
   }
   if (cSent != MAX) {
      // all values were not published to topic.
   }
}

この順序を維持する必要がない場合は、Publisher.OnFailure.Continueオプションを指定してパブリッシャを作成し、トピックが一杯になった場合に発生する送信のみが失敗するようにできます。

トピック全体への送信に対するデフォルトのブロックをオーバーライドするには、次のパターンを使用してフロー制御の一時停止からスレッドを除外します。この手法では、フロー制御が無効になり、このNonBlockingスレッドのトピックに対する記憶域の制限がなくなります。

例26-4 トピック記憶域制限に達したときのフロー制御の一時停止からのスレッドの除外

import com.tangosol.net.*;
import com.oracle.coherence.common.base.NonBlocking;

Session    session = Session.create():
NamedTopic topic   = Session.createTopic(“aTopic”,     
                                       ValueTypeAssertion.withType(String.class));
try (NonBlocking       nb        = NonBlocking();
     Publisher<String> publisher = topic.createPublisher(…)) {
   int MAX = …;
   for (int i = 0; I < MAX; i++) {
       publisher.send(“value” + i);
   }
}