Scaling Out Your Data Grid Aggregations Linearly

All of the Data Grid capabilities described below are features of Coherence Enterprise Edition and higher.


Coherence provides a data grid by dynamically, transparently and automatically partitioning the data set across all storage-enabled nodes in a cluster. We have been doing some scale-out testing on our new Dual 2.3GHz PowerPC G5 Xserve cluster, and here is one of the tests that we have performed using the Data Grid Aggregation feature.

The new InvocableMap tightly binds the concepts of a data grid (i.e. partitioned cache) and the processing of the data stored in the grid. When you combine the InvocableMap with the linear scalability of Coherence itself, you get an extremely powerful solution. The following tests show that Coherence gives you (the developer, the engineer, the architect, etc.) the ability to build an application once that can scale out to handle any SLA requirement and any increase in throughput requirements. For example, the following test demonstrates Coherence's ability to linearly increase the number of trades aggregated per second as you add hardware. I.e. if one machine can aggregate X trades per second, adding a second machine to the data grid lets you aggregate 2X trades per second, adding a third machine lets you aggregate 3X trades per second, and so on.

The data

First, we need some data to aggregate. In this case we used a Trade object with three properties: Id, Price and Symbol.


import com.tangosol.io.ExternalizableLite;
import com.tangosol.util.Base;
import com.tangosol.util.ExternalizableHelper;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* Example Trade class
*
* @author erm 2005.12.27
*/
public class Trade
        extends Base
        implements ExternalizableLite {
    /**
    * Default Constructor (required for deserialization)
    */
    public Trade() {
    }

    public Trade(int iId, double dPrice, String sSymbol) {
        setId(iId);
        setPrice(dPrice);
        setSymbol(sSymbol);
    }

    public int getId() {
        return m_iId;
    }

    public void setId(int iId) {
        m_iId = iId;
    }

    public double getPrice() {
        return m_dPrice;
    }

    public void setPrice(double dPrice) {
        m_dPrice = dPrice;
    }

    public String getSymbol() {
        return m_sSymbol;
    }

    public void setSymbol(String sSymbol) {
        m_sSymbol = sSymbol;
    }

    /**
    * Restore the contents of this object by loading the object's state from the
    * passed DataInput object.
    *
    * @param in the DataInput stream to read data from in order to restore the
    *           state of this object
    *
    * @throws IOException        if an I/O exception occurs
    * @throws NotActiveException if the object is not in its initial state, and
    *                            therefore cannot be deserialized into
    */
    public void readExternal(DataInput in)
            throws IOException {
        m_iId     = ExternalizableHelper.readInt(in);
        m_dPrice  = in.readDouble();
        m_sSymbol = ExternalizableHelper.readSafeUTF(in);
    }

    /**
    * Save the contents of this object by storing the object's state into the
    * passed DataOutput object.
    *
    * @param out the DataOutput stream to write the state of this object to
    *
    * @throws IOException if an I/O exception occurs
    */
    public void writeExternal(DataOutput out)
            throws IOException {
        // Fields must be written in the same order that readExternal reads them.
        ExternalizableHelper.writeInt(out, m_iId);
        out.writeDouble(m_dPrice);
        ExternalizableHelper.writeSafeUTF(out, m_sSymbol);
    }

    private int    m_iId;
    private double m_dPrice;
    private String m_sSymbol;
}
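
To see why readExternal and writeExternal have to handle the same fields in the same order, here is a minimal, self-contained sketch of a serialization round trip. It is an illustration only: TradeRoundTrip is a hypothetical stand-in for Trade, and it uses plain java.io streams (writeInt, writeDouble, writeUTF) instead of ExternalizableLite and ExternalizableHelper so that it runs without Coherence on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Trade; uses only java.io so it runs without Coherence.
final class TradeRoundTrip {
    int id;
    double price;
    String symbol;

    TradeRoundTrip() {
    }

    TradeRoundTrip(int id, double price, String symbol) {
        this.id = id;
        this.price = price;
        this.symbol = symbol;
    }

    // Write the fields in a fixed order: id, price, symbol.
    void write(DataOutputStream out) throws IOException {
        out.writeInt(id);
        out.writeDouble(price);
        out.writeUTF(symbol);
    }

    // Read the fields back in exactly the same order they were written.
    void read(DataInputStream in) throws IOException {
        id = in.readInt();
        price = in.readDouble();
        symbol = in.readUTF();
    }

    // Serialize the original to a byte array and rebuild a copy from those bytes.
    static TradeRoundTrip copyOf(TradeRoundTrip original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            TradeRoundTrip copy = new TradeRoundTrip();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TradeRoundTrip copy = copyOf(new TradeRoundTrip(1, 42.5, "ORCL"));
        System.out.println(copy.id + " " + copy.price + " " + copy.symbol);
    }
}
```

If a field is skipped on the write side, the read side will either hit the end of the stream or silently read the wrong bytes into the remaining fields, so a round-trip check like this is a cheap test to keep next to any hand-written serialization code.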

Configure a partitioned cache

The cache is configured through the XML Cache Configuration Elements. Here I define one wildcard cache-mapping that maps to one caching-scheme with unlimited capacity:

<?xml version="1.0"?>

<!DOCTYPE cache-config SYSTEM "cache-config.dtd">


<!--
Distributed caching scheme.
-->

<!--
Backing map scheme definition used by all the caches that do
not require any eviction policies
-->

Add an index to the Price property

ReflectionExtractor extPrice  = new ReflectionExtractor("getPrice");
m_cache.addIndex(extPrice, true, null);

Adding an index to this property increases performance by allowing Coherence to access the values directly rather than having to deserialize each item to accomplish the calculation. In our tests the aggregation speed was increased by more than 2x after an index was applied.
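
The effect of the index can be pictured with a small conceptual model. The sketch below is not how Coherence implements indexes; IndexedStoreSketch and its methods are hypothetical names, and the "serialized form" is just eight bytes per price. It only shows the difference between summing values that must first be deserialized and summing values that are already available in extracted form.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: values are kept in serialized form, and an optional
// index keeps the extracted price per key. Not Coherence's implementation.
final class IndexedStoreSketch {
    private final Map<Integer, byte[]> serialized = new HashMap<>();
    private final Map<Integer, Double> priceIndex = new HashMap<>();

    // Store the price as serialized bytes and also record it in the index.
    void put(int id, double price) {
        serialized.put(id, ByteBuffer.allocate(Double.BYTES).putDouble(price).array());
        priceIndex.put(id, price);
    }

    // Without an index: every entry is deserialized before it can be summed.
    double sumByDeserializing() {
        double sum = 0.0;
        for (byte[] bytes : serialized.values()) {
            sum += ByteBuffer.wrap(bytes).getDouble();
        }
        return sum;
    }

    // With an index: the already-extracted values are read directly.
    double sumFromIndex() {
        double sum = 0.0;
        for (double price : priceIndex.values()) {
            sum += price;
        }
        return sum;
    }

    public static void main(String[] args) {
        IndexedStoreSketch store = new IndexedStoreSketch();
        store.put(1, 10.0);
        store.put(2, 20.5);
        System.out.println(store.sumByDeserializing() + " " + store.sumFromIndex());
    }
}
```

Both methods return the same total; the indexed path simply skips the per-entry deserialization work, which is where the speedup comes from.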

The code to perform a parallel aggregation across all JVMs in the data grid

Double DResult;
DResult = (Double) m_cache.aggregate((Filter) null, new DoubleSum("getPrice"));

The aggregation is initiated, and its results received, by a single client. I.e. a single "low-power" client is able to use the combined processing power of the whole cluster/data grid to perform this aggregation in parallel with just one line of code.
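
What "in parallel" means here can be sketched in plain Java. This is a conceptual model under stated assumptions, not the Coherence implementation: ParallelSumSketch is a hypothetical class, each inner list stands in for the prices held by one storage node, and a thread pool stands in for the cluster. Every "node" sums its own data, and the client only combines one partial result per node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Conceptual model of a partitioned sum; a thread pool stands in for the cluster.
final class ParallelSumSketch {
    // Each inner list stands in for the prices stored on one node.
    static double aggregate(List<List<Double>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Double>> partials = new ArrayList<>();
            for (List<Double> partition : partitions) {
                // Every "node" sums only its own data, concurrently with the others.
                Callable<Double> task = () -> {
                    double sum = 0.0;
                    for (double price : partition) {
                        sum += price;
                    }
                    return sum;
                };
                partials.add(pool.submit(task));
            }
            // The client combines one partial result per node.
            double total = 0.0;
            for (Future<Double> partial : partials) {
                total += partial.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("aggregation interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("aggregation failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<Double>> partitions = new ArrayList<>();
        partitions.add(java.util.Arrays.asList(10.0, 20.0));
        partitions.add(java.util.Arrays.asList(30.0));
        System.out.println(aggregate(partitions));
    }
}
```

Because the client handles only one number per node, its own cost stays small no matter how many trades the grid holds, which is why a low-powered client is enough.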

The testing environment/process

A "test run" does a number of things:

  1. Loads 200,000 trade objects into the data grid.
  2. Adds an index to the Price property.
  3. Performs a parallel aggregation of all trade objects stored in the data grid. This aggregation step is repeated 20 times to obtain an "average run time", so that the test takes garbage collection into account.
  4. Loads 400,000 trade objects into the data grid.
  5. Repeats steps 2 and 3.
  6. Loads 600,000 trade objects into the data grid.
  7. Repeats steps 2 and 3.
  8. Loads 800,000 trade objects into the data grid.
  9. Repeats steps 2 and 3.
  10. Loads 1,000,000 trade objects into the data grid.
  11. Repeats steps 2 and 3.

The test client itself runs on an Intel Core Duo iMac, which is marked as local storage disabled.

The command line used to start the client was:

java ... -Dtangosol.coherence.distributed.localstorage=false -Xmx128m -Xms128m com.tangosol.examples.coherence.invocable.TradeTest

This "test suite" (and subsequent results) includes data from four "test runs":

  1. Start 4 JVMs on one Xserve - Perform a "test run"
  2. Start 4 JVMs on each of two Xserves - Perform a "test run"
  3. Start 4 JVMs on each of three Xserves - Perform a "test run"
  4. Start 4 JVMs on each of four Xserves - Perform a "test run"

In this case a "JVM" refers to a cache server instance (i.e. a data grid node): a standalone JVM responsible for managing/storing the data. I used the DefaultCacheServer helper class to accomplish this.

The command line used to start the server was:

java ... -Xmx384m -Xms384m -server com.tangosol.net.DefaultCacheServer

JDK Version

The JDK used on both the client and the servers was Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_05-84)

The results

The lowest and highest times from each set of 20 aggregations were not used in the calculations below, resulting in a data set of eighteen results used to create each average.
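
The averaging described above can be sketched as follows. This is a minimal illustration, not the actual test harness: AggregationTimer, timeRuns and trimmedAverage are hypothetical names, and the workload in main is a placeholder loop where the real test would call the cache aggregation.

```java
import java.util.Arrays;

// Hypothetical helper: time a task N times, then average without the extremes.
final class AggregationTimer {
    // Runs the task the given number of times; returns each run's elapsed nanoseconds.
    static long[] timeRuns(Runnable task, int runs) {
        long[] elapsed = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            elapsed[i] = System.nanoTime() - start;
        }
        return elapsed;
    }

    // Average of the samples after discarding the single lowest and single highest.
    static double trimmedAverage(long[] samples) {
        if (samples.length < 3) {
            throw new IllegalArgumentException("need at least 3 samples");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        long sum = 0;
        for (int i = 1; i < sorted.length - 1; i++) {
            sum += sorted[i];
        }
        return (double) sum / (sorted.length - 2);
    }

    public static void main(String[] args) {
        // Placeholder workload; the real test would run the aggregation here.
        Runnable task = () -> {
            double sum = 0.0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            if (sum < 0) {
                throw new IllegalStateException(); // uses the result so the loop is not dead code
            }
        };
        long[] samples = timeRuns(task, 20);
        System.out.printf("average over %d runs: %.3f ms%n",
                samples.length - 2, trimmedAverage(samples) / 1e6);
    }
}
```

With 20 samples, dropping one value from each end leaves the eighteen results mentioned above, which keeps a single garbage collection pause or an unusually fast run from skewing the average.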

As you can see in the following graph, the average aggregation time decreases linearly as more cache servers/machines are added to the data grid!

Similarly, the following graph illustrates how the aggregations per second scale linearly as you add more machines! When moving from 1 machine to 2 machines, the trades aggregated per second double; when moving from 2 machines to 4 machines, the trades aggregated per second double again.
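
The relationship between the two graphs is simple arithmetic: trades aggregated per second is the number of trades divided by the average aggregation time, so halving the time doubles the throughput. A small sketch, using hypothetical names (ThroughputSketch, tradesPerSecond, speedup) and placeholder timings that are not measurements from these tests:

```java
// Hypothetical helper relating average aggregation time to throughput.
final class ThroughputSketch {
    // Trades aggregated per second when tradeCount trades take avgMillis to aggregate.
    static double tradesPerSecond(long tradeCount, double avgMillis) {
        return tradeCount * 1000.0 / avgMillis;
    }

    // Ratio of throughput after scale-out to throughput before it.
    static double speedup(double beforePerSecond, double afterPerSecond) {
        return afterPerSecond / beforePerSecond;
    }

    public static void main(String[] args) {
        // Placeholder timings for illustration only, not results from these tests.
        double oneMachine = tradesPerSecond(1_000_000, 800.0);
        double twoMachines = tradesPerSecond(1_000_000, 400.0);
        System.out.println("speedup: " + speedup(oneMachine, twoMachines));
    }
}
```

Under linear scaling, the speedup from one machine to N machines should come out close to N.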


The above aggregations will complete successfully and correctly even if one of the cache servers or an entire machine fails during the aggregation!


Combining the Coherence data grid (i.e. partitioned cache) with the InvocableMap features enables applications to scale out data grid aggregations linearly as machines are added.

Graphs referenced above:
  average_agg_time.jpg (average aggregation time)
  aggregation_scale_out.jpg (aggregations per second as machines are added)