顧客のビジネス・ニーズに従って、リアルタイム・パブリッシュをカスタマイズできます。詳細は、「カスタマイズ・フックのリアルタイム・パブリッシュの使用」および「WebCenter Sitesのアセットおよびパブリッシュ・イベントの理解」を参照してください。
リソース: 承認済アセットおよび表などの非アセット・データを示すのに使用される一般的な用語です。
リソース・グループ: なんらかの関係のあるリソースのグループです。
カスタマイズ・フックのリアルタイム・パブリッシュの使用の詳細は、次の項を参照してください。
リアルタイム・パブリッシュとは、いくつかのジョブで構成されるパイプラインのことです。一部のジョブは管理インスタンスで実行され、その他のジョブはターゲット・インスタンスで実行されます。リアルタイム・パブリッシュのパラメータは、advpub.xmlファイルにあります。次に、各ジョブについて簡単に説明します。
ApprovalAggregatingGroupingStrategy – デフォルトのグループ化戦略で、この戦略を保持することをお薦めします。リソース・グループのコレクションを作成します。リソース・グループでは、そのグループ内で同じタイプの大部分のアセットが含まれるのもが主要なアセット・タイプになります。リソース・グループの作成中に、この戦略によって、同じアセット・タイプのアセットの数が維持されます。そのグループの主要なアセット・タイプが最大数が含まれるものになります。同様に、リソース・グループ・コレクションの作成中に、この戦略によって、同じ主要アセット・タイプのグループが一緒に収集されます。この戦略では、同じタイプのアセットのロードと保存が一括で高速に機能するため、同様のグループを集約できます。この戦略では、グループのサイズが相対的に小さくなり、グループの数が多くなります。
ApprovalAccumulatingGroupingStrategy – この戦略では、リソース・グループを収集するだけで、追加の処理が実行されません。グループのサイズは相対的に大きくなり、グループの数が少なくなります。
DataSerializer and DataDeserializer: XStream実装を使用してデータをシリアライズおよびデシリアライズします。
Packager: Packagerは、Gathererで作成したリソースのリストを指定すると、各リソースのシリアライズされたレンディションを作成して、そのレンディションをローカルのfw_PubDataStore表に保存します。
Transporter: Packagerで作成したfw_PubDataStoreに含まれるシリアライズされたデータを受け取り、そのデータをターゲット側のfw_PubDataStore表にコピーします。シリアライズされたデータは、開発者ガイドで説明されているように、カスタマイズされたマルチトランスポータを提供することで、複数の宛先に送信できます。wcs_properties.jsonファイルのプロパティxcelerate.concurrenttransportunpackerでは、(デフォルトで)DataTransporterおよびDataUnpackerを同時に実行するかどうかが記載されています。リアルタイム・パブリッシュ・トランスポータを記述するためのコードの詳細は、「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」を参照してください。
Unpacker: ターゲット側のfw_PubDataStore表に含まれるシリアライズされたデータを受け取り、そのデータをデシリアライズしてターゲット・データベースに保存します。DataTransporterおよびDataUnpackerが同時に実行されますが、DataUnpackerは、主要なパッケージ化が完了し、特定の優先度グループ情報が取得されるまで待機します。
実行に必要なDataUnpackerスレッドの数は、numParalleltasksを使用して構成できます。実行するためのDataUnpackerスレッド数のデフォルト値またはnumParallelTasksは、MSSQLサーバーおよびDB2データベースの場合は1、Oracleデータベースの場合は3です。デフォルト値は変更しないことをお薦めします。
Monitor: パブリッシュ・セッション内のすべての参加者から受信されるすべてのメッセージを追跡して通知します。これらのメッセージは、PubMessageおよびPubProgress表に格納されます。PubSessionMonitorは、非同期メッセージ・システムのコンポーネントです。
ポーリング頻度またはpollFreqMillisはミリ秒で測定され、デフォルト値は5000です。
ミリ秒の時間またはtimeoutMillisは、参加者がクラッシュまたはハングしたと推定されるまでPubSessionMonitorがメッセージを待機する秒数です。デフォルト値は100000ミリ秒(100秒)です。
CacheUpdater: CacheUpdaterは、Unpackerで正常に保存されたアセットのリストを指定すると、ページ・キャッシュの関連部分をフラッシュして、必要な場合は再生成します。
指定したページの再生成は、numThreadsPerServerの値に基づいて複数のスレッドで実行できます。numThreadsPerServerのデフォルト値は3です。コンポーネントregenServersでは、ページが再生成されるサーバーにURLのリストを提供します。URLが指定されていない場合、PageCacheUpdaterのデフォルト値は、(ユーザー・リクエストに基づいて)標準の再生成にデフォルトで設定されます。
リアルタイム・パブリッシュは、非同期メッセージングを使用して、各ジョブのステータスを追跡します。メッセージング・フレームワークの詳細を理解する必要はありませんが、ターゲット・システムとの通信がトランスポータによって円滑化されることは理解しておく必要があります。これには、アセットが保存されていることを管理システムに通知するために、Unpackerが発行するメッセージも含まれます。このメッセージにより、そのアセットにパブリッシュ済のマークを付けるように管理ロジックに指示します。トランスポート・フェーズ中に各ターゲットで個別に実行されるため、複数ターゲット・パブリッシュのターゲットごとのパブリッシュ・ジョブは、異なる順序で実行できます。たとえば、ターゲット1のCacheFlusterがターゲット2のUnpackerの前に実行される場合があります。
トランスポータには、次に示すいくつかのカスタマイズ・オプションが用意されています。
HTTPに基づくOOTB (デフォルトまたは設定済)トラスポートを、別のプロトコルを使用するトランスポートに置き換える。
同一のパブリッシュ・セッションで複数のターゲットにパブリッシュする。
この項には次のトピックが含まれます:
AbstractTransporterのメソッドをオーバーライドするときには、次の点に注意してください。
ping()には、ターゲットが稼働しているか、停止しているかを確認するロジックを含めます。最もわかりやすい使用法は、パブリッシュ・コンソールに緑色/赤色の診断インジケータを点灯させることです。パブリッシュ・セッションの起動には、pingの成功が必ずしも必要なわけではありませんが、接続の問題を診断するにはpingが便利なツールになります。
ターゲットへの接続にhttpを使用している場合は、デフォルトの実装を使用できます。このとき、オーバーライドや独自の実装は必要ありません。
sendBatch()は、リモートfw_PubDataStoreにデータをアップロードする役割を果たします。これは、ローカルのfw_PubDataStoreから、IList形式のデータの小さなバッチで複数回呼び出されます。バッチ処理によりメモリーの使用量を抑えられ、バッチ処理は、気付かないうちに実行されています。
remoteExecute()は、リモート・システムとの通信の役割を果たします。通信は双方向で行われ、管理側はリモートのジョブと取消しのリクエストをディスパッチするコマンドを送信し、ターゲット側はステータスを示すメッセージを送り返します。これらのメッセージの内容は、remoteExecuteには重要ではありません。このメソッドに必要な動作は、該当するリクエストを送信して応答を返すことのみです。
listTransports()は、ターゲットが複数ある場合に、基になるトランスポートのリストになります。ターゲットが1つのみの場合、このメソッドは、現在のトランスポートのtoString()レンディションを返します。
toString()は、このトランスポートのわかりやすい記述子です。たとえば、一般的な値は、http://mytarget:8081/cs/です。ただし、targetDataCenter-Virginia、serverOn8080などの別の文字列も受け入れられます。
AbstractTransporterには、いくつかのヘルパー・メソッドがあります。
protected void writeLog(String msg)は、パブリッシュ・ログにメッセージを書き込みます。
protected AbstractTransporter getStandardTransporterInstance()は、標準のHTTPベース・トランスポータの新しいインスタンスを取得します。これは、複数のターゲットにトランスポートを実装する際に役立ちます。
protected String getParam(String param)は、パブリッシュ・コンソールで構成された、パブリッシュ・パラメータの値を取得します。
package my.sample;
import COM.FutureTense.Interfaces.*;
import com.fatwire.cs.core.realtime.TransporterReply;
import java.net.URL;
import java.util.*;
/**
* RealTime Publishing transporter to multiple targets.
*/
public class MultiTransporter extends AbstractTransporter
{
private boolean initialized = false;
List<AbstractTransporter> transporters = new ArrayList();
/**
* Ping each underlying target and return true if all of them are up.
*/
@Override
public boolean ping(StringBuilder sbOut)
{
init();
boolean ret = true;
for(AbstractTransporter t : transporters)
{
boolean thisret = t.ping(sbOut);
sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable"));
sbOut.append(" ||| ");
ret &= thisret;
}
return ret;
}
/**
* Send the batch to each underliyng transport.
*/
@Override
protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg)
{
init();
for(AbstractTransporter t : transporters)
{
int res = t.sendBatch(ics, iList, outputMsg);
if(res != 0)
{
// Just log the error for now, but this is an
// indication that the target may be down
// and other notifications may also be appropriate.
writeLog("Transporter " + t + " failed with " + res + " " + outputMsg);
}
}
return 0;
}
/**
* Execute the remote command on each transporter and
* accumulate their responses.
*/
@Override
protected List<TransporterReply> remoteExecute(ICS ics, String s,
Map<String, String> stringStringMap)
{
init();
List<TransporterReply> res = new ArrayList<TransporterReply>();
for(AbstractTransporter t : transporters)
{
List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap);
res.addAll(tres);
}
return res;
}
/**
* Do some initialization by parsing out the configuration
* settings and instantiating a standard http transport
* to each target.
*/
private void init()
{
if(!initialized)
{
String remoteURLs = getRemoteUrl();
int count = 0;
for(String remoteUrl : remoteURLs.split(","))
{
String suffix = (count == 0) ? "" : String.valueOf(count);
AbstractTransporter t1 =
AbstractTransporter.getStandardTransporterInstance();
URL url;
try
{
url = new URL(remoteUrl);
}
catch(Exception e)
{
throw new RuntimeException(e);
}
t1.setRemoteUrl(remoteUrl);
t1.setHost(url.getHost());
t1.setUsername(getParam("REMOTEUSER" + suffix));
t1.setPassword(getParam("REMOTEPASS" + suffix));
t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol()));
t1.setContextPath(url.getPath());
t1.setPort(url.getPort());
t1.setProxyserver(getProxyserver());
t1.setProxyport(getProxyport());
t1.setProxyuser(getProxyuser());
t1.setProxypassword(getProxypassword());
t1.setHttpVersion(getHttpVersion());
t1.setTargetIniFile(getTargetIniFile()); transporters.add(t1);
++count;
}
initialized = true;
writeLog("Initialized transporters: " + toString());
}
}
/**
* Provide a full listing of all underlying transports. This is
* can be used by other components to determine
* whether they need to perform special actions depending on
* the number of targets. For example, asset publishing
* status processing may need to buffer responses until they're
* received from all targets before marking assets published.
* @return
*/
@Override
public List<String> listTransports()
{
init();
List<String> list = new ArrayList();
for(AbstractTransporter t : transporters)
{
list.add(t.toString());
}
return list;
}
/**
* Just a human-friendly description of the transport. This may show
* up in the logs, so make it descriptive enougn.
*/
@Override
public String toString()
{
List<String> transs = listTransports();
StringBuilder sb = new StringBuilder();
for(String t : transs)
sb.append(t + " ");
return sb.toString();
}
}
「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」に示した例は、すべてのターゲットが実行しているという最適な状況では動作しますが、あるターゲットが短時間または長時間停止しているときもあります。あるターゲットにのみパブリッシュしているときに、アセットにパブリッシュ済としてのマークが付けられると、停止したターゲットは同期されなくなります。このようなシナリオには、次に示す方法で対応できます。
ターゲットが短時間停止している場合は、アセットにパブリッシュ済のマークを付けないようにして、実行中のターゲットにはパブリッシュを続けます。別のターゲットが再起動されたときには、その時点より前のすべてのアセットがパブリッシュ用のキューに入れられた状態になります。これらのアセットは、最初のターゲットにも重複してパブリッシュされることになりますが、短時間の場合、このオーバーヘッドは無視できます。
ターゲットが長時間停止したままになる場合は、そのターゲットを宛先構成のターゲットのリストからから削除することが最善策になるといえます(この例では、パブリッシュ構成の「宛先アドレス」から2番目のターゲットを削除します)。このようにすると、アクティブなターゲットが1つしかない場合でも、アセットはパブリッシュ済としてマークされ続けます。2番目のターゲットが再起動されたときには、最初にデータベースとファイル・システムの同期を実行して、その後で2番目のターゲットを宛先アドレスのリストに戻します。
前述の最初の状況では、すべてのターゲットに保存されたアセットにのみにパブリッシュ済としてのマークを付けること必要になります。これを行うには、次に示すように、カスタムの通知ロジックを実装します。
手順1および2の実装例
package my.sample;
import com.fatwire.assetapi.data.AssetId;
import java.util.HashMap;
import java.util.Map;
/**
* Buffer asset save notifications until we've received one
* from each target. Then mark asset published.
*/
public class AssetPublishCallbackMulti extends AssetPublishCallback
{
Map<String, Integer> saveEventsCount = new HashMap<String, Integer>();
/**
* Receive notifications about the asset status.
* Currently the only available status is SAVED.
*/
@Override
public void notify(AssetId assetId, String status, String from)
{
String assetIdStr = String.valueOf(assetId);
writeLog("Got " + status + " notification from "
+ from + " for " + assetIdStr);
if("SAVED".equals(status))
{
Integer numNotifications;
if((numNotifications = saveEventsCount.get(assetIdStr)) == null)
{
numNotifications = 0;
}
numNotifications = numNotifications + 1;
saveEventsCount.put(assetIdStr, numNotifications);
if(numNotifications == this.getTargets().size())
{
super.notify(assetId, status, from);
writeLog("Marked " + assetIdStr + " published");
}
}
}
/**
* Intercept progress update messages. Can be used for
* monitoring the health of the system but is not required.
*/
@Override
public void progressUpdate(String sessionId, String job,
String where, String progress, String lastAction, char status)
{
super.progressUpdate(sessionId, job, where, progress, lastAction, status);
}
}
手順3のコールバックBeanの有効化
AssetCallback Beanを追加するには:
<bean id="AssetCallback" class="my.sample.AssetPublishCallbackMulti" singleton="false"/>
このBeanをPubsessionMonitorに登録するには:
<bean id="PubsessionMonitor"
class="com.fatwire.realtime.messaging.PubsessionMonitor"
singleton="false">
<constructor-arg index="0">
<ref local="DataTransporter" />
</constructor-arg>
<constructor-arg index="1">
<ref local="AssetCallback" />
</constructor-arg>
<property name="pollFreqMillis" value="5000" />
<property name="timeoutMillis" value="100000" />
</bean>
複数の宛先にパブリッシュする場合は、それぞれの宛先ごとのUnpackerとCacheUpdaterを区別すると便利になります。これは、RTパブリッシュ・コンソールでプログレス・バーを調べるときや、ログを調べるときに役立ちます。
このような区別を行うために、ターゲット側のAdvPub.xmlファイルを編集して、DataUnpacker BeanとPageCacheUpdater BeanのID値を変更します。
例:
<bean id="DataUnpacker" class="com.fatwire.realtime.ParallelUnpacker" singleton="false"> <property name="id" value="Unpacker-Virginia2"/> ... </bean> <bean id="PageCacheUpdater" class="com.fatwire.realtime.regen.ParallelRegeneratorEh" singleton="false"> <property name="id" value="CacheFlusher-Virginia2"/> ... </bean>