65 Working with RealTime Publishing Customization Hooks

Customizing RealTime Publishing typically involves writing your own transporter: overriding the transporter methods, using the available helper methods, and handling edge-case scenarios such as a target that is down. You may also need to intercept asset publishing events on the management instance.

You can customize RealTime publishing according to your customers' business needs. See Working with RealTime Publishing Customization Hooks and Understanding Asset and Publish Events in WebCenter Sites.

A RealTime Publishing environment is configured in WebCenter Sites through the AdvPub.xml file. The following terms describe the publishing components:
  • Resource: It is a generic term used to indicate approved assets and non-asset data like tables.

  • Resource group: It is a group of resources, possibly related in some way.

About RealTime Publishing

RealTime Publishing is a pipeline consisting of several jobs. Some jobs run on the management instance, while others run on the target instance. RealTime Publishing parameters are located in the AdvPub.xml file.

The following is a brief description of each job:

  • Gatherer: Creates the list of publishable assets and decorates it with additional resources (asset types, table rows) that together make up the canonical set of data to be published. It does so by creating groups of interdependent resources, relying on the underlying approval grouping strategy. The grouping strategy obtains the list of approved resources and organizes them into groups. The following grouping strategies are available:
    • ApprovalAggregatingGroupingStrategy: This is the default grouping strategy, and keeping it is recommended. It creates a collection of resource groups. While creating a resource group, the strategy counts the assets of each asset type in the group; the asset type with the highest count is the dominant asset type for that group. While creating the resource group collection, the strategy collects groups with the same dominant asset type together, because loading and saving assets of the same type is faster in bulk. This strategy leads to a larger number of groups of relatively smaller size.

    • ApprovalAccumulatingGroupingStrategy: This strategy simply collects the resource groups without any additional processing. This leads to a smaller number of groups of relatively larger size.

  • DataSerializer and DataDeserializer: Serialize and deserialize the data using the XStream implementation.

  • Packager: Given the resource listing assembled by Gatherer, Packager creates a serialized rendition of each resource and saves it in the local fw_PubDataStore table.

  • Transporter: Takes the serialized data in fw_PubDataStore created by Packager and copies it to the target-side fw_PubDataStore table. The serialized data can be transported to multiple destinations by providing a customized multi-target transporter, as described in the Developer’s Guide. The xcelerate.concurrenttransportunpacker property in the wcs_properties.json file determines whether the DataTransporter and the DataUnpacker run simultaneously, which is the default behavior. See Code for Writing RealTime Publishing Transporter.

  • Unpacker: Takes the serialized data in the target-side fw_PubDataStore table and deserializes/saves it to the target database. Although the DataTransporter and the DataUnpacker run simultaneously, the DataUnpacker waits until the main packaging is completed and certain priority group information is received.

    The number of DataUnpacker threads is configured with numParallelTasks. The default value is 1 for Microsoft SQL Server and DB2 databases and 3 for Oracle Database. Changing the default value is not recommended.

  • Monitor: Communicates with all the participants in the publish session and keeps track of the messages it receives from them. These messages are stored in the PubMessage and PubProgress tables. PubSessionMonitor is a component of the asynchronous messaging system.

    The polling frequency, pollFreqMillis, is measured in milliseconds. The default value is 5000.

    The timeout, timeoutMillis, is the time in milliseconds that PubSessionMonitor waits for a message before presuming that the participant has crashed or hung. The default value is 100000 (100 seconds).

  • CacheUpdater: Given the list of assets that were successfully saved by Unpacker, CacheUpdater flushes and optionally regenerates relevant parts of the page caches.

    Regeneration of specified pages can be done in multiple threads, based on the value of numThreadsPerServer. The default value of numThreadsPerServer is 3. The regenServers setting provides the list of URLs of the servers where pages are to be regenerated. If no URLs are specified, PageCacheUpdater defaults to standard regeneration (based on user request).
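
The dominant asset type rule described for ApprovalAggregatingGroupingStrategy can be illustrated with a small, self-contained sketch. This is not the WebCenter Sites implementation. The names DominantTypeSketch, dominantType, and aggregate are hypothetical, a resource group is modeled simply as a list of asset type names, and the tie-breaking rule (the first type encountered wins) is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the WebCenter Sites API.
final class DominantTypeSketch {

    /** Returns the asset type that occurs most often in the group (first one wins on ties). */
    static String dominantType(List<String> groupAssetTypes) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String type : groupAssetTypes) {
            counts.merge(type, 1, Integer::sum);
        }
        String dominant = null;
        int best = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > best) {
                best = e.getValue();
                dominant = e.getKey();
            }
        }
        return dominant;
    }

    /** Collects groups that share a dominant asset type under the same key. */
    static Map<String, List<List<String>>> aggregate(List<List<String>> groups) {
        Map<String, List<List<String>>> byDominant = new LinkedHashMap<>();
        for (List<String> group : groups) {
            byDominant.computeIfAbsent(dominantType(group), k -> new ArrayList<>()).add(group);
        }
        return byDominant;
    }
}
```

Keeping groups with the same dominant type next to each other is what allows bulk loading and saving by asset type, which is the motivation given for the default strategy.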

RealTime Publishing uses asynchronous messaging to track the status of each job. It is not necessary to know the details of the messaging framework, but note that communication with the target system is facilitated through the Transporter. This includes the messages that Unpacker issues to inform the management system that an asset has been saved, which prompt the management logic to mark that asset as published. In a multi-target publish, the jobs for each target can complete in different orders because they run independently on each target during the transport phase. For example, CacheUpdater for target 1 might complete before Unpacker for target 2.
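
To make the role of timeoutMillis concrete, the following is a minimal sketch of how a monitor could decide that a participant is presumed hung. It does not reproduce PubSessionMonitor. The class SessionTimeoutSketch and its methods are hypothetical, and timestamps are passed in explicitly so that the logic can be followed without a clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the PubSessionMonitor implementation.
final class SessionTimeoutSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastMessageAt = new HashMap<>();

    SessionTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records that a participant (for example "Unpacker") sent a message at the given time. */
    void messageReceived(String participant, long nowMillis) {
        lastMessageAt.put(participant, nowMillis);
    }

    /** Returns the participants that have been silent for longer than timeoutMillis. */
    List<String> presumedHung(long nowMillis) {
        List<String> hung = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastMessageAt.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                hung.add(e.getKey());
            }
        }
        return hung;
    }
}
```

With the default of 100000, a participant whose last message arrived at time 0 is still considered alive when checked at 100000 and is presumed hung on any later check, such as the next poll 5000 milliseconds afterward.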

Writing a Custom Transporter

With a transporter, you can replace the HTTP(s)-based OOTB (out-of-the-box or ready to use) transport with another transport that uses a different protocol, and you can publish to multiple targets within the same publishing session.

Writing Your Own Transporter

Follow these steps to write your own transporter:

  1. Subclass the com.fatwire.realtime.AbstractTransporter class.
  2. Override the methods ping, sendBatch, listTransports, toString, and remoteExecute.
  3. Install the transporter by editing the classes/AdvPub.xml file on the management side.

    Replace the line:

    <bean id="DataTransporter"
      class="com.fatwire.realtime.MultiTransporter"
      scope="prototype">
    

    with:

    <bean id="DataTransporter" class="[your transporter class]" scope="prototype">

Considerations About Overriding AbstractTransporter Methods

When you override the AbstractTransporter methods, keep the following points in mind.

  • ping() contains the logic that checks whether the target is up or down. Its most prominent use is to power the green/red diagnostic indicator in the publishing console. It is not necessary for ping to be successful to launch a publishing session, but this can be a handy tool for diagnosing connection problems.

    If you are using http(s) to connect to your target, you may be able to use the default implementation rather than override and implement your own.

  • sendBatch() is responsible for uploading data to the remote fw_PubDataStore. It is invoked multiple times with small batches of data from the local fw_PubDataStore, each in the form of an IList. Batching helps keep memory usage down and is done behind the scenes for you.

  • remoteExecute() is responsible for communicating with the remote system. The communication is two-way: management sends commands to dispatch remote jobs and cancellation requests, while the target sends back messages that indicate its status. The contents of these messages are immaterial to remoteExecute; all it needs to do is send those requests and return the responses.

  • listTransports() is a listing of the underlying transports, in case there are multiple targets. If there is only a single target, this method can just return a toString() rendition of the current transport.

  • toString() is a human-friendly descriptor of this transport. For example, a typical value would be http://mytarget:8081/cs/. However, any other string is acceptable, including targetDataCenter-Virginia, serverOn8080, and so on.
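
The calling pattern described for sendBatch() (many small batches rather than one large upload) can be sketched as follows. The framework performs this batching for you, so the code below is only an illustration of the idea. BatchingSketch, toBatches, and sendAll are hypothetical names, rows are modeled as a plain List instead of an IList, and stopping at the first non-zero result code is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical illustration only; the real batching is done by the publishing framework.
final class BatchingSketch {

    /** Splits rows into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            int end = Math.min(i + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(i, end)));
        }
        return batches;
    }

    /** Sends each batch in turn; returns the first non-zero result code, or 0 if all succeed. */
    static <T> int sendAll(List<T> rows, int batchSize, ToIntFunction<List<T>> sendBatch) {
        for (List<T> batch : toBatches(rows, batchSize)) {
            int rc = sendBatch.applyAsInt(batch);
            if (rc != 0) {
                return rc;
            }
        }
        return 0;
    }
}
```

Only one batch is held by the sender at a time, which is why batching keeps memory usage down regardless of how much data is queued in fw_PubDataStore.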

Helper Methods in AbstractTransporter

A few helper methods are available in AbstractTransporter:

  • protected void writeLog(String msg) writes messages to the publish log.

  • protected AbstractTransporter getStandardTransporterInstance() gets a new instance of the standard HTTP-based transporter. This can be useful for implementing a transport to multiple targets.

  • protected String getParam(String param) obtains the value of a publishing parameter, as configured in the publishing console.

Implementing a Transporter: Example

Following is an example of a transporter implementation that works with multiple targets. The target is configured as follows:

  1. In the Destination Address, specify comma-separated destination URLs.

    For example:

    http://tgt1:9030/cs/,http://virginia:9040/cs/
    
  2. In the More Arguments, specify ampersand-separated user name, password, and optional proxy information for the additional servers, suffixed with indexes starting at 1.

    For example, with one additional target:

    REMOTEUSER1=fwadmin&REMOTEPASS1=xceladmin&PROXYSERVER1=proxy.com&PROXYPORT1=9090&PROXYUSER1=pxuser&PROXYPASSWORD1=pxpass
    
  3. In AdvPub.xml, replace the DataTransporter bean entry with the following:
    <bean id="DataTransporter"
      class="my.sample.MultiTransporter"
      singleton="false">
     <property name="id" value="Transporter"/>              
    </bean> 
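
The indexed-suffix convention used in More Arguments can be sketched in isolation. In the transporter itself, getParam() performs the lookup, so the parsing below is only an assumption about how such a string could be read. TargetArgsSketch, parse, and forTarget are hypothetical names. The first target (index 0) uses the unsuffixed parameter names, and each additional target uses names suffixed with its index.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; the transporter reads these values through getParam().
final class TargetArgsSketch {

    /** Parses "K1=V1&K2=V2" into a map; entries without '=' are ignored. */
    static Map<String, String> parse(String moreArguments) {
        Map<String, String> params = new LinkedHashMap<>();
        if (moreArguments == null || moreArguments.isEmpty()) {
            return params;
        }
        for (String pair : moreArguments.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Looks up a parameter for the target at the given position (0 = first target, no suffix). */
    static String forTarget(Map<String, String> params, String name, int targetIndex) {
        String suffix = (targetIndex == 0) ? "" : String.valueOf(targetIndex);
        return params.get(name + suffix);
    }
}
```

For the example string above, forTarget(params, "REMOTEUSER", 1) resolves REMOTEUSER1, while index 0 resolves the unsuffixed REMOTEUSER, which that string does not define.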

Code for Writing RealTime Publishing Transporter

// Package matches the class name used in the AdvPub.xml bean entry.
package my.sample;

import COM.FutureTense.Interfaces.*;
import com.fatwire.cs.core.realtime.TransporterReply;
import com.fatwire.realtime.AbstractTransporter;
import java.net.URL;
import java.util.*;

/**
 * RealTime Publishing transporter to multiple targets.
 */
public class MultiTransporter extends AbstractTransporter
{
  private boolean initialized = false;
  private final List<AbstractTransporter> transporters = new ArrayList<AbstractTransporter>();

  /**
   * Ping each underlying target and return true if all of them are up.
   */
  @Override
  public boolean ping(StringBuilder sbOut)
  {
    init();
    boolean ret = true;
    for (AbstractTransporter t : transporters)
    {
      boolean thisret = t.ping(sbOut);
      sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable"));
      sbOut.append(" ||| ");
      ret &= thisret;
    }
    return ret;
  }

  /**
   * Send the batch to each underlying transport.
   */
  @Override
  protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg)
  {
    init();
    for (AbstractTransporter t : transporters)
    {
      int res = t.sendBatch(ics, iList, outputMsg);
      if (res != 0)
      {
        // Just log the error for now, but this is an
        // indication that the target may be down
        // and other notifications may also be appropriate.
        writeLog("Transporter " + t + " failed with " + res + " " + outputMsg);
      }
    }
    return 0;
  }

  /**
   * Execute the remote command on each transporter and
   * accumulate their responses.
   */
  @Override
  protected List<TransporterReply> remoteExecute(ICS ics, String s,
      Map<String, String> stringStringMap)
  {
    init();
    List<TransporterReply> res = new ArrayList<TransporterReply>();
    for (AbstractTransporter t : transporters)
    {
      List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap);
      res.addAll(tres);
    }
    return res;
  }

  /**
   * Do some initialization by parsing out the configuration
   * settings and instantiating a standard HTTP transport
   * to each target.
   */
  private void init()
  {
    if (!initialized)
    {
      String remoteURLs = getRemoteUrl();
      int count = 0;
      for (String remoteUrl : remoteURLs.split(","))
      {
        // The first target uses unsuffixed parameters; additional
        // targets use REMOTEUSER1, REMOTEPASS1, and so on.
        String suffix = (count == 0) ? "" : String.valueOf(count);
        AbstractTransporter t1 = getStandardTransporterInstance();
        URL url;
        try
        {
          url = new URL(remoteUrl);
        }
        catch (Exception e)
        {
          throw new RuntimeException(e);
        }
        t1.setRemoteUrl(remoteUrl);
        t1.setHost(url.getHost());
        t1.setUsername(getParam("REMOTEUSER" + suffix));
        t1.setPassword(getParam("REMOTEPASS" + suffix));
        t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol()));
        t1.setContextPath(url.getPath());
        t1.setPort(url.getPort());
        t1.setProxyserver(getProxyserver());
        t1.setProxyport(getProxyport());
        t1.setProxyuser(getProxyuser());
        t1.setProxypassword(getProxypassword());
        t1.setHttpVersion(getHttpVersion());
        t1.setTargetIniFile(getTargetIniFile());
        transporters.add(t1);
        ++count;
      }
      initialized = true;
      writeLog("Initialized transporters: " + toString());
    }
  }

  /**
   * Provide a full listing of all underlying transports. This
   * can be used by other components to determine
   * whether they need to perform special actions depending on
   * the number of targets. For example, asset publishing
   * status processing may need to buffer responses until they're
   * received from all targets before marking assets published.
   * @return the toString() rendition of each underlying transport
   */
  @Override
  public List<String> listTransports()
  {
    init();
    List<String> list = new ArrayList<String>();
    for (AbstractTransporter t : transporters)
    {
      list.add(t.toString());
    }
    return list;
  }

  /**
   * Just a human-friendly description of the transport. This may show
   * up in the logs, so make it descriptive enough.
   */
  @Override
  public String toString()
  {
    List<String> transs = listTransports();
    StringBuilder sb = new StringBuilder();
    for (String t : transs)
    {
      sb.append(t + " ");
    }
    return sb.toString();
  }
}

Understanding Edge-Case Scenarios

The example in Code for Writing RealTime Publishing Transporter works in the optimistic case where all targets are running. In practice, a target may be down for a shorter or longer period of time. If you publish to only one target but still mark assets as published, the target that was down falls out of sync. You can handle such scenarios in the following ways:

  • If a target stops for a short period of time, you should not mark assets as published, but continue publishing to the target that is running. When the other target is restarted, you have all earlier assets still queued for publishing. Those assets are redundantly published to the first target as well, but over short periods of time this is a negligible overhead.

  • If a target stays down for a long period of time, it may be best to remove it from the list of targets in the destination configuration (in this example, remove the second target from the Destination Address in the publishing configuration). That way, assets continue to be marked as published even though you have only one active target. When the second target is restarted, first perform a database and file system sync, and then add it back to the list of destination addresses.

Intercepting Asset Publishing Events on the Management Instance

In the first case above, you must mark assets as published only after they are saved on all targets. To do so, implement custom notification logic as follows:

  1. Extend com.fatwire.realtime.messaging.AssetPublishCallback.
  2. Override the notify() and optionally the progressUpdate() method.

    Sample Implementation for Steps 1 and 2

    package my.sample;

    import com.fatwire.assetapi.data.AssetId;
    import com.fatwire.realtime.messaging.AssetPublishCallback;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * Buffer asset save notifications until we've received one
     * from each target. Then mark asset published.
     */
    public class AssetPublishCallbackMulti extends AssetPublishCallback
    {
      // Number of SAVED notifications received so far, keyed by asset ID.
      Map<String, Integer> saveEventsCount = new HashMap<String, Integer>();

      /**
       * Receive notifications about the asset status.
       * Currently the only available status is SAVED.
       */
      @Override
      public void notify(AssetId assetId, String status, String from)
      {
        String assetIdStr = String.valueOf(assetId);
        writeLog("Got " + status + " notification from "
            + from + " for " + assetIdStr);
        if ("SAVED".equals(status))
        {
          Integer numNotifications = saveEventsCount.get(assetIdStr);
          if (numNotifications == null)
          {
            numNotifications = 0;
          }
          numNotifications = numNotifications + 1;
          saveEventsCount.put(assetIdStr, numNotifications);
          // Mark the asset published only when every target has reported SAVED.
          if (numNotifications == this.getTargets().size())
          {
            super.notify(assetId, status, from);
            writeLog("Marked " + assetIdStr + " published");
          }
        }
      }

      /**
       * Intercept progress update messages. Can be used for
       * monitoring the health of the system but is not required.
       */
      @Override
      public void progressUpdate(String sessionId, String job,
          String where, String progress, String lastAction, char status)
      {
        super.progressUpdate(sessionId, job, where, progress, lastAction, status);
      }
    }
    
  3. Enable the callback in AdvPub.xml on the management side:
    • Add AssetCallback bean.

    • Register the bean with PubsessionMonitor.

    Enabling the Callback Bean for Step 3

    To add the AssetCallback bean:

    <bean id="AssetCallback" 
      class="my.sample.AssetPublishCallbackMulti" 
      singleton="false"/>
    

    To register the bean with PubsessionMonitor:

    <bean id="PubsessionMonitor"
      class="com.fatwire.realtime.messaging.PubsessionMonitor" 
      singleton="false">
    
      <constructor-arg index="0">
        <ref local="DataTransporter" />
      </constructor-arg>
    
      <constructor-arg index="1">
        <ref local="AssetCallback" />
      </constructor-arg>
    
      <property name="pollFreqMillis" value="5000" />
      <property name="timeoutMillis" value="100000" />
    </bean>

Distinguishing Between Unpackers and CacheUpdaters

When publishing to multiple destinations, it is useful to distinguish between their respective Unpackers and CacheUpdaters. This comes in handy when looking at the progress bars in the RT publishing console and looking at logs.

To make that distinction, simply edit the AdvPub.xml file on the target side, and change the ID values of the DataUnpacker and PageCacheUpdater beans.

For example:

<bean id="DataUnpacker" 
  class="com.fatwire.realtime.ParallelUnpacker" 
  singleton="false">
  <property name="id" value="Unpacker-Virginia2"/>
  ...
</bean>

<bean id="PageCacheUpdater"
  class="com.fatwire.realtime.regen.ParallelRegeneratorEh"
  singleton="false">
  <property name="id" value="CacheFlusher-Virginia2"/>
  ...
</bean>