この章では、リアルタイム・パブリッシュの概要について説明します。また、独自のトランスポータの作成方法、実装の詳細、ヘルパー・メソッド、トランスポータの実装例、完全なコードのリスト、特異な状況のシナリオ、および管理インスタンスでのアセット・パブリッシュ・イベントのインターセプトについて説明します。
この章には次の項が含まれます。
リアルタイム・パブリッシュとは、いくつかのジョブで構成されるパイプラインのことです。一部のジョブは管理インスタンスで実行され、その他のジョブはターゲット・インスタンスで実行されます。次に、これらについて簡単に説明します。
Gatherer: パブリッシュ可能なアセットのリストを作成して、そのリストを追加のリソース(アセット・タイプ、表の行)で装飾します。これらを合せて、パブリッシュするデータの正規セットを構成します。
Packager: Packagerは、Gathererで作成したリソースのリストを指定すると、各リソースのシリアル化されたレンディションを作成して、そのレンディションをローカルのfw_PubDataStore
表に保存します。
Transport: Packagerで作成したfw_PubDataStore
に含まれるシリアル化されたデータを受け取り、そのデータをターゲット側のfw_PubDataStore
表にコピーします。
Unpacker: ターゲット側のfw_PubDataStore
表に含まれるシリアライズ化されたデータを受け取り、そのデータをデシリアライズしてターゲット・データベースに保存します。
CacheUpdater: CacheUpdaterは、Unpackerで正常に保存されたアセットのリストを指定すると、ページ・キャッシュの関連部分をフラッシュして、必要な場合は再生成します。
リアルタイム・パブリッシュは、非同期メッセージングを使用して、各ジョブのステータスを追跡します。メッセージング・フレームワークの詳細を理解する必要はありませんが、ターゲット・システムとの通信がトランスポータによって円滑化されることは理解しておく必要があります。これには、アセットが保存されていることを管理システムに通知するために、Unpackerが発行するメッセージも含まれます。このメッセージにより、そのアセットにパブリッシュ済のマークを付けるように管理ロジックに指示します。
トランスポータには、次に示すいくつかのカスタマイズ・オプションが用意されています。
HTTPに基づくOOTB (デフォルトまたは設定済)トラスポートを、別のプロトコルを使用するトランスポートに置き換える。
同一のパブリッシュ・セッションで複数のターゲットにパブリッシュする。
この項の内容は、次のとおりです。
独自のトランスポータを作成するには、次の手順を実行します。
com.fatwire.realtime.AbstractTransporter
クラスのサブクラスを作成します。
メソッドping
、sendBatch
、listTransports
、toString
、およびremoteExecute
をオーバーライドします。
管理側のclasses/AdvPub.xml
ファイルを編集して、トランスポータをインストールします。
次の行を
<bean id="DataTransporter" class="com.fatwire.realtime.MirrorBasedTransporterImpl" singleton="false">
次の行に置き換えます
<bean id="DataTransporter" class="[your transporter class]" singleton="false">
AbstractTransporter
のメソッドをオーバーライドするときには、次の点に注意してください。
ping()
には、ターゲットが稼働しているか、停止しているかを確認するロジックを含めます。最もわかりやすい使用法は、パブリッシュ・コンソールに緑色/赤色の診断インジケータを点灯させることです。パブリッシュ・セッションの起動には、pingの成功が必ずしも必要なわけではありませんが、接続の問題を診断するにはpingが便利なツールになります。
ターゲットへの接続にhttpを使用している場合は、デフォルトの実装を使用できます。このとき、オーバーライドや独自の実装は必要ありません。
sendBatch()
は、リモートfw_PubDataStore
にデータをアップロードする役割を果たします。これは、ローカルのfw_PubDataStore
から、IList形式のデータの小さなバッチで複数回呼び出されます。バッチ処理によりメモリーの使用量を抑えられます。バッチ処理は、気付かないうちに実行されています。
remoteExecute()
は、リモート・システムとの通信の役割を果たします。通信は双方向で行われ、管理側はリモートのジョブと取消しのリクエストをディスパッチするコマンドを送信し、ターゲット側はステータスを示すメッセージを送り返します。これらのメッセージの内容は、remoteExecute
には重要ではありません。このメソッドに必要な動作は、該当するリクエストを送信して応答を返すことのみです。
listTransports()
は、ターゲットが複数ある場合に、基になるトランスポートのリストになります。ターゲットが1つのみの場合、このメソッドは、現在のトランスポートのtoString()
レンディションを返します。
toString()
は、このトランスポートのわかりやすい記述子です。たとえば、一般的な値は、http://mytarget:8081/cs/
です。ただし、targetDataCenter-Virginia
、serverOn8080
などの別の文字列も受け入れられます。
AbstractTransporter
には、いくつかのヘルパー・メソッドがあります。
protected void writeLog(String msg)
は、パブリッシュ・ログにメッセージを書き込みます
protected AbstractTransporter getStandardTransporterInstance()
は、標準のHTTPベース・トランスポータの新しいインスタンスを取得します。これは、複数のターゲットにトランスポートを実装する際に役立ちます。
protected String getParam(String param)
は、パブリッシュ・コンソールで構成された、パブリッシュ・パラメータの値を取得します。
次に、複数のターゲットに対して動作する、トランスポータの実装例を示します。ターゲットは、次のように構成します。
「宛先アドレス」には、宛先URLをカンマで区切って指定します。
例:
http://tgt1:9030/cs/ http://virginia:9040/cs/
「他の引数」には、アンパサンドで区切ったユーザー名とパスワードを指定します。また、必要に応じて、1から始まるインデックスを接尾辞に使用して、追加サーバーのプロキシ情報を指定します。
追加ターゲットが1つある場合の例を示します。
REMOTEUSER1=fwadmin&REMOTEPASS1=xceladmin&PROXYSERVER1=proxy.com&PROXYPORT1=9090&PROXYUSER1=pxuser&PROXYPASSWORD1=pxpass
AdvPub.xml
で、DataTransporter
Beanのエントリを、次のエントリに置換します。
<bean id="DataTransporter" class="my.sample.MultiTransporter" singleton="false">
package my.sample; import COM.FutureTense.Interfaces.*; import com.fatwire.cs.core.realtime.TransporterReply; import java.net.URL; import java.util.*; /** * RealTime Publishing transporter to multiple targets. */ public class MultiTransporter extends AbstractTransporter { private boolean initialized = false; List<AbstractTransporter> transporters = new ArrayList(); /** * Ping each underlying target and return true if all of them are up. */ @Override public boolean ping(StringBuilder sbOut) { init(); boolean ret = true; for(AbstractTransporter t : transporters) { boolean thisret = t.ping(sbOut); sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable")); sbOut.append(" ||| "); ret &= thisret; } return ret; } /** * Send the batch to each underliyng transport. */ @Override protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg) { init(); for(AbstractTransporter t : transporters) { int res = t.sendBatch(ics, iList, outputMsg); if(res != 0) { // Just log the error for now, but this is an // indication that the target may be down // and other notifications may also be appropriate. writeLog("Transporter " + t + " failed with " + res + " " + outputMsg); } } return 0; } /** * Execute the remote command on each transporter and * accumulate their responses. */ @Override protected List<TransporterReply> remoteExecute(ICS ics, String s, Map<String, String> stringStringMap) { init(); List<TransporterReply> res = new ArrayList<TransporterReply>(); for(AbstractTransporter t : transporters) { List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap); res.addAll(tres); } return res; } /** * Do some initialization by parsing out the configuration * settings and instantiating a standard http transport * to each target. */ private void init() { if(!initialized) { String remoteURLs = getRemoteUrl(); int count = 0; for(String remoteUrl : remoteURLs.split(",")) { String suffix = (count == 0) ? "" : String.valueOf(count); AbstractTransporter t1 = AbstractTransporter.getStandardTransporterInstance(); URL url; try { url = new URL(remoteUrl); } catch(Exception e) { throw new RuntimeException(e); } t1.setRemoteUrl(remoteUrl); t1.setHost(url.getHost()); t1.setUsername(getParam("REMOTEUSER" + suffix)); t1.setPassword(getParam("REMOTEPASS" + suffix)); t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol())); t1.setContextPath(url.getPath()); t1.setPort(url.getPort()); t1.setProxyserver(getProxyserver()); t1.setProxyport(getProxyport()); t1.setProxyuser(getProxyuser()); t1.setProxypassword(getProxypassword()); t1.setHttpVersion(getHttpVersion()); t1.setTargetIniFile(getTargetIniFile()); transporters.add(t1); ++count; } initialized = true; writeLog("Initialized transporters: " + toString()); } } /** * Provide a full listing of all underlying transports. This is * can be used by other components to determine * whether they need to perform special actions depending on * the number of targets. For example, asset publishing * status processing may need to buffer responses until they're * received from all targets before marking assets published. * @return */ @Override public List<String> listTransports() { init(); List<String> list = new ArrayList(); for(AbstractTransporter t : transporters) { list.add(t.toString()); } return list; } /** * Just a human-friendly description of the transport. This may show * up in the logs, so make it descriptive enougn. */ @Override public String toString() { List<String> transs = listTransports(); StringBuilder sb = new StringBuilder(); for(String t : transs) sb.append(t + " "); return sb.toString(); } }
第49.2.5項「完全なコードのリスト」に示した例は、すべてのターゲットが実行しているという最適な状況では動作しますが、あるターゲットが短時間または長時間停止しているときもあります。あるターゲットにのみパブリッシュしているときに、アセットにパブリッシュ済としてのマークが付けられると、停止したターゲットは同期されなくなります。このようなシナリオには、次に示す方法で対応できます。
ターゲットが短時間停止している場合は、アセットにパブリッシュ済のマークを付けないようにして、実行中のターゲットにはパブリッシュを続けます。別のターゲットが再起動されたときには、その時点より前のすべてのアセットがパブリッシュ用のキューに入れられた状態になります。これらのアセットは、最初のターゲットにも重複してパブリッシュされることになりますが、短時間の場合、このオーバーヘッドは無視できます。
ターゲットが長時間停止したままになる場合は、そのターゲットを宛先構成のターゲットのリストからから削除することが最善策になるといえます(この例では、パブリッシュ構成の「宛先アドレス」から2番目のターゲットを削除します)。このようにすると、アクティブなターゲットが1つしかない場合でも、アセットはパブリッシュ済としてマークされ続けます。2番目のターゲットが再起動されたときには、最初にデータベースとファイル・システムの同期を実行して、その後で2番目のターゲットを宛先アドレスのリストに戻します。
前述の最初の状況では、すべてのターゲットに保存されたアセットにのみにパブリッシュ済としてのマークを付けること必要になります。これを行うには、次に示すように、カスタムの通知ロジックを実装します。
com.fatwire.realtime.messaging.AssetPublishCallback
を開きます。
notify()
メソッドをオーバーライドします。必要に応じて、progressUpdate()
メソッドもオーバーライドします。
詳細な実装例は、後述の「手順1および2の実装例」を参照してください。
管理側で、AdvPub.xml
のコールバックを有効にします。
AssetCallback
Beanを追加します。
このBeanをPubsessionMonitor
に登録します。
コードについては、後述の「手順3のコールバックBeanの有効化」を参照してください。
手順1および2の実装例
package my.sample; import com.fatwire.assetapi.data.AssetId; import java.util.HashMap; import java.util.Map; /** * Buffer asset save notifications until we've received one * from each target. Then mark asset published. */ public class AssetPublishCallbackMulti extends AssetPublishCallback { Map<String, Integer> saveEventsCount = new HashMap<String, Integer>(); /** * Receive notifications about the asset status. * Currently the only available status is SAVED. */ @Override public void notify(AssetId assetId, String status, String from) { String assetIdStr = String.valueOf(assetId); writeLog("Got " + status + " notification from " + from + " for " + assetIdStr); if("SAVED".equals(status)) { Integer numNotifications; if((numNotifications = saveEventsCount.get(assetIdStr)) == null) { numNotifications = 0; } numNotifications = numNotifications + 1; saveEventsCount.put(assetIdStr, numNotifications); if(numNotifications == this.getTargets().size()) { super.notify(assetId, status, from); writeLog("Marked " + assetIdStr + " published"); } } } /** * Intercept progress update messages. Can be used for * monitoring the health of the system but is not required. */ @Override public void progressUpdate(String sessionId, String job, String where, String progress, String lastAction, char status) { super.progressUpdate(sessionId, job, where, progress, lastAction, status); } }
手順3のコールバックBeanの有効化
AssetCallback
Beanを追加するには:
<bean id="AssetCallback" class="my.sample.AssetPublishCallbackMulti" singleton="false"/>
このBeanをPubsessionMonitor
に登録するには:
<bean id="PubsessionMonitor" class="com.fatwire.realtime.messaging.PubsessionMonitor" singleton="false"> <constructor-arg index="0"> <ref local="DataTransporter" /> </constructor-arg> <constructor-arg index="1"> <ref local="AssetCallback" /> </constructor-arg> <property name="pollFreqMillis" value="5000" /> <property name="timeoutMillis" value="100000" /> </bean>
複数の宛先にパブリッシュする場合は、それぞれの宛先ごとのUnpackerとCacheUpdaterを区別すると便利になります。これは、RTパブリッシュ・コンソールでプログレス・バーを調べるときや、ログを調べるときに役立ちます。このような区別を行うために、ターゲット側のAdvPub.xml
ファイルを編集して、DataUnpacker
BeanとPageCacheUpdater
BeanのID値を変更します。
例:
<bean id="DataUnpacker" class="com.fatwire.realtime.ParallelUnpacker" singleton="false"> <property name="id" value="Unpacker-Virginia2"/> ... </bean> <bean id="PageCacheUpdater" class="com.fatwire.realtime.regen.ParallelRegeneratorEh" singleton="false"> <property name="id" value="CacheFlusher-Virginia2"/> ... </bean>