64 カスタマイズ・フックのリアルタイム・パブリッシュの使用

リアルタイム・パブリッシュのカスタマイズ時に実行する必要があるいくつかのことは、独自のトランスポータの作成、実装の詳細の作成、ヘルパー・メソッドの作成、トランスポータの実装例の作成、完全なコードのリストの作成、および特異な状況のシナリオの作成です。管理インスタンスでのアセット・パブリッシュ・イベントのインターセプトに関する情報を書き込む必要がある場合もあります。

顧客のビジネス・ニーズに従って、リアルタイム・パブリッシュをカスタマイズできます。「カスタマイズ・フックのリアルタイム・パブリッシュの使用」および「WebCenter Sitesのアセットおよびパブリッシュ・イベントの理解」を参照してください。

リアルタイム・パブリッシュ環境は、advpub.xmlファイルでWebCenter Sitesに構成されます。次のリストは、パブリッシュ・コンポーネントの説明に使用されるいくつかの用語を示します。
  • リソース: 承認済アセットおよび表などの非アセット・データを示すのに使用される一般的な用語です。

  • リソース・グループ: なんらかの関係のあるリソースのグループです。

トピック:

リアルタイム・パブリッシュについて

リアルタイム・パブリッシュとは、いくつかのジョブで構成されるパイプラインのことです。一部のジョブは管理インスタンスで実行され、その他のジョブはターゲット・インスタンスで実行されます。リアルタイム・パブリッシュのパラメータは、advpub.xmlファイルにあります。

次に、各ジョブについて簡単に説明します。

  • Gatherer: パブリッシュ可能なアセットのリストを作成して、そのリストを追加のリソース(アセット・タイプ、表の行)で装飾します。これらを合せて、パブリッシュするデータの正規セットを構成します。これらは、基礎となる承認グループ化戦略に依存しながら、相互依存のリソースのグループを作成することで実行されます。グループ化戦略では、承認済リソースのリストを取得して、それらをグループに編成します。使用可能なグループ化戦略は次のとおりです。
    • ApprovalAggregatingGroupingStrategy – デフォルトのグループ化戦略で、この戦略を保持することをお薦めします。リソース・グループのコレクションを作成します。リソース・グループでは、そのグループ内で同じタイプの大部分のアセットが含まれるのもが主要なアセット・タイプになります。リソース・グループの作成中に、この戦略によって、同じアセット・タイプのアセットの数が維持されます。そのグループの主要なアセット・タイプが最大数が含まれるものになります。同様に、リソース・グループ・コレクションの作成中に、この戦略によって、同じ主要アセット・タイプのグループが一緒に収集されます。この戦略では、同じタイプのアセットのロードと保存が一括で高速に機能するため、同様のグループを集約できます。この戦略では、グループのサイズが相対的に小さくなり、グループの数が多くなります。

    • ApprovalAccumulatingGroupingStrategy – この戦略では、リソース・グループを収集するだけで、追加の処理が実行されません。グループのサイズは相対的に大きくなり、グループの数が少なくなります。

  • DataSerializer and DataDeserializer: XStream実装を使用してデータをシリアライズおよびデシリアライズします。

  • Packager: Packagerは、Gathererで作成したリソースのリストを指定すると、各リソースのシリアライズされたレンディションを作成して、そのレンディションをローカルのfw_PubDataStore表に保存します。

  • Transporter: Packagerで作成したfw_PubDataStoreに含まれるシリアライズされたデータを受け取り、そのデータをターゲット側のfw_PubDataStore表にコピーします。シリアライズされたデータは、開発者ガイドで説明されているように、カスタマイズされたマルチトランスポータを提供することで、複数の宛先に送信できます。wcs_properties.jsonファイルのプロパティxcelerate.concurrenttransportunpackerでは、(デフォルトで)DataTransporterおよびDataUnpackerを同時に実行するかどうかが記載されています。「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」を参照してください。

  • Unpacker: ターゲット側のfw_PubDataStore表に含まれるシリアライズされたデータを受け取り、そのデータをデシリアライズしてターゲット・データベースに保存します。DataTransporterおよびDataUnpackerが同時に実行されますが、DataUnpackerは、主要なパッケージ化が完了し、特定の優先度グループ情報が取得されるまで待機します。

    実行に必要なDataUnpackerスレッドの数は、numParalleltasksを使用して構成できます。実行するためのDataUnpackerスレッド数のデフォルト値またはnumParallelTasksは、MSSQLサーバーおよびDB2データベースの場合は1、Oracleデータベースの場合は3です。デフォルト値は変更しないことをお薦めします。

  • Monitor: パブリッシュ・セッション内のすべての参加者から受信されるすべてのメッセージを追跡して通知します。これらのメッセージは、PubMessageおよびPubProgress表に格納されます。PubSessionMonitorは、非同期メッセージ・システムのコンポーネントです。

    ポーリング頻度またはpollFreqMillisはミリ秒で測定され、デフォルト値は5000です。

    ミリ秒の時間またはtimeoutMillisは、参加者がクラッシュまたはハングしたと推定されるまでPubSessionMonitorがメッセージを待機する秒数です。デフォルト値は100000ミリ秒(100秒)です。

  • CacheUpdater: CacheUpdaterは、Unpackerで正常に保存されたアセットのリストを指定すると、ページ・キャッシュの関連部分をフラッシュして、必要な場合は再生成します。

    指定したページの再生成は、numThreadsPerServerの値に基づいて複数のスレッドで実行できます。numThreadsPerServerのデフォルト値は3です。コンポーネントregenServersでは、ページが再生成されるサーバーにURLのリストを提供します。URLが指定されていない場合、PageCacheUpdaterのデフォルト値は、(ユーザー・リクエストに基づいて)標準の再生成にデフォルトで設定されます。

