64 カスタマイズ・フックのリアルタイム・パブリッシュの使用
リアルタイム・パブリッシュのカスタマイズ時に実行する必要があるいくつかのことは、独自のトランスポータの作成、実装の詳細の作成、ヘルパー・メソッドの作成、トランスポータの実装例の作成、完全なコードのリストの作成、および特異な状況のシナリオの作成です。管理インスタンスでのアセット・パブリッシュ・イベントのインターセプトに関する情報を書き込む必要がある場合もあります。
顧客のビジネス・ニーズに従って、リアルタイム・パブリッシュをカスタマイズできます。「カスタマイズ・フックのリアルタイム・パブリッシュの使用」および「WebCenter Sitesのアセットおよびパブリッシュ・イベントの理解」を参照してください。
-
リソース: 承認済アセットおよび表などの非アセット・データを示すのに使用される一般的な用語です。
-
リソース・グループ: なんらかの関係のあるリソースのグループです。
トピック:
リアルタイム・パブリッシュについて
リアルタイム・パブリッシュとは、いくつかのジョブで構成されるパイプラインのことです。一部のジョブは管理インスタンスで実行され、その他のジョブはターゲット・インスタンスで実行されます。リアルタイム・パブリッシュのパラメータは、advpub.xml
ファイルにあります。
次に、各ジョブについて簡単に説明します。
-
Gatherer: パブリッシュ可能なアセットのリストを作成して、そのリストを追加のリソース(アセット・タイプ、表の行)で装飾します。これらを合せて、パブリッシュするデータの正規セットを構成します。これらは、基礎となる承認グループ化戦略に依存しながら、相互依存のリソースのグループを作成することで実行されます。グループ化戦略では、承認済リソースのリストを取得して、それらをグループに編成します。使用可能なグループ化戦略は次のとおりです。
-
ApprovalAggregatingGroupingStrategy – デフォルトのグループ化戦略で、この戦略を保持することをお薦めします。リソース・グループのコレクションを作成します。リソース・グループでは、そのグループ内で同じタイプの大部分のアセットが含まれるのもが主要なアセット・タイプになります。リソース・グループの作成中に、この戦略によって、同じアセット・タイプのアセットの数が維持されます。そのグループの主要なアセット・タイプが最大数が含まれるものになります。同様に、リソース・グループ・コレクションの作成中に、この戦略によって、同じ主要アセット・タイプのグループが一緒に収集されます。この戦略では、同じタイプのアセットのロードと保存が一括で高速に機能するため、同様のグループを集約できます。この戦略では、グループのサイズが相対的に小さくなり、グループの数が多くなります。
-
ApprovalAccumulatingGroupingStrategy – この戦略では、リソース・グループを収集するだけで、追加の処理が実行されません。グループのサイズは相対的に大きくなり、グループの数が少なくなります。
-
-
DataSerializer and DataDeserializer: XStream実装を使用してデータをシリアライズおよびデシリアライズします。
-
Packager: Packagerは、Gathererで作成したリソースのリストを指定すると、各リソースのシリアライズされたレンディションを作成して、そのレンディションをローカルの
fw_PubDataStore
表に保存します。 -
Transporter: Packagerで作成した
fw_PubDataStore
に含まれるシリアライズされたデータを受け取り、そのデータをターゲット側のfw_PubDataStore
表にコピーします。シリアライズされたデータは、開発者ガイドで説明されているように、カスタマイズされたマルチトランスポータを提供することで、複数の宛先に送信できます。wcs_properties.jsonファイルのプロパティxcelerate.concurrenttransportunpacker
では、(デフォルトで)DataTransporter
およびDataUnpacker
を同時に実行するかどうかが記載されています。「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」を参照してください。 -
Unpacker: ターゲット側の
fw_PubDataStore
表に含まれるシリアライズされたデータを受け取り、そのデータをデシリアライズしてターゲット・データベースに保存します。DataTransporter
およびDataUnpacker
が同時に実行されますが、DataUnpacker
は、主要なパッケージ化が完了し、特定の優先度グループ情報が取得されるまで待機します。実行に必要な
DataUnpacker
スレッドの数は、numParalleltasks
を使用して構成できます。実行するためのDataUnpacker
スレッド数のデフォルト値またはnumParallelTasks
は、MSSQLサーバーおよびDB2データベースの場合は1、Oracleデータベースの場合は3です。デフォルト値は変更しないことをお薦めします。 -
Monitor: パブリッシュ・セッション内のすべての参加者から受信されるすべてのメッセージを追跡して通知します。これらのメッセージは、
PubMessage
およびPubProgress
表に格納されます。PubSessionMonitor
は、非同期メッセージ・システムのコンポーネントです。ポーリング頻度または
pollFreqMillis
はミリ秒で測定され、デフォルト値は5000です。ミリ秒の時間または
timeoutMillis
は、参加者がクラッシュまたはハングしたと推定されるまでPubSessionMonitor
がメッセージを待機する秒数です。デフォルト値は100000ミリ秒(100秒)です。 -
CacheUpdater: CacheUpdaterは、Unpackerで正常に保存されたアセットのリストを指定すると、ページ・キャッシュの関連部分をフラッシュして、必要な場合は再生成します。
指定したページの再生成は、
numThreadsPerServer
の値に基づいて複数のスレッドで実行できます。numThreadsPerServer
のデフォルト値は3です。コンポーネントregenServers
では、ページが再生成されるサーバーにURLのリストを提供します。URLが指定されていない場合、PageCacheUpdaterのデフォルト値は、(ユーザー・リクエストに基づいて)標準の再生成にデフォルトで設定されます。
リアルタイム・パブリッシュは、非同期メッセージングを使用して、各ジョブのステータスを追跡します。メッセージング・フレームワークの詳細を理解する必要はありませんが、ターゲット・システムとの通信がトランスポータによって円滑化されることは理解しておく必要があります。これには、アセットが保存されていることを管理システムに通知するために、Unpackerが発行するメッセージも含まれます。このメッセージにより、そのアセットにパブリッシュ済のマークを付けるように管理ロジックに指示します。トランスポート・フェーズ中に各ターゲットで個別に実行されるため、複数ターゲット・パブリッシュのターゲットごとのパブリッシュ・ジョブは、異なる順序で実行できます。たとえば、ターゲット1のCacheFlusterがターゲット2のUnpackerの前に実行される場合があります。
カスタム・トランスポータの作成
トランスポータでは、HTTP(s)ベースのOOTB (即時利用可能または使用準備済)トランスポートを、異なるプロトコルを使用する別のトランスポートと置き換えることができ、また、同じパブリッシュ・セッション内で複数のターゲットにパブリッシュできます。
トピック:
AbstractTransporterメソッドのオーバーライドの考慮事項
AbstractTransporter
のメソッドをオーバーライドするときには、次の点に注意してください。
-
ping()
には、ターゲットが稼働しているか、停止しているかを確認するロジックを含めます。最もわかりやすい使用法は、パブリッシュ・コンソールに緑色/赤色の診断インジケータを点灯させることです。パブリッシュ・セッションの起動には、pingの成功が必ずしも必要なわけではありませんが、接続の問題を診断するにはpingが便利なツールになります。ターゲットへの接続にhttpを使用している場合は、デフォルトの実装を使用できます。このとき、オーバーライドや独自の実装は必要ありません。
-
sendBatch()
は、リモートfw_PubDataStore
にデータをアップロードする役割を果たします。これは、ローカルのfw_PubDataStore
から、IList形式のデータの小さなバッチで複数回呼び出されます。バッチ処理によりメモリーの使用量を抑えられ、バッチ処理は、気付かないうちに実行されています。 -
remoteExecute()
は、リモート・システムとの通信の役割を果たします。通信は双方向で行われ、管理側はリモートのジョブと取消しのリクエストをディスパッチするコマンドを送信し、ターゲット側はステータスを示すメッセージを送り返します。これらのメッセージの内容は、remoteExecute
には重要ではありません。このメソッドに必要な動作は、該当するリクエストを送信して応答を返すことのみです。 -
listTransports()
は、ターゲットが複数ある場合に、基になるトランスポートのリストになります。ターゲットが1つのみの場合、このメソッドは、現在のトランスポートのtoString()
レンディションを返します。 -
toString()
は、このトランスポートのわかりやすい記述子です。たとえば、一般的な値は、http://mytarget:8081/cs/
です。ただし、targetDataCenter-Virginia
、serverOn8080
などの別の文字列も受け入れられます。
AbstractTransporterのヘルパー・メソッド
AbstractTransporter
には、いくつかのヘルパー・メソッドがあります。
-
protected void writeLog(String msg)
は、パブリッシュ・ログにメッセージを書き込みます。 -
protected AbstractTransporter getStandardTransporterInstance()
は、標準のHTTPベース・トランスポータの新しいインスタンスを取得します。これは、複数のターゲットにトランスポートを実装する際に役立ちます。 -
protected String getParam(String param)
は、パブリッシュ・コンソールで構成された、パブリッシュ・パラメータの値を取得します。
リアルタイム・パブリッシュ・トランスポータを記述するためのコード
com.fatwire.realtime.mypackage; import COM.FutureTense.Interfaces.*; import com.fatwire.cs.core.realtime.TransporterReply; import java.net.URL; import java.util.*; /** * RealTime Publishing transporter to multiple targets. */ public class MultiTransporter extends AbstractTransporter { private boolean initialized = false; List<AbstractTransporter> transporters = new ArrayList(); /** * Ping each underlying target and return true if all of them are up. */ @Override public boolean ping(StringBuilder sbOut) { init(); boolean ret = true; for(AbstractTransporter t : transporters) { boolean thisret = t.ping(sbOut); sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable")); sbOut.append(" ||| "); ret &= thisret; } return ret; } /** * Send the batch to each underliyng transport. */ @Override protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg) { init(); for(AbstractTransporter t : transporters) { int res = t.sendBatch(ics, iList, outputMsg); if(res != 0) { // Just log the error for now, but this is an // indication that the target may be down // and other notifications may also be appropriate. writeLog("Transporter " + t + " failed with " + res + " " + outputMsg); } } return 0; } /** * Execute the remote command on each transporter and * accumulate their responses. */ @Override protected List<TransporterReply> remoteExecute(ICS ics, String s, Map<String, String> stringStringMap) { init(); List<TransporterReply> res = new ArrayList<TransporterReply>(); for(AbstractTransporter t : transporters) { List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap); res.addAll(tres); } return res; } /** * Do some initialization by parsing out the configuration * settings and instantiating a standard http transport * to each target. */ private void init() { if(!initialized) { String remoteURLs = getRemoteUrl(); int count = 0; for(String remoteUrl : remoteURLs.split(",")) { String suffix = (count == 0) ? "" : String.valueOf(count); AbstractTransporter t1 = AbstractTransporter.getStandardTransporterInstance(); URL url; try { url = new URL(remoteUrl); } catch(Exception e) { throw new RuntimeException(e); } t1.setRemoteUrl(remoteUrl); t1.setHost(url.getHost()); t1.setUsername(getParam("REMOTEUSER" + suffix)); t1.setPassword(getParam("REMOTEPASS" + suffix)); t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol())); t1.setContextPath(url.getPath()); t1.setPort(url.getPort()); t1.setProxyserver(getProxyserver()); t1.setProxyport(getProxyport()); t1.setProxyuser(getProxyuser()); t1.setProxypassword(getProxypassword()); t1.setHttpVersion(getHttpVersion()); t1.setTargetIniFile(getTargetIniFile()); transporters.add(t1); ++count; } initialized = true; writeLog("Initialized transporters: " + toString()); } } /** * Provide a full listing of all underlying transports. This is * can be used by other components to determine * whether they need to perform special actions depending on * the number of targets. For example, asset publishing * status processing may need to buffer responses until they're * received from all targets before marking assets published. * @return */ @Override public List<String> listTransports() { init(); List<String> list = new ArrayList(); for(AbstractTransporter t : transporters) { list.add(t.toString()); } return list; } /** * Just a human-friendly description of the transport. This may show * up in the logs, so make it descriptive enougn. */ @Override public String toString() { List<String> transs = listTransports(); StringBuilder sb = new StringBuilder(); for(String t : transs) sb.append(t + " "); return sb.toString(); } }
境界状況のシナリオの理解
「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」に示した例は、すべてのターゲットが実行しているという最適な状況では動作しますが、あるターゲットが短時間または長時間停止しているときもあります。あるターゲットにのみパブリッシュしているときに、アセットにパブリッシュ済としてのマークが付けられると、停止したターゲットは同期されなくなります。このようなシナリオには、次に示す方法で対応できます。
-
ターゲットが短時間停止している場合は、アセットにパブリッシュ済のマークを付けないようにして、実行中のターゲットにはパブリッシュを続けます。別のターゲットが再起動されたときには、その時点より前のすべてのアセットがパブリッシュ用のキューに入れられた状態になります。これらのアセットは、最初のターゲットにも重複してパブリッシュされることになりますが、短時間の場合、このオーバーヘッドは無視できます。
-
ターゲットが長時間停止したままになる場合は、そのターゲットを宛先構成のターゲットのリストからから削除することが最善策になるといえます(この例では、パブリッシュ構成の「宛先アドレス」から2番目のターゲットを削除します)。このようにすると、アクティブなターゲットが1つしかない場合でも、アセットはパブリッシュ済としてマークされ続けます。2番目のターゲットが再起動されたときには、最初にデータベースとファイル・システムの同期を実行して、その後で2番目のターゲットを宛先アドレスのリストに戻します。
管理インスタンスでのアセット・パブリッシュ・イベントのインターセプト
前述の最初の状況では、すべてのターゲットに保存されたアセットにのみにパブリッシュ済としてのマークを付けること必要になります。これを行うには、次に示すように、カスタムの通知ロジックを実装します。
UnpackersとCacheUpdatesの区別
複数の宛先にパブリッシュする場合は、それぞれの宛先ごとのUnpackerとCacheUpdaterを区別すると便利になります。これは、RTパブリッシュ・コンソールでプログレス・バーを調べるときや、ログを調べるときに役立ちます。
このような区別を行うために、ターゲット側のAdvPub.xml
ファイルを編集して、DataUnpacker
BeanとPageCacheUpdater
BeanのID値を変更します。
たとえば:
<bean id="DataUnpacker" class="com.fatwire.realtime.ParallelUnpacker" singleton="false"> <property name="id" value="Unpacker-Virginia2"/> ... </bean> <bean id="PageCacheUpdater" class="com.fatwire.realtime.regen.ParallelRegeneratorEh" singleton="false"> <property name="id" value="CacheFlusher-Virginia2"/> ... </bean>