49 RealTime Publishing Customization Hooks

This chapter gives an overview of RealTime Publishing and explains how to write your own transporter. It covers implementation details, helper methods, an example transporter with a full code listing, edge-case scenarios, and how to intercept asset publishing events on the management instance.

49.1 Overview of RealTime Publishing

RealTime Publishing is a pipeline consisting of several jobs. Some jobs run on the management instance, while others run on the target instance. Following is a brief description of each:

  • Gatherer: Creates the list of publishable assets and decorates it with additional resources (asset types, table rows) that together make up the canonical set of data to be published.

  • Packager: Given the resource listing assembled by Gatherer, Packager creates a serialized rendition of each resource and saves it in the local fw_PubDataStore table.

  • Transport: Takes the serialized data that Packager stored in fw_PubDataStore and copies it to the target-side fw_PubDataStore table.

  • Unpacker: Takes the serialized data in the target-side fw_PubDataStore table, deserializes it, and saves it to the target database.

  • CacheUpdater: Given the list of assets that were successfully saved by Unpacker, CacheUpdater flushes and optionally regenerates relevant parts of the page caches.

RealTime Publishing uses asynchronous messaging to track the status of each job. It is not necessary to know the details of the messaging framework, but note that communication with the target system is facilitated through the Transporter. This also includes messages issued by Unpacker to inform the management system that an asset has been saved, prompting the management logic to mark that asset published.

49.2 Writing a Custom Transporter

Transporter offers several customization options, including:

  • Replacing the out-of-the-box (OOTB) HTTP(S)-based transport with one that uses a different protocol.

  • Publishing to multiple targets within the same publishing session.

49.2.1 To Write Your Own Transporter

Follow these steps to write your own transporter:

  1. Subclass the com.fatwire.realtime.AbstractTransporter class.

  2. Override the methods ping, sendBatch, listTransports, toString, and remoteExecute.

  3. Install the transporter by editing the classes/AdvPub.xml file on the management side.

    Replace the line:

    <bean id="DataTransporter"
      class="com.fatwire.realtime.MirrorBasedTransporterImpl" 
      singleton="false">
    

    with:

    <bean id="DataTransporter"
      class="[your transporter class]"
      singleton="false">
    

49.2.2 Implementation Details

When you override the AbstractTransporter methods, keep the following in mind:

  • ping() contains the logic that checks whether the target is up or down. Its most prominent use is to power the green/red diagnostic indicator in the publishing console. It is not necessary for ping to be successful in order to launch a publishing session, but this can be a handy tool for diagnosing connection problems.

    If you are using HTTP(S) to connect to your target, you may be able to use the default implementation rather than override it with your own.

  • sendBatch() is responsible for uploading data to the remote fw_PubDataStore. It is invoked multiple times, each time with a small batch of data from the local fw_PubDataStore in the form of an IList. Batching keeps memory usage down and is already done for you behind the scenes.

  • remoteExecute() is responsible for communicating with the remote system. The communication is two-way: management sends commands to dispatch remote jobs and cancellation requests, while the target sends back messages that indicate its status. The contents of these messages are immaterial to remoteExecute. All it needs to do is send the requests and return the responses.

  • listTransports() returns a listing of the underlying transports, in case there are multiple targets. If there is only a single target, this method can just return a toString() rendition of the current transport.

  • toString() is a human-friendly descriptor of this transport. For example, a typical value would be http://mytarget:8081/cs/. However, any other string is acceptable, including targetDataCenter-Virginia, serverOn8080, etc.
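
For a transport that does not use HTTP(S), ping() needs its own reachability check. The following is a minimal sketch, assuming the target exposes a plain TCP endpoint. The class TcpPing and its reachable method are hypothetical helpers written for illustration and are not part of the product API; a custom ping() override could delegate to something similar and pass along its output buffer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP endpoint accepts connections. */
final class TcpPing {
    private TcpPing() {}

    /**
     * Tries to open a TCP connection to host:port within timeoutMillis.
     * Appends a short status message to out and returns true on success.
     */
    static boolean reachable(String host, int port, int timeoutMillis, StringBuilder out) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            out.append(host).append(':').append(port).append(" OK");
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: report the target as down.
            out.append(host).append(':').append(port)
               .append(" Not reachable (").append(e.getMessage()).append(')');
            return false;
        }
    }
}
```

Because a failed ping does not prevent a publishing session from starting, a check like this only needs to be cheap and bounded by a timeout; it does not have to prove that the full transport path works.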

49.2.3 Helper Methods

A few helper methods are available in AbstractTransporter:

  • protected void writeLog(String msg): Writes a message to the publish log.

  • protected AbstractTransporter getStandardTransporterInstance(): Gets a new instance of the standard HTTP-based transporter. This can be useful when implementing a transport to multiple targets.

  • protected String getParam(String param): Obtains the value of a publishing parameter, as configured in the publishing console.

49.2.4 Example of a Transporter Implementation

Following is an example of a transporter implementation that works with multiple targets. The target is configured as follows:

  1. In the Destination Address field, specify the destination URLs, separated by commas.

    For example:

    http://tgt1:9030/cs/,http://virginia:9040/cs/
    
  2. In the More Arguments field, specify the ampersand-separated username, password, and optional proxy information for the additional servers, suffixed with indexes starting at 1.

    For example, with one additional target:

    REMOTEUSER1=fwadmin&REMOTEPASS1=xceladmin&PROXYSERVER1=proxy.com&PROXYPORT1=9090&PROXYUSER1=pxuser&PROXYPASSWORD1=pxpass
    
  3. In AdvPub.xml, replace the DataTransporter bean entry with:

    <bean id="DataTransporter"
      class="my.sample.MultiTransporter"
      singleton="false">
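
The indexing convention in steps 1 and 2 can be illustrated with a small, self-contained sketch. It assumes that the first URL in the Destination Address uses the unsuffixed parameters (REMOTEUSER, REMOTEPASS) and that each additional URL uses the suffix 1, 2, and so on. TargetConfigSketch and its methods are hypothetical stand-ins; in a real transporter the values would come from getParam() rather than from a hand-written parser.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in showing how indexed parameters line up with targets. */
final class TargetConfigSketch {
    private TargetConfigSketch() {}

    /** Splits an ampersand-separated "KEY=value&KEY=value" string into a map. */
    static Map<String, String> parseArguments(String moreArguments) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : moreArguments.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return params;
    }

    /** Parameter suffix for the target at the given position: "" first, then "1", "2", ... */
    static String suffixFor(int targetIndex) {
        return targetIndex == 0 ? "" : String.valueOf(targetIndex);
    }

    /** Pairs each destination URL with the user name configured for it. */
    static List<String> describeTargets(String destinationAddress, Map<String, String> params) {
        List<String> result = new ArrayList<>();
        String[] urls = destinationAddress.split(",");
        for (int i = 0; i < urls.length; i++) {
            String user = params.get("REMOTEUSER" + suffixFor(i));
            result.add(urls[i].trim() + " as " + user);
        }
        return result;
    }
}
```

With the two example URLs and a parameter map that contains REMOTEUSER and REMOTEUSER1, describeTargets pairs the first URL with REMOTEUSER and the second with REMOTEUSER1.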
    

49.2.5 Full Code Listing

package my.sample;

import COM.FutureTense.Interfaces.*;
import com.fatwire.cs.core.realtime.TransporterReply;
import com.fatwire.realtime.AbstractTransporter;
import java.net.URL;
import java.util.*;

/**
 * RealTime Publishing transporter to multiple targets.
 */
public class MultiTransporter extends AbstractTransporter
{
  private boolean initialized = false;
  // One standard transporter per configured target.
  private final List<AbstractTransporter> transporters = new ArrayList<AbstractTransporter>();

  /**
   * Ping each underlying target and return true if all of them are up.
   */
  @Override
  public boolean ping(StringBuilder sbOut)
  {
    init();
    boolean ret = true;
    for(AbstractTransporter t : transporters)
    {
      boolean thisret = t.ping(sbOut);
      sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable"));
      sbOut.append(" ||| ");
      ret &= thisret;
    }
    return ret;
  }

  /**
   * Send the batch to each underlying transport.
   */
  @Override
  protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg)
  {
    init();
    for(AbstractTransporter t : transporters)
    {
      int res = t.sendBatch(ics, iList, outputMsg);
      if(res != 0)
      {
        // Just log the error for now, but this is an
        // indication that the target may be down
        // and other notifications may also be appropriate.
        writeLog("Transporter " + t + " failed with " + res + " " + outputMsg);
      }
    }
    return 0;
  }

  /**
   * Execute the remote command on each transporter and
   * accumulate their responses.
   */
  @Override
  protected List<TransporterReply> remoteExecute(ICS ics, String s,
      Map<String, String> stringStringMap)
  {
    init();
    List<TransporterReply> res = new ArrayList<TransporterReply>();
    for(AbstractTransporter t : transporters)
    {
      List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap);
      res.addAll(tres);
    }
    return res;
  }

  /**
   * Parse the configuration settings and instantiate a standard
   * HTTP transport for each target. The work is done only once.
   */
  private void init()
  {
    if(!initialized)
    {
      String remoteURLs = getRemoteUrl();
      int count = 0;
      for(String remoteUrl : remoteURLs.split(","))
      {
        // The first target uses the unsuffixed parameters; additional
        // targets use REMOTEUSER1, REMOTEPASS1, and so on.
        String suffix = (count == 0) ? "" : String.valueOf(count);
        AbstractTransporter t1 = getStandardTransporterInstance();
        URL url;
        try
        {
          url = new URL(remoteUrl);
        }
        catch(Exception e)
        {
          throw new RuntimeException(e);
        }
        t1.setRemoteUrl(remoteUrl);
        t1.setHost(url.getHost());
        t1.setUsername(getParam("REMOTEUSER" + suffix));
        t1.setPassword(getParam("REMOTEPASS" + suffix));
        t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol()));
        t1.setContextPath(url.getPath());
        t1.setPort(url.getPort());
        // Proxy and HTTP settings are copied from this transporter's
        // own configuration and are therefore the same for every target.
        t1.setProxyserver(getProxyserver());
        t1.setProxyport(getProxyport());
        t1.setProxyuser(getProxyuser());
        t1.setProxypassword(getProxypassword());
        t1.setHttpVersion(getHttpVersion());
        t1.setTargetIniFile(getTargetIniFile());
        transporters.add(t1);
        ++count;
      }
      initialized = true;
      writeLog("Initialized transporters: " + toString());
    }
  }

  /**
   * Provide a full listing of all underlying transports. This
   * can be used by other components to determine
   * whether they need to perform special actions depending on
   * the number of targets. For example, asset publishing
   * status processing may need to buffer responses until they're
   * received from all targets before marking assets published.
   * @return one descriptor per underlying transport
   */
  @Override
  public List<String> listTransports()
  {
    init();
    List<String> list = new ArrayList<String>();
    for(AbstractTransporter t : transporters)
    {
      list.add(t.toString());
    }
    return list;
  }

  /**
   * Just a human-friendly description of the transport. This may show
   * up in the logs, so make it descriptive enough.
   */
  @Override
  public String toString()
  {
    List<String> transs = listTransports();
    StringBuilder sb = new StringBuilder();
    for(String t : transs)
    {
      sb.append(t + " ");
    }
    return sb.toString();
  }
}
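
The listing derives the host, port, protocol, and context path of each target from its destination URL. The following self-contained sketch shows that decomposition in isolation. TargetUrlParts is a hypothetical helper, not part of the product API. One detail worth knowing is that java.net.URL.getPort() returns -1 when the URL has no explicit port; this chapter does not say how the standard transporter treats that value, so the sketch falls back to the protocol default as one possible approach.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

/** Hypothetical helper mirroring the URL handling of the multi-target example. */
final class TargetUrlParts {
    final String host;
    final int port;
    final boolean https;
    final String contextPath;

    private TargetUrlParts(String host, int port, boolean https, String contextPath) {
        this.host = host;
        this.port = port;
        this.https = https;
        this.contextPath = contextPath;
    }

    /** Splits a destination URL such as http://tgt1:9030/cs/ into its parts. */
    static TargetUrlParts parse(String remoteUrl) {
        URL url;
        try {
            // trim() tolerates spaces left around the commas in the address list.
            url = URI.create(remoteUrl.trim()).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad destination URL: " + remoteUrl, e);
        }
        // getPort() is -1 when no port is given; use the protocol default instead
        // (80 for http, 443 for https).
        int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
        boolean https = "https".equalsIgnoreCase(url.getProtocol());
        return new TargetUrlParts(url.getHost(), port, https, url.getPath());
    }
}
```

For http://tgt1:9030/cs/ this yields host tgt1, port 9030, and context path /cs/. For https://virginia/cs/ the port resolves to 443 and the https flag is set.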

49.2.6 Edge-Case Scenarios

While the example in Section 49.2.5, "Full Code Listing" works in the optimistic case where all targets are running, there will be times when one target is down, either briefly or for an extended period. If you publish to only the running target but still mark assets as published, the stopped target falls out of sync. You can handle such scenarios in the following ways:

  • If a target stops for a short period of time, you should not mark assets as published, but continue publishing to the target that is running. When the other target is restarted, you will have all earlier assets still queued for publishing. Those assets will be redundantly published to the first target as well, but over short periods of time this is a negligible overhead.

  • If a target stays down for a long period of time, it may be best to remove it from the list of targets in the destination configuration (in this example, remove the second target from the Destination Address in the publishing configuration). That way, assets will continue to be marked as published even though you have only one active target. When the second target is restarted, first perform a database and file system sync, and then add it back to the list of destination addresses.

49.2.7 Intercepting Asset Publishing Events on the Management Instance

In the first case above (a target that is down for a short time), mark assets as published only after they have been saved on all targets. To do so, implement custom notification logic as follows:

  1. Extend com.fatwire.realtime.messaging.AssetPublishCallback.

  2. Override the notify() method and, optionally, the progressUpdate() method.

    For a detailed sample implementation, see "Sample Implementation for Steps 1 and 2" below.

  3. Enable the callback in AdvPub.xml on the management side:

    • Add the AssetCallback bean.

    • Register the bean with PubsessionMonitor.

    For the code, see "Enabling the Callback Bean for Step 3" below.

Sample Implementation for Steps 1 and 2

package my.sample;

import com.fatwire.assetapi.data.AssetId;
import com.fatwire.realtime.messaging.AssetPublishCallback;
import java.util.HashMap;
import java.util.Map;

/**
 * Buffer asset save notifications until we've received one
 * from each target. Then mark asset published.
 */
public class AssetPublishCallbackMulti extends AssetPublishCallback
{
  // Number of SAVED notifications received so far, keyed by asset ID.
  private final Map<String, Integer> saveEventsCount = new HashMap<String, Integer>();

  /**
   * Receive notifications about the asset status.
   * Currently the only available status is SAVED.
   */
  @Override
  public void notify(AssetId assetId, String status, String from)
  {
    String assetIdStr = String.valueOf(assetId);
    writeLog("Got " + status + " notification from "
        + from + " for " + assetIdStr);
    if("SAVED".equals(status))
    {
      Integer previous = saveEventsCount.get(assetIdStr);
      int numNotifications = (previous == null) ? 1 : previous + 1;
      if(numNotifications == this.getTargets().size())
      {
        // Every target has reported in. Drop the counter so that a later
        // publish of the same asset starts from zero, then mark it published.
        saveEventsCount.remove(assetIdStr);
        super.notify(assetId, status, from);
        writeLog("Marked " + assetIdStr + " published");
      }
      else
      {
        saveEventsCount.put(assetIdStr, numNotifications);
      }
    }
  }

  /**
   * Intercept progress update messages. Can be used for
   * monitoring the health of the system but is not required.
   */
  @Override
  public void progressUpdate(String sessionId, String job,
      String where, String progress, String lastAction, char status)
  {
    super.progressUpdate(sessionId, job, where, progress, lastAction, status);
  }
}
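
Counting notifications works as long as each target reports each asset exactly once. A slightly more defensive variant, sketched below, records which targets have reported instead of how many notifications arrived, so a duplicate message from one target cannot stand in for a missing message from another. SavedTracker is a hypothetical, self-contained helper that uses plain strings; it assumes that the from value of a notification identifies the sending target and can be matched against the configured target list, which this chapter does not confirm.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks which targets have confirmed saving each asset. */
final class SavedTracker {
    private final Set<String> allTargets;
    private final Map<String, Set<String>> savedBy = new HashMap<>();

    SavedTracker(Set<String> allTargets) {
        this.allTargets = new HashSet<>(allTargets);
    }

    /**
     * Records that the target named by from saved the asset.
     * Returns true when this call completes the set of targets for the asset,
     * false while at least one target is still outstanding.
     */
    synchronized boolean recordSaved(String assetId, String from) {
        if (!allTargets.contains(from)) {
            return false; // ignore notifications from unknown sources
        }
        Set<String> reported = savedBy.computeIfAbsent(assetId, k -> new HashSet<>());
        reported.add(from);
        if (reported.containsAll(allTargets)) {
            savedBy.remove(assetId); // forget the asset so a later publish starts fresh
            return true;
        }
        return false;
    }
}
```

A callback built on this idea would call recordSaved from notify() and invoke super.notify() only when the method returns true.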

Enabling the Callback Bean for Step 3

To add the AssetCallback bean:

<bean id="AssetCallback" 
  class="my.sample.AssetPublishCallbackMulti" 
  singleton="false"/>

To register the bean with PubsessionMonitor:

<bean id="PubsessionMonitor"
  class="com.fatwire.realtime.messaging.PubsessionMonitor" 
  singleton="false">

  <constructor-arg index="0">
    <ref local="DataTransporter" />
  </constructor-arg>

  <constructor-arg index="1">
    <ref local="AssetCallback" />
  </constructor-arg>

  <property name="pollFreqMillis" value="5000" />
  <property name="timeoutMillis" value="100000" />
</bean>

49.2.8 Finishing Touches

When publishing to multiple destinations, it is useful to distinguish between their respective Unpackers and CacheUpdaters, both in the progress bars of the RealTime Publishing console and in the logs. To make that distinction, edit the AdvPub.xml file on each target and change the value of the id property of the DataUnpacker and PageCacheUpdater beans.

For example:

<bean id="DataUnpacker" 
  class="com.fatwire.realtime.ParallelUnpacker" 
  singleton="false">
  <property name="id" value="Unpacker-Virginia2"/>
  ...
</bean>

<bean id="PageCacheUpdater"
  class="com.fatwire.realtime.regen.ParallelRegeneratorEh"
  singleton="false">
  <property name="id" value="CacheFlusher-Virginia2"/>
  ...
</bean>