リアルタイム・パブリッシュは、非同期メッセージングを使用して、各ジョブのステータスを追跡します。メッセージング・フレームワークの詳細を理解する必要はありませんが、ターゲット・システムとの通信がトランスポータによって円滑化されることは理解しておく必要があります。これには、アセットが保存されていることを管理システムに通知するために、Unpackerが発行するメッセージも含まれます。このメッセージにより、そのアセットにパブリッシュ済のマークを付けるように管理ロジックに指示します。トランスポート・フェーズ中に各ターゲットで個別に実行されるため、複数ターゲット・パブリッシュのターゲットごとのパブリッシュ・ジョブは、異なる順序で実行できます。たとえば、ターゲット1のCacheFlusterがターゲット2のUnpackerの前に実行される場合があります。

カスタム・トランスポータの作成

トランスポータでは、HTTP(s)ベースのOOTB (即時利用可能または使用準備済)トランスポートを、異なるプロトコルを使用する別のトランスポートと置き換えることができ、また、同じパブリッシュ・セッション内で複数のターゲットにパブリッシュできます。

トピック:

独自のトランスポータの作成

独自のトランスポータを作成するには、次のステップを実行します。

  1. com.fatwire.realtime.AbstractTransporterクラスのサブクラスを作成します。
  2. メソッドpingsendBatchlistTransportstoString、およびremoteExecuteをオーバーライドします。
  3. 管理側のclasses/AdvPub.xmlファイルを編集して、トランスポータをインストールします。

    次の行:

    <bean id="DataTransporter"
      class="com.fatwire.realtime.MultiTransporter" 
      scope ="prototype">
    

    次の行に置き換えます

    <bean id="DataTransporter" class="[your transporter class]" scope="prototype">

AbstractTransporterメソッドのオーバーライドの考慮事項

