48 Scaling Out Your Data Grid Aggregations Linearly

Coherence provides a data grid by dynamically, transparently, and automatically partitioning the data set across all storage-enabled nodes in a cluster. We have been doing some scale-out testing on our new Dual 2.3GHz PowerPC G5 Xserve cluster, and here is one of the tests that we performed using the data grid aggregation feature.

The InvocableMap interface tightly binds the concepts of a data grid (that is, a partitioned cache) and the processing of the data stored in the grid. When you combine the InvocableMap with the linear scalability of Coherence itself, you get an extremely powerful solution. The following tests show that Coherence gives you (the developer, the engineer, the architect, and so on) the ability to build an application that can scale out to handle any SLA requirement and any increase in throughput requirements. For example, the following tests demonstrate Coherence's ability to linearly increase the number of trades aggregated per second as you add hardware. That is, if one machine can aggregate X trades per second, then with a second machine in the data grid you can aggregate 2X trades per second, with a third machine 3X trades per second, and so on.

All of the Data Grid capabilities described below are features of Coherence Enterprise Edition and higher.

The Data

First, we need some data to aggregate. Example 48-1 illustrates a Trade object with three properties: Id, Price, and Symbol.

Example 48-1 Trade Object Defining Three Properties

package com.tangosol.examples.coherence.data;

import com.tangosol.util.Base;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.io.ExternalizableLite;

import java.io.IOException;
import java.io.NotActiveException;
import java.io.DataInput;
import java.io.DataOutput;

/**
* Example Trade class
*
* @author erm 2005.12.27
*/
public class Trade
        extends Base
        implements ExternalizableLite
    {
    /**
    * Default Constructor
    */
    public Trade()
        {
        }

    public Trade(int iId, double dPrice, String sSymbol)
        {
        m_iId     = iId;
        m_dPrice  = dPrice;
        m_sSymbol = sSymbol;
        }

    public int getId()
        {
        return m_iId;
        }

    public void setId(int iId)
        {
        m_iId = iId;
        }

    public double getPrice()
        {
        return m_dPrice;
        }

    public void setPrice(double dPrice)
        {
        m_dPrice = dPrice;
        }

    public String getSymbol()
        {
        return m_sSymbol;
        }

    public void setSymbol(String sSymbol)
        {
        m_sSymbol = sSymbol;
        }

    /**
    * Restore the contents of this object by loading the object's state from the
    * passed DataInput object.
    *
    * @param in the DataInput stream to read data from to restore the
    *           state of this object
    *
    * @throws IOException        if an I/O exception occurs
    * @throws NotActiveException if the object is not in its initial state, and
    *                            therefore cannot be deserialized into
    */
    public void readExternal(DataInput in)
            throws IOException
        {
        m_iId     = ExternalizableHelper.readInt(in);
        m_dPrice  = in.readDouble();
        m_sSymbol = ExternalizableHelper.readSafeUTF(in);
        }

    /**
    * Save the contents of this object by storing the object's state into the
    * passed DataOutput object.
    *
    * @param out the DataOutput stream to write the state of this object to
    *
    * @throws IOException if an I/O exception occurs
    */
    public void writeExternal(DataOutput out)
            throws IOException
        {
        // Fields must be written in the same order that readExternal reads them.
        ExternalizableHelper.writeInt(out, m_iId);
        out.writeDouble(m_dPrice);
        ExternalizableHelper.writeSafeUTF(out, m_sSymbol);
        }

    private int    m_iId;
    private double m_dPrice;
    private String m_sSymbol;
    }
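
The read and write methods of a class like Trade have to agree on field order and field count, or the restored object will be corrupt. The following is a minimal sketch of that round trip using only java.io so that it runs without Coherence on the classpath. SimpleTrade and roundTrip are hypothetical names introduced for illustration, and the plain writeUTF/readUTF calls stand in for the null-safe ExternalizableHelper methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Trade; not part of the Coherence examples.
final class SimpleTrade {
    int id;
    double price;
    String symbol;

    SimpleTrade() {
    }

    SimpleTrade(int id, double price, String symbol) {
        this.id = id;
        this.price = price;
        this.symbol = symbol;
    }

    // Fields are written in the order id, price, symbol.
    // Unlike writeSafeUTF, writeUTF does not accept a null symbol.
    void writeExternal(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeDouble(price);
        out.writeUTF(symbol);
    }

    // Fields are read back in exactly the order they were written.
    void readExternal(DataInput in) throws IOException {
        id = in.readInt();
        price = in.readDouble();
        symbol = in.readUTF();
    }

    // Serializes the trade to a byte array and restores it into a new instance.
    static SimpleTrade roundTrip(SimpleTrade trade) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                trade.writeExternal(out);
            }
            SimpleTrade copy = new SimpleTrade();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readExternal(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If writeExternal skipped the price, readExternal would interpret the bytes of the symbol as a double and then fail or return garbage for the symbol.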

Configure a Partitioned Cache

The Cache Configuration Elements are used to configure a cache. Example 48-2 defines one wildcard cache-mapping that maps to one caching-scheme which has unlimited capacity:

Example 48-2 Mapping a cache-mapping to a caching-scheme with Unlimited Capacity

<?xml version="1.0"?>

<!DOCTYPE cache-config SYSTEM "cache-config.dtd">


<!--
    Distributed caching scheme.
-->

<!--
    Backing map scheme definition used by all the caches that do
    not require any eviction policies
-->
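
A configuration matching the description above (one wildcard cache-mapping, one distributed caching-scheme, and a backing map with no eviction policy) might look like the following sketch. The scheme names example-distributed and unlimited-backing-map, the service name, and the choice of com.tangosol.util.SafeHashMap as the backing map class are assumptions made for illustration.

```xml
<?xml version="1.0"?>

<!DOCTYPE cache-config SYSTEM "cache-config.dtd">

<cache-config>
  <caching-scheme-mapping>
    <!-- Wildcard mapping: every cache name uses the same scheme -->
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>example-distributed</scheme-name> <!-- assumed name -->
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <!-- Distributed caching scheme. -->
    <distributed-scheme>
      <scheme-name>example-distributed</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <class-scheme>
          <scheme-ref>unlimited-backing-map</scheme-ref>
        </class-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>

    <!-- Backing map with no eviction policy, hence unlimited capacity -->
    <class-scheme>
      <scheme-name>unlimited-backing-map</scheme-name> <!-- assumed name -->
      <class-name>com.tangosol.util.SafeHashMap</class-name>
    </class-scheme>
  </caching-schemes>
</cache-config>
```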


Add an Index to the Price Property

Example 48-3 illustrates the code to add an index to the Price property. Adding an index to this property increases performance by allowing Coherence to access the values directly rather than having to deserialize each item to perform the calculation.

Example 48-3 Adding an Index to the Price Property

ReflectionExtractor extPrice  = new ReflectionExtractor("getPrice");
m_cache.addIndex(extPrice, true, null);

In our tests the aggregation speed was increased by more than 2x after an index was applied.
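
To see why an index helps, the following plain-Java analogy keeps entries in serialized form and maintains a separate map of already-extracted prices. It is only a sketch of the idea and makes no claim about Coherence's internal index structures; PriceIndexSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Analogy only: shows the difference between deserializing and reading an index.
final class PriceIndexSketch {
    // Entries held in serialized form, as they would be in a partitioned cache.
    private final Map<Integer, byte[]> serialized = new HashMap<>();
    // The "index": the extracted price for each key, kept alongside the data.
    private final Map<Integer, Double> priceIndex = new HashMap<>();

    void put(int id, double price) {
        serialized.put(id, ByteBuffer.allocate(Double.BYTES).putDouble(price).array());
        priceIndex.put(id, price); // the index is maintained on every update
    }

    // Without an index, each value is deserialized before it can be summed.
    double sumByDeserializing() {
        double sum = 0.0;
        for (byte[] bytes : serialized.values()) {
            sum += ByteBuffer.wrap(bytes).getDouble();
        }
        return sum;
    }

    // With an index, the already-extracted values are read directly.
    double sumFromIndex() {
        double sum = 0.0;
        for (double price : priceIndex.values()) {
            sum += price;
        }
        return sum;
    }
}
```

Both methods return the same total; the indexed path simply skips the per-entry deserialization work.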

Code to Perform a Parallel Aggregation

Example 48-4 illustrates the code to perform a parallel aggregation across all JVMs in the data grid. The aggregation is initiated and results received by a single client. That is, a single "low-power" client is able to use the full processing power of the cluster/data grid in aggregate to perform this aggregation in parallel with just one line of code.

Example 48-4 Perform a Parallel Aggregation Across all JVMs in the Grid

Double DResult;
DResult = (Double) m_cache.aggregate((Filter) null, new DoubleSum("getPrice"));
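
Conceptually, each storage node sums the prices it owns and the client combines the partial sums. The following sketch imitates that pattern with threads in a single JVM, assuming a simple round-robin split of the data. ParallelSumSketch and parallelSum are hypothetical names, and this is an analogy rather than Coherence's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelSumSketch {
    // Splits prices across `nodes` partitions, sums each partition on its own
    // thread, and combines the partial sums on the caller ("client") side.
    static double parallelSum(double[] prices, int nodes) {
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        try {
            List<Future<Double>> partials = new ArrayList<>();
            for (int node = 0; node < nodes; node++) {
                final int start = node;
                partials.add(pool.submit(() -> {
                    double sum = 0.0;
                    // Round-robin partitioning: this node owns every nodes-th entry.
                    for (int i = start; i < prices.length; i += nodes) {
                        sum += prices[i];
                    }
                    return sum;
                }));
            }
            double total = 0.0;
            for (Future<Double> partial : partials) {
                total += partial.get(); // wait for each node's partial result
            }
            return total;
        } catch (Exception e) {
            throw new IllegalStateException("aggregation failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller does very little work itself, which mirrors the point that a low-power client can drive an aggregation executed by the whole grid.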

The Testing Environment and Process

Performing a "Test Run"

A test run does several things:

  1. Loads 200,000 trade objects into the data grid.

  2. Adds an index to the Price property.

  3. Performs a parallel aggregation of all trade objects stored in the data grid. This aggregation step is run 20 times to obtain an "average run time", so that the measurement takes garbage collection into account.

  4. Loads 400,000 trade objects into the data grid.

  5. Repeats steps 2 and 3.

  6. Loads 600,000 trade objects into the data grid.

  7. Repeats steps 2 and 3.

  8. Loads 800,000 trade objects into the data grid.

  9. Repeats steps 2 and 3.

  10. Loads 1,000,000 trade objects into the data grid.

  11. Repeats steps 2 and 3.
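
The repeated timing in step 3 can be sketched as a small harness that runs a task a fixed number of times and records each elapsed time. TimingSketch and timeRuns are hypothetical names; the actual TradeTest client is not shown in this chapter, so this is only an assumption about how such a loop might be written.

```java
final class TimingSketch {
    // Runs the task `runs` times and returns each elapsed time in milliseconds.
    static double[] timeRuns(Runnable task, int runs) {
        double[] millis = new double[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            millis[i] = (System.nanoTime() - start) / 1_000_000.0;
        }
        return millis;
    }
}
```

In a test run the task would wrap the aggregate call, and runs would be 20.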

Client Considerations: The test client itself is run on an Intel Core Duo iMac which is marked as local storage disabled. The command line used to start the client was:

java ... -Dtangosol.coherence.distributed.localstorage=false -Xmx128m -Xms128m com.tangosol.examples.coherence.invocable.TradeTest

This "Test Suite" (and Subsequent Results) Includes Data from Four "Test Runs":

  1. Start 4 JVMs on one Xserve - Perform a "test run"

  2. Start 4 JVMs on each of two Xserves - Perform a "test run"

  3. Start 4 JVMs on each of three Xserves - Perform a "test run"

  4. Start 4 JVMs on each of four Xserves - Perform a "test run"

Server Considerations: In this case a "JVM" refers to a cache server instance (that is, a data grid node) that is a standalone JVM responsible for managing and storing the data. We used the DefaultCacheServer helper class to accomplish this.

The command line used to start the server was:

java ... -Xmx384m -Xms384m -server com.tangosol.net.DefaultCacheServer

JDK Version

The JDK used on both the client and the servers was Java 2 Runtime Environment, Standard Edition (build 1.5.0_05-84).

The Results

As you can see in the following graph, the average aggregation time decreases in inverse proportion to the number of cache servers/machines in the data grid!


The lowest and highest times were not used in the calculations below, leaving a data set of eighteen results from which each average was computed.

Figure 48-1 Average Aggregation Time

This figure is described in the text
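
The averaging described above, in which the lowest and highest of the 20 timings are discarded and the remaining 18 are averaged, can be sketched as follows. TrimmedMeanSketch and trimmedMean are hypothetical names used for illustration.

```java
import java.util.Arrays;

final class TrimmedMeanSketch {
    // Averages the samples after discarding one lowest and one highest value.
    static double trimmedMean(double[] samples) {
        if (samples.length < 3) {
            throw new IllegalArgumentException("need at least three samples");
        }
        double[] sorted = samples.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        double sum = 0.0;
        for (int i = 1; i < sorted.length - 1; i++) {
            sum += sorted[i];
        }
        return sum / (sorted.length - 2);
    }
}
```

Dropping the extremes keeps a single slow run, for example one that coincides with a long garbage collection pause, from dominating the average.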

Similarly, the following graph illustrates how the aggregations per second scale linearly as you add more machines! When moving from 1 machine to 2 machines, the trades aggregated per second double; when moving from 2 machines to 4 machines, the trades aggregated per second double again.

Figure 48-2 Aggregation Scale-Out

This figure is described in the text
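
The relationship between average aggregation time and trades aggregated per second, and what ideal linear scale-out means for the latter, can be written down in a few lines. ThroughputSketch and its methods are hypothetical names, and any numbers passed to them in examples are illustrative rather than measured results.

```java
final class ThroughputSketch {
    // Trades aggregated per second, given the number of trades in the grid
    // and the average time of one aggregation in milliseconds.
    static double tradesPerSecond(long trades, double averageMillis) {
        return trades / (averageMillis / 1000.0);
    }

    // Ideal linear scale-out: n machines deliver n times the single-machine rate.
    static double idealRate(double singleMachineRate, int machines) {
        return singleMachineRate * machines;
    }
}
```

For instance, aggregating 1,000,000 trades in an average of 500 ms corresponds to 2,000,000 trades per second under this formula.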



The above aggregations will complete successfully and correctly even if one of the cache servers or an entire machine fails during the aggregation!


Combining the Coherence data grid (that is, partitioned cache) with the InvocableMap features enables:

  • Applications to scale out data grid calculations linearly;

  • Groups to meet increasingly aggressive SLAs by dynamically and transparently adding more resources to the data grid. That is, if you need to achieve 1,837,932 trade aggregations per second, all that is required is to start 16 more cache servers across four more machines.