When you customize RealTime Publishing, you may need to write your own transporter, including its implementation details, helper methods, an example transporter implementation, a full code listing, and handling for edge-case scenarios. You may also need to intercept asset publishing events on the management instance.
You can customize RealTime publishing according to your customers' business needs. See Working with RealTime Publishing Customization Hooks and Understanding Asset and Publish Events in WebCenter Sites.
Resource: A generic term for approved assets and non-asset data, such as tables.
Resource group: A group of resources, possibly related in some way.
Topics:
RealTime Publishing is a pipeline consisting of several jobs. Some jobs run on the management instance, while others run on the target instance. RealTime Publishing parameters are located in the advpub.xml file.
The following is a brief description of each job:
ApprovalAggregatingGroupingStrategy – The default grouping strategy; keeping it is recommended. It creates a collection of resource groups. While building each resource group, this strategy counts the assets of each asset type; the type with the maximum count is the dominant asset type for that group. While building the collection, it then gathers groups with the same dominant asset type together. Aggregating similar groups in this way is beneficial because loading and saving assets of the same type works faster in bulk. This strategy produces a larger number of relatively small groups.
ApprovalAccumulatingGroupingStrategy – This strategy simply collects the resource groups without any additional processing, which produces a smaller number of relatively large groups.
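As an illustration of the aggregating idea, here is a self-contained sketch of grouping by dominant asset type. The class and method names are hypothetical, not the actual WebCenter Sites implementation:

```java
import java.util.*;

// Hypothetical sketch of dominant-type aggregation. A "resource group" is
// modeled here as a plain list of asset type names.
public class GroupingSketch {

    // The dominant type is the asset type with the highest count in the group.
    static String dominantType(List<String> group) {
        Map<String, Integer> counts = new HashMap<>();
        String dominant = null;
        int max = 0;
        for (String type : group) {
            int c = counts.merge(type, 1, Integer::sum);
            if (c > max) { max = c; dominant = type; }
        }
        return dominant;
    }

    // Aggregating strategy: collect groups that share a dominant type together,
    // so assets of the same type can later be loaded and saved in bulk.
    static Map<String, List<List<String>>> aggregate(List<List<String>> groups) {
        Map<String, List<List<String>>> byDominant = new LinkedHashMap<>();
        for (List<String> g : groups) {
            byDominant.computeIfAbsent(dominantType(g), k -> new ArrayList<>()).add(g);
        }
        return byDominant;
    }

    public static void main(String[] args) {
        List<List<String>> groups = Arrays.asList(
            Arrays.asList("Article", "Article", "Image"),
            Arrays.asList("Image", "Image"),
            Arrays.asList("Article", "Article"));
        Map<String, List<List<String>>> collections = aggregate(groups);
        System.out.println(collections.keySet());               // prints [Article, Image]
        System.out.println(collections.get("Article").size());  // prints 2
    }
}
```

The accumulating strategy, by contrast, would simply append every group to one collection without computing dominant types at all.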
DataSerializer and DataDeserializer: Serialize and deserialize the data using an XStream-based implementation.
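The round-trip contract can be illustrated with a self-contained stand-in. Note that the real DataSerializer/DataDeserializer use XStream (a third-party XML serialization library); this sketch substitutes JDK serialization purely to stay dependency-free:

```java
import java.io.*;

// Stand-in sketch of the serialize/deserialize round trip. JDK serialization
// is used here only to keep the example self-contained; the actual
// implementation is XStream-based.
public class SerializerSketch {

    // Serialize an object into a byte array (the "packaged" form).
    public static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Deserialize the byte array back into the original object.
    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize("hello");
        System.out.println(deserialize(data)); // prints hello: the round trip is lossless
    }
}
```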
Packager: Given the resource listing assembled by Gatherer, Packager creates serialized renditions of each resource and saves them in the local fw_PubDataStore table.
Transporter: Takes the serialized data in fw_PubDataStore created by Packager and copies it to the target-side fw_PubDataStore table. The serialized data can be transported to multiple destinations by providing a customized multi-transporter, as described in the Developer's guide. The xcelerate.concurrenttransportunpacker property in the wcs_properties.json file determines whether the DataTransporter and the DataUnpacker run simultaneously (the default). See Code for Writing RealTime Publishing Transporter.
Unpacker: Takes the serialized data in the target-side fw_PubDataStore table, deserializes it, and saves it to the target database. Although the DataTransporter and the DataUnpacker run simultaneously, the DataUnpacker waits until the main packaging is completed and certain priority group information is received.
The number of DataUnpacker threads to run is configured with numParallelTasks. The default value is 1 for Microsoft SQL Server and DB2 databases, and 3 for Oracle databases. It is recommended not to change the default value.
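If you do need to set it explicitly, the parameter would be configured in AdvPub.xml on the target. A hypothetical sketch follows; verify the exact bean layout against your own AdvPub.xml:

```xml
<bean id="DataUnpacker" class="com.fatwire.realtime.ParallelUnpacker" singleton="false">
  <!-- Hypothetical placement of the thread-count parameter -->
  <property name="numParallelTasks" value="3"/>
</bean>
```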
Monitor: Communicates with and keeps track of all the messages it receives from all the participants in the publish session. These messages are stored in the PubMessage and PubProgress tables. PubSessionMonitor is a component of the asynchronous messaging system.
The polling frequency, pollFreqMillis, is measured in milliseconds; the default value is 5000.
The timeout, timeoutMillis, is the number of milliseconds that PubSessionMonitor waits for a message before presuming that a participant has crashed or hung; the default value is 100000 (100 seconds).
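A hypothetical sketch of where these monitor parameters might be set in AdvPub.xml; the bean class name here is an assumption, so check your own configuration file:

```xml
<bean id="PubSessionMonitor" class="com.fatwire.realtime.PubSessionMonitor" singleton="false">
  <property name="pollFreqMillis" value="5000"/>   <!-- poll every 5 seconds -->
  <property name="timeoutMillis" value="100000"/>  <!-- presume a crash or hang after 100 seconds -->
</bean>
```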
CacheUpdater: Given the list of assets that were successfully saved by Unpacker, CacheUpdater flushes and optionally regenerates relevant parts of the page caches.
Regeneration of specified pages can be done in multiple threads based on the value of numThreadsPerServer; the default value is 3. The regenServers component provides the list of URLs of the servers where the pages are to be regenerated. If no URLs are specified, PageCacheUpdater defaults to standard regeneration (based on user request).
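For example, the cache updater settings might be configured in AdvPub.xml along these lines. The property layout and the URL list format are assumptions; verify them against your own file:

```xml
<bean id="PageCacheUpdater" class="com.fatwire.realtime.regen.ParallelRegeneratorEh" singleton="false">
  <property name="numThreadsPerServer" value="3"/>
  <!-- Hypothetical list of regeneration servers -->
  <property name="regenServers" value="http://server1:8080/cs/,http://server2:8080/cs/"/>
</bean>
```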
RealTime Publishing uses asynchronous messaging to track the status of each job. It is not necessary to know the details of the messaging framework, but note that communication with the target system is facilitated through the Transporter. This includes messages issued by the Unpacker to inform the management system that an asset has been saved, prompting the management logic to mark that asset published. In a multi-target publish, the publishing jobs for each target can complete in different orders because they run independently on each target during the transport phase. For example, the CacheFlusher for target 1 might complete before the Unpacker for target 2.
With a custom transporter, you can replace the out-of-the-box (OOTB) HTTP(S)-based transport with another transport that uses a different protocol, and you can publish to multiple targets within the same publishing session.
Topics:
When you override the AbstractTransporter methods, keep the following points in mind.
ping() contains the logic that checks whether the target is up or down. Its most prominent use is to power the green/red diagnostic indicator in the publishing console. A successful ping is not required to launch a publishing session, but it can be a handy tool for diagnosing connection problems.
If you are using HTTP(S) to connect to your target, you may be able to use the default implementation rather than overriding it with your own.
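As an illustration only, here is a standalone sketch of the kind of HTTP reachability check a custom ping() might perform. This is not the actual AbstractTransporter implementation, and the target URL and timeout are hypothetical:

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Standalone sketch of an HTTP reachability check, similar in spirit to
// what a custom ping() override might do.
public class PingSketch {

    // Returns true if the target answers an HTTP HEAD request with a
    // non-error status; returns false on timeout, refusal, or a bad URL.
    public static boolean ping(String target, int timeoutMillis) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(target).openConnection();
            conn.setConnectTimeout(timeoutMillis);
            conn.setReadTimeout(timeoutMillis);
            conn.setRequestMethod("HEAD");
            int code = conn.getResponseCode();
            conn.disconnect();
            return code >= 200 && code < 400;
        } catch (Exception e) {
            return false; // unreachable host, timeout, or malformed URL
        }
    }

    public static void main(String[] args) {
        // Hypothetical target URL
        System.out.println(ping("http://mytarget:8081/cs/", 3000));
    }
}
```

A real override would also append diagnostic detail to the StringBuilder that ping(StringBuilder) receives, so the console can display the failure reason.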
sendBatch() is responsible for uploading data to the remote fw_PubDataStore. It is invoked multiple times with small batches of data from the local fw_PubDataStore, which arrive in the form of an IList. Batching keeps memory usage down and is done behind the scenes for you.
remoteExecute() is responsible for communicating with the remote system. The communication is two-way: management sends commands to dispatch remote jobs and cancellation requests, while the target sends back messages that indicate its status. The contents of these messages are immaterial to remoteExecute; all it needs to do is send those requests and return the responses.
listTransports() returns a listing of the underlying transports when there are multiple targets. If there is only a single target, this method can simply return the toString() rendition of the current transport.
toString() is a human-friendly descriptor of this transport. For example, a typical value would be http://mytarget:8081/cs/. However, any other string is acceptable, including targetDataCenter-Virginia, serverOn8080, and so on.
A few helper methods are available in AbstractTransporter:
protected void writeLog(String msg) writes messages to the publish log.
protected AbstractTransporter getStandardTransporterInstance() gets a new instance of the standard HTTP-based transporter. This can be useful for implementing a transport to multiple targets.
protected String getParam(String param) obtains the value of a publishing parameter, as configured in the publishing console.
Following is an example of a transporter implementation that works with multiple targets. The destination is configured as a comma-separated list of target URLs, which the transporter parses during initialization.
package com.fatwire.realtime.mypackage;

import COM.FutureTense.Interfaces.*;
import com.fatwire.cs.core.realtime.TransporterReply;
import java.net.URL;
import java.util.*;

/**
 * RealTime Publishing transporter to multiple targets.
 */
public class MultiTransporter extends AbstractTransporter {

  private boolean initialized = false;
  List<AbstractTransporter> transporters = new ArrayList<>();

  /**
   * Ping each underlying target and return true if all of them are up.
   */
  @Override
  public boolean ping(StringBuilder sbOut) {
    init();
    boolean ret = true;
    for (AbstractTransporter t : transporters) {
      boolean thisret = t.ping(sbOut);
      sbOut.append(t.getRemoteUrl() + (thisret ? " OK" : " Not reachable"));
      sbOut.append(" ||| ");
      ret &= thisret;
    }
    return ret;
  }

  /**
   * Send the batch to each underlying transport.
   */
  @Override
  protected int sendBatch(ICS ics, IList iList, StringBuffer outputMsg) {
    init();
    for (AbstractTransporter t : transporters) {
      int res = t.sendBatch(ics, iList, outputMsg);
      if (res != 0) {
        // Just log the error for now, but this is an
        // indication that the target may be down
        // and other notifications may also be appropriate.
        writeLog("Transporter " + t + " failed with " + res + " " + outputMsg);
      }
    }
    return 0;
  }

  /**
   * Execute the remote command on each transporter and
   * accumulate their responses.
   */
  @Override
  protected List<TransporterReply> remoteExecute(ICS ics, String s,
      Map<String, String> stringStringMap) {
    init();
    List<TransporterReply> res = new ArrayList<TransporterReply>();
    for (AbstractTransporter t : transporters) {
      List<TransporterReply> tres = t.remoteExecute(ics, s, stringStringMap);
      res.addAll(tres);
    }
    return res;
  }

  /**
   * Do some initialization by parsing out the configuration
   * settings and instantiating a standard HTTP transport
   * to each target.
   */
  private void init() {
    if (!initialized) {
      String remoteURLs = getRemoteUrl();
      int count = 0;
      for (String remoteUrl : remoteURLs.split(",")) {
        String suffix = (count == 0) ? "" : String.valueOf(count);
        AbstractTransporter t1 = AbstractTransporter.getStandardTransporterInstance();
        URL url;
        try {
          url = new URL(remoteUrl);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        t1.setRemoteUrl(remoteUrl);
        t1.setHost(url.getHost());
        t1.setUsername(getParam("REMOTEUSER" + suffix));
        t1.setPassword(getParam("REMOTEPASS" + suffix));
        t1.setUseHttps("https".equalsIgnoreCase(url.getProtocol()));
        t1.setContextPath(url.getPath());
        t1.setPort(url.getPort());
        t1.setProxyserver(getProxyserver());
        t1.setProxyport(getProxyport());
        t1.setProxyuser(getProxyuser());
        t1.setProxypassword(getProxypassword());
        t1.setHttpVersion(getHttpVersion());
        t1.setTargetIniFile(getTargetIniFile());
        transporters.add(t1);
        ++count;
      }
      initialized = true;
      writeLog("Initialized transporters: " + toString());
    }
  }

  /**
   * Provide a full listing of all underlying transports. This
   * can be used by other components to determine
   * whether they need to perform special actions depending on
   * the number of targets. For example, asset publishing
   * status processing may need to buffer responses until they're
   * received from all targets before marking assets published.
   */
  @Override
  public List<String> listTransports() {
    init();
    List<String> list = new ArrayList<>();
    for (AbstractTransporter t : transporters) {
      list.add(t.toString());
    }
    return list;
  }

  /**
   * Just a human-friendly description of the transport. This may show
   * up in the logs, so make it descriptive enough.
   */
  @Override
  public String toString() {
    List<String> transs = listTransports();
    StringBuilder sb = new StringBuilder();
    for (String t : transs) sb.append(t + " ");
    return sb.toString();
  }
}
While the example in Code for Writing RealTime Publishing Transporter works in the optimistic case where all targets are running, there will be times when one target is stopped, for a short or a long period of time. If you publish to only the running target but still mark assets as published, the stopped target falls out of sync. You can handle such scenarios in the following ways:
If a target stops for a short period of time, you should not mark assets as published, but continue publishing to the target that is running. When the other target is restarted, all earlier assets are still queued for publishing. Those assets are redundantly published to the first target as well, but over short periods of time this is a negligible overhead.
If a target stays down for a long period of time, it may be best to remove it from the list of targets in the destination configuration (in this example, remove the second target from the Destination Address in the publishing configuration). That way, assets continue to be marked as published even though you have only one active target. When the second target is restarted, first perform a database and file system sync, and then add it back to the list of destination addresses.
In the first case above, you have to mark assets as published only after they are saved on all targets. To do so, implement custom notification logic that defers the published status until every target has confirmed the save.
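As a rough illustration, the buffering idea can be sketched as follows. The class and method names are hypothetical; in a real customization this logic would hook into the per-target status messages returned through the transporter:

```java
import java.util.*;

// Hypothetical sketch: buffer per-target "asset saved" notifications and
// report an asset as fully published only after every target has confirmed.
public class PublishTracker {

    private final Set<String> targets;
    private final Map<String, Set<String>> confirmations = new HashMap<>();

    public PublishTracker(Collection<String> targetIds) {
        this.targets = new HashSet<>(targetIds);
    }

    /**
     * Record a save confirmation from one target. Returns true only when
     * all configured targets have confirmed the save for this asset,
     * meaning it is now safe to mark the asset published.
     */
    public boolean assetSaved(String assetId, String targetId) {
        Set<String> seen = confirmations.computeIfAbsent(assetId, k -> new HashSet<>());
        seen.add(targetId);
        return seen.containsAll(targets);
    }
}
```

With two targets configured, the first confirmation for an asset returns false and only the second returns true, so the asset is never marked published while one target still lags behind.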
When publishing to multiple destinations, it is useful to distinguish between their respective Unpackers and CacheUpdaters. This is handy when you view the progress bars in the RealTime publishing console and when you read logs.
To make that distinction, simply edit the AdvPub.xml file on the target side, and change the ID values of the DataUnpacker and PageCacheUpdater beans.
For example:
<bean id="DataUnpacker" class="com.fatwire.realtime.ParallelUnpacker" singleton="false">
  <property name="id" value="Unpacker-Virginia2"/>
  ...
</bean>
<bean id="PageCacheUpdater" class="com.fatwire.realtime.regen.ParallelRegeneratorEh" singleton="false">
  <property name="id" value="CacheFlusher-Virginia2"/>
  ...
</bean>