AbstractTransporterのメソッドをオーバーライドするときには、次の点に注意してください。

  • ping()には、ターゲットが稼働しているか、停止しているかを確認するロジックを含めます。最もわかりやすい使用法は、パブリッシュ・コンソールに緑色/赤色の診断インジケータを点灯させることです。パブリッシュ・セッションの起動には、pingの成功が必ずしも必要なわけではありませんが、接続の問題を診断するにはpingが便利なツールになります。

    ターゲットへの接続にhttpを使用している場合は、デフォルトの実装を使用できます。このとき、オーバーライドや独自の実装は必要ありません。

  • sendBatch()は、リモートfw_PubDataStoreにデータをアップロードする役割を果たします。これは、ローカルのfw_PubDataStoreから、IList形式のデータの小さなバッチで複数回呼び出されます。バッチ処理によりメモリーの使用量を抑えられ、バッチ処理は、気付かないうちに実行されています。

  • remoteExecute()は、リモート・システムとの通信の役割を果たします。通信は双方向で行われ、管理側はリモートのジョブと取消しのリクエストをディスパッチするコマンドを送信し、ターゲット側はステータスを示すメッセージを送り返します。これらのメッセージの内容は、remoteExecuteには重要ではありません。このメソッドに必要な動作は、該当するリクエストを送信して応答を返すことのみです。

  • listTransports()は、ターゲットが複数ある場合に、基になるトランスポートのリストになります。ターゲットが1つのみの場合、このメソッドは、現在のトランスポートのtoString()レンディションを返します。

  • toString()は、このトランスポートのわかりやすい記述子です。たとえば、一般的な値は、http://mytarget:8081/cs/です。ただし、targetDataCenter-VirginiaserverOn8080などの別の文字列も受け入れられます。

AbstractTransporterのヘルパー・メソッド

AbstractTransporterには、いくつかのヘルパー・メソッドがあります。

  • protected void writeLog(String msg)は、パブリッシュ・ログにメッセージを書き込みます。

  • protected AbstractTransporter getStandardTransporterInstance()は、標準のHTTPベース・トランスポータの新しいインスタンスを取得します。これは、複数のターゲットにトランスポートを実装する際に役立ちます。

  • protected String getParam(String param)は、パブリッシュ・コンソールで構成された、パブリッシュ・パラメータの値を取得します。

トランスポータの実装: 例

次に、複数のターゲットに対して動作する、トランスポータの実装例を示します。ターゲットは、次のように構成します。

  1. 「宛先アドレス」には、宛先URLをカンマで区切って指定します。

    たとえば:

    http://tgt1:9030/cs/
    http://virginia:9040/cs/
    
  2. 「他の引数」には、アンパサンドで区切ったユーザー名とパスワードを指定し、必要に応じて、1から始まるインデックスを接尾辞に使用して、追加サーバーのプロキシ情報を指定します。

    たとえば、追加ターゲットが1つある場合:

    REMOTEUSER1=fwadmin&REMOTEPASS1=xceladmin&PROXYSERVER1=proxy.com&PROXYPORT1=9090&PROXYUSER1=pxuser&PROXYPASSWORD1=pxpass
    
  3. AdvPub.xmlで、DataTransporter Beanのエントリを、次のエントリに置換します。
    <bean id="DataTransporter"
      class="my.sample.MultiTransporter"
      singleton="false">
     <property name="id" value="Transporter"/>              
    </bean> 

リアルタイム・パブリッシュ・トランスポータを記述するためのコード

com.fatwire.realtime.mypackage;
import COM.FutureTense.Interfaces.*; 
import com.fatwire.cs.core.realtime.TransporterReply;
import java.net.URL;
import java.util.*;
/**
 * RealTime Publishing transporter to multiple targets.
 */
public class MultiTransporter extends AbstractTransporter
{
  private boolean initialized = false;
  List<AbstractTransporter>  transporters  = new ArrayList();
  /**
   * Ping each underlying target and return true if all of them are up.
   */
  @Override
  public boolean ping(StringBuilder sbOut)
  {
  init();
  boolean ret = true;
  for(AbstractTransporter  t : transporters)
  {
  boolean thisret = t.ping(sbOut);
  sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable"));
  sbOut.append(" ||| ");
  ret &= thisret;
  }
  return ret;
  }
  /**
   * Send the batch to each underliyng transport.
   */
  @Override
  protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg)
  {
  init();
  for(AbstractTransporter  t : transporters)
  {
  int res = t.sendBatch(ics, iList, outputMsg);
  if(res != 0)
  {
  // Just log the error for now, but this is an 
  // indication that the target may be down
  // and other notifications may also be appropriate.
  writeLog("Transporter " + t + " failed with " + res   + " " + outputMsg);
  }
  }
  return 0;
  }
  /**
   * Execute the remote command on each transporter and 
   * accumulate their responses.
   */
  @Override
  protected List<TransporterReply> remoteExecute(ICS ics, String s, 
  Map<String, String> stringStringMap)
  {
  init();
  List<TransporterReply> res = new ArrayList<TransporterReply>();
  for(AbstractTransporter  t : transporters)
  {
  List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap);
  res.addAll(tres);
  }
  return res;
  }
  /**
   * Do some initialization by parsing out the configuration 
   * settings and instantiating a standard http transport
   * to each target.
   */
  private void init()
  {
  if(!initialized)
  {
  String remoteURLs = getRemoteUrl();
  int count = 0;
  for(String remoteUrl : remoteURLs.split(","))
    {
    String suffix = (count == 0) ? "" : String.valueOf(count);
    AbstractTransporter t1 =
    AbstractTransporter.getStandardTransporterInstance();
    URL url;
    try
    {
      url = new  URL(remoteUrl);
    }
    catch(Exception e)
    {
      throw new RuntimeException(e);
    }
    t1.setRemoteUrl(remoteUrl);
    t1.setHost(url.getHost());
    t1.setUsername(getParam("REMOTEUSER" + suffix));
    t1.setPassword(getParam("REMOTEPASS" + suffix));
    t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol()));
    t1.setContextPath(url.getPath());
    t1.setPort(url.getPort());
    t1.setProxyserver(getProxyserver());
    t1.setProxyport(getProxyport());
    t1.setProxyuser(getProxyuser());
    t1.setProxypassword(getProxypassword());
    t1.setHttpVersion(getHttpVersion());
    t1.setTargetIniFile(getTargetIniFile()); transporters.add(t1);
    ++count;
    }
  initialized = true;
  writeLog("Initialized transporters: " + toString());
  }
  }
  /**
   * Provide a full listing of all underlying transports. This is 
   * can be used by other components to determine
   * whether they need to perform special actions depending on 
   * the number of targets. For example, asset publishing
   * status processing may need to buffer responses until they're 
   * received from all targets before marking assets published.
   * @return
   */
  @Override
  public List<String> listTransports()
  {
  init();
  List<String> list = new ArrayList();
  for(AbstractTransporter  t : transporters)
  {
  list.add(t.toString());
  }
   return list;
  }
  /**
   * Just a human-friendly description of the transport. This may  show  
   * up in the  logs, so make it descriptive enougn.
   */
  @Override
  public String toString()
  {
  List<String> transs = listTransports();
  StringBuilder sb = new StringBuilder();
  for(String t : transs)
  sb.append(t + " ");
  return sb.toString();
  }
}

境界状況のシナリオの理解

「リアルタイム・パブリッシュ・トランスポータを記述するためのコード」に示した例は、すべてのターゲットが実行しているという最適な状況では動作しますが、あるターゲットが短時間または長時間停止しているときもあります。あるターゲットにのみパブリッシュしているときに、アセットにパブリッシュ済としてのマークが付けられると、停止したターゲットは同期されなくなります。このようなシナリオには、次に示す方法で対応できます。

  • ターゲットが短時間停止している場合は、アセットにパブリッシュ済のマークを付けないようにして、実行中のターゲットにはパブリッシュを続けます。別のターゲットが再起動されたときには、その時点より前のすべてのアセットがパブリッシュ用のキューに入れられた状態になります。これらのアセットは、最初のターゲットにも重複してパブリッシュされることになりますが、短時間の場合、このオーバーヘッドは無視できます。

  • ターゲットが長時間停止したままになる場合は、そのターゲットを宛先構成のターゲットのリストからから削除することが最善策になるといえます(この例では、パブリッシュ構成の「宛先アドレス」から2番目のターゲットを削除します)。このようにすると、アクティブなターゲットが1つしかない場合でも、アセットはパブリッシュ済としてマークされ続けます。2番目のターゲットが再起動されたときには、最初にデータベースとファイル・システムの同期を実行して、その後で2番目のターゲットを宛先アドレスのリストに戻します。

管理インスタンスでのアセット・パブリッシュ・イベントのインターセプト

前述の最初の状況では、すべてのターゲットに保存されたアセットにのみにパブリッシュ済としてのマークを付けること必要になります。これを行うには、次に示すように、カスタムの通知ロジックを実装します。

  1. com.fatwire.realtime.messaging.AssetPublishCallbackを開きます。
  2. notify()メソッドをオーバーライドし、必要に応じて、progressUpdate()メソッドもオーバーライドします。

    ステップ1および2の実装例

    package my.sample;
    import com.fatwire.assetapi.data.AssetId;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * Buffer asset save notifications until we've received one 
     * from each target. Then mark asset published.
     */
    public class AssetPublishCallbackMulti extends AssetPublishCallback
    {
      Map<String, Integer> saveEventsCount  = new HashMap<String, Integer>();
      /**
       * Receive notifications about the asset status. 
       * Currently the only available status is SAVED.
       */
      @Override
      public void notify(AssetId assetId, String status, String from)
      {
      String assetIdStr = String.valueOf(assetId);
      writeLog("Got " + status + " notification from " 
      + from + " for " + assetIdStr);
      if("SAVED".equals(status))
        {
        Integer numNotifications;
        if((numNotifications = saveEventsCount.get(assetIdStr)) == null)
          {
          numNotifications = 0;
          }
        numNotifications = numNotifications + 1;
        saveEventsCount.put(assetIdStr, numNotifications);
        if(numNotifications == this.getTargets().size())
          {
          super.notify(assetId, status, from);
          writeLog("Marked " + assetIdStr  + " published");
          }
        }
      }
      /**
       * Intercept progress update messages. Can be used for 
       * monitoring the health of the system but is not required.
       */
      @Override
      public void progressUpdate(String sessionId, String job, 
      String where, String progress, String lastAction, char status)
      {
      super.progressUpdate(sessionId, job, where, progress, lastAction, status);
      }
    }
    
  3. 管理側で、AdvPub.xmlのコールバックを有効にします。
    • AssetCallback Beanを追加します。

    • このBeanをPubsessionMonitorに登録します。

    ステップ3のコールバックBeanの有効化

    AssetCallback Beanを追加するには:

    <bean id="AssetCallback" 
      class="my.sample.AssetPublishCallbackMulti" 
      singleton="false"/>
    

    このBeanをPubsessionMonitorに登録するには:

    <bean id="PubsessionMonitor"
      class="com.fatwire.realtime.messaging.PubsessionMonitor" 
      singleton="false">
    
      <constructor-arg index="0">
        <ref local="DataTransporter" />
      </constructor-arg>
    
      <constructor-arg index="1">
        <ref local="AssetCallback" />
      </constructor-arg>
    
      <property name="pollFreqMillis" value="5000" />
      <property name="timeoutMillis" value="100000" />
    </bean>

UnpackersとCacheUpdatesの区別

複数の宛先にパブリッシュする場合は、それぞれの宛先ごとのUnpackerとCacheUpdaterを区別すると便利になります。これは、RTパブリッシュ・コンソールでプログレス・バーを調べるときや、ログを調べるときに役立ちます。

このような区別を行うために、ターゲット側のAdvPub.xmlファイルを編集して、DataUnpacker BeanとPageCacheUpdater BeanのID値を変更します。

たとえば:

<bean id="DataUnpacker" 
  class="com.fatwire.realtime.ParallelUnpacker" 
  singleton="false">
  <property name="id" value="Unpacker-Virginia2"/>
  ...
</bean>

<bean id="PageCacheUpdater"
  class="com.fatwire.realtime.regen.ParallelRegeneratorEh"
  singleton="false">
  <property name="id" value="CacheFlusher-Virginia2"/>
  ...
</bean>