Coherence provides a data grid by dynamically, transparently, and automatically partitioning the data set across all storage-enabled nodes in a cluster. We have been doing some scale-out testing on our new Dual 2.3GHz PowerPC G5 Xserve cluster, and here is one of the tests that we have performed using the data grid aggregation feature.
The new InvocableMap tightly binds the concepts of a data grid (that is, a partitioned cache) and the processing of the data stored in the grid. When you combine the InvocableMap with the linear scalability of Coherence itself, you get an extremely powerful solution. The following tests show that Coherence gives you (the developer, the engineer, the architect, and so on) the ability to build an application that can scale out to handle any SLA requirement and any increase in throughput requirements. For example, the following tests demonstrate Coherence's ability to linearly increase the number of trades aggregated per second as you add hardware. That is, if one machine can aggregate X trades per second, then two machines in the data grid can aggregate 2X trades per second, three machines can aggregate 3X trades per second, and so on.
All of the Data Grid capabilities described below are features of Coherence Enterprise Edition and higher.
First, we need some data to aggregate. Example O-1 illustrates a Trade object with three properties: Id, Price, and Symbol.
Example O-1 Trade Object Defining Three Properties
package com.tangosol.examples.coherence.data;

import com.tangosol.util.Base;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.io.ExternalizableLite;

import java.io.IOException;
import java.io.NotActiveException;
import java.io.DataInput;
import java.io.DataOutput;

/**
* Example Trade class
*
* @author erm 2005.12.27
*/
public class Trade
        extends Base
        implements ExternalizableLite
    {
    /**
    * Default Constructor
    */
    public Trade()
        {
        }

    public Trade(int iId, double dPrice, String sSymbol)
        {
        setId(iId);
        setPrice(dPrice);
        setSymbol(sSymbol);
        }

    public int getId()
        {
        return m_iId;
        }

    public void setId(int iId)
        {
        m_iId = iId;
        }

    public double getPrice()
        {
        return m_dPrice;
        }

    public void setPrice(double dPrice)
        {
        m_dPrice = dPrice;
        }

    public String getSymbol()
        {
        return m_sSymbol;
        }

    public void setSymbol(String sSymbol)
        {
        m_sSymbol = sSymbol;
        }

    /**
    * Restore the contents of this object by loading the object's state from the
    * passed DataInput object.
    *
    * @param in the DataInput stream to read data from to restore the
    *           state of this object
    *
    * @throws IOException if an I/O exception occurs
    * @throws NotActiveException if the object is not in its initial state, and
    *         therefore cannot be deserialized into
    */
    public void readExternal(DataInput in)
            throws IOException
        {
        m_iId     = ExternalizableHelper.readInt(in);
        m_dPrice  = in.readDouble();
        m_sSymbol = ExternalizableHelper.readSafeUTF(in);
        }

    /**
    * Save the contents of this object by storing the object's state into the
    * passed DataOutput object.
    *
    * @param out the DataOutput stream to write the state of this object to
    *
    * @throws IOException if an I/O exception occurs
    */
    public void writeExternal(DataOutput out)
            throws IOException
        {
        ExternalizableHelper.writeInt(out, m_iId);
        out.writeDouble(m_dPrice);
        ExternalizableHelper.writeSafeUTF(out, m_sSymbol);
        }

    private int    m_iId;
    private double m_dPrice;
    private String m_sSymbol;
    }
The cache configuration is easy through the XML Cache Configuration Elements. Example O-2 defines one wildcard cache-mapping that maps to one caching-scheme which has unlimited capacity:
Example O-2 Mapping a cache-mapping to a caching-scheme with Unlimited Capacity
<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>example-distributed</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <!-- Distributed caching scheme. -->
    <distributed-scheme>
      <scheme-name>example-distributed</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <class-scheme>
          <scheme-ref>unlimited-backing-map</scheme-ref>
        </class-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>

    <!-- Backing map scheme definition used by all the caches that do not require any eviction policies -->
    <class-scheme>
      <scheme-name>unlimited-backing-map</scheme-name>
      <class-name>com.tangosol.util.SafeHashMap</class-name>
      <init-params></init-params>
    </class-scheme>
  </caching-schemes>
</cache-config>
Example O-3 illustrates the code to add an index to the Price property. Adding an index to this property increases performance by allowing Coherence to access the values directly rather than having to deserialize each item to accomplish the calculation.
Example O-3 Adding an Index to the Price Property
ReflectionExtractor extPrice = new ReflectionExtractor("getPrice");
m_cache.addIndex(extPrice, true, null);
In our tests the aggregation speed was increased by more than 2x after an index was applied.
Example O-4 illustrates the code to perform a parallel aggregation across all JVMs in the data grid. The aggregation is initiated and results received by a single client. That is, a single "low-power" client is able to use the full processing power of the cluster/data grid in aggregate to perform this aggregation in parallel with just one line of code.
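To make the idea of a parallel aggregation concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Coherence API: the class ParallelSumSketch and its methods are hypothetical, each inner list stands in for the prices held by one storage node, and a thread pool stands in for the cluster. It only illustrates the pattern of one caller fanning work out to every partition and combining the partial results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a data grid; not part of Coherence. */
final class ParallelSumSketch {

    /** Sums one partition; in a real grid this work would run on the node that owns the data. */
    static double sum(List<Double> partition) {
        double sum = 0.0;
        for (double price : partition) {
            sum += price;
        }
        return sum;
    }

    /** Sums every partition on its own worker thread, then combines the partial sums. */
    static double aggregate(List<List<Double>> partitions) {
        if (partitions.isEmpty()) {
            return 0.0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<Double>> partials = new ArrayList<>();
            for (List<Double> partition : partitions) {
                Callable<Double> task = () -> sum(partition);
                partials.add(pool.submit(task));
            }
            double total = 0.0;
            for (Future<Double> partial : partials) {
                total += partial.get(); // the caller only combines partial results
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("aggregation interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("partition task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Three simulated nodes holding made-up prices.
        List<List<Double>> grid = Arrays.asList(
                Arrays.asList(10.0, 20.0),
                Arrays.asList(30.0),
                Arrays.asList(40.0, 50.0, 60.0));
        System.out.println(aggregate(grid));
    }
}
```

Because each partition is summed independently, adding partitions (and workers to own them) shortens the time per aggregation, which is the behavior the tests below measure on the real data grid.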
A test run does several things:
1. Loads 200,000 trade objects into the data grid.
2. Adds indexes to the Price property.
3. Performs a parallel aggregation of all trade objects stored in the data grid. This aggregation step is done 20 times to obtain an "average run time" to ensure that the test takes into account garbage collection.
4. Loads 400,000 trade objects into the data grid.
5. Repeats steps 2 and 3.
6. Loads 600,000 trade objects into the data grid.
7. Repeats steps 2 and 3.
8. Loads 800,000 trade objects into the data grid.
9. Repeats steps 2 and 3.
10. Loads 1,000,000 trade objects into the data grid.
11. Repeats steps 2 and 3.
Client Considerations: The test client itself is run on an Intel Core Duo iMac which is marked as local storage disabled. The command line used to start the client was:
java ... -Dtangosol.coherence.distributed.localstorage=false -Xmx128m -Xms128m com.tangosol.examples.coherence.invocable.TradeTest
Start 4 JVMs on one Xserve - Perform a "test run"
Start 4 JVMs on each of two Xserves - Perform a "test run"
Start 4 JVMs on each of three Xserves - Perform a "test run"
Start 4 JVMs on each of four Xserves - Perform a "test run"
Server Considerations: In this case a "JVM" refers to a cache server instance (that is, a data grid node) that is a standalone JVM responsible for managing/storing the data. I used the DefaultCacheServer helper class to accomplish this.
The command line used to start the server was:
java ... -Xmx384m -Xms384m -server com.tangosol.net.DefaultCacheServer
As you can see in the following graph, the average aggregation time decreases linearly as more cache servers/machines are added to the data grid!
Note:
The lowest and highest times were not used in the calculations below, resulting in a data set of eighteen results used to create an average.
Similarly, the following graph illustrates how the aggregations per second scale linearly as you add more machines! When moving from 1 machine to 2 machines, the trades aggregated per second double; when moving from 2 machines to 4 machines, the trades aggregated per second double again.
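The averaging rule from the note (20 timed runs, with the lowest and highest times discarded, leaving eighteen) can be sketched as follows. This is an illustrative helper, not the harness used in the tests: the class TrimmedAverage, its method names, and the sample timings in main are all hypothetical.

```java
import java.util.Arrays;

/** Hypothetical helper showing the "drop lowest and highest, then average" rule. */
final class TrimmedAverage {

    /** Returns the mean of the samples after discarding one lowest and one highest value. */
    static double of(long[] runMillis) {
        if (runMillis.length < 3) {
            throw new IllegalArgumentException("need at least three samples");
        }
        long[] sorted = runMillis.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        long sum = 0;
        for (int i = 1; i < sorted.length - 1; i++) {
            sum += sorted[i];
        }
        return (double) sum / (sorted.length - 2);
    }

    /** Runs the task the given number of times and records each elapsed time in milliseconds. */
    static long[] timeRuns(Runnable task, int runs) {
        long[] elapsed = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            elapsed[i] = (System.nanoTime() - start) / 1_000_000;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Made-up timings; 400 could be a run that was hit by a garbage collection pause.
        long[] samples = {100, 90, 400, 110, 50};
        System.out.println(of(samples));
    }
}
```

In a test run, timeRuns would wrap the aggregation call with runs set to 20, so that of averages the eighteen remaining samples.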
Note:
FAILOVER! The above aggregations will complete successfully and correctly even if one of the cache servers or an entire machine fails during the aggregation!
Combining the Coherence data grid (that is, partitioned cache) with the InvocableMap features enables:
Applications to scale out data grid calculations linearly;
Groups to meet increasingly aggressive SLAs by dynamically/transparently adding more resources to the data grid. That is, if you need to achieve 1,837,932 trade aggregations per second all that is required is to start 16 more cache servers across four more machines.
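As a back-of-the-envelope aid for this kind of sizing, the sketch below assumes that throughput scales exactly linearly with the number of machines, as the test results suggest. The class ScaleOutEstimate, its method names, and the sample rates are hypothetical and are not measurements from the tests; only the four-cache-servers-per-machine layout mirrors the test setup.

```java
/** Hypothetical sizing helper; assumes perfectly linear scale-out. */
final class ScaleOutEstimate {

    /** Number of machines needed to reach the target rate, given the rate of one machine. */
    static int machinesNeeded(double perMachineRate, double targetRate) {
        if (perMachineRate <= 0 || targetRate < 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        return (int) Math.ceil(targetRate / perMachineRate);
    }

    /** Number of cache server JVMs for that many machines. */
    static int cacheServersNeeded(int machines, int serversPerMachine) {
        return machines * serversPerMachine;
    }

    public static void main(String[] args) {
        // Made-up numbers: one machine aggregates 250,000 trades per second,
        // and the target is 1,000,000 trades per second.
        int machines = machinesNeeded(250_000, 1_000_000);
        int servers = cacheServersNeeded(machines, 4);
        System.out.println(machines + " machines, " + servers + " cache servers");
    }
}
```

The estimate is only as good as the linearity assumption, so it is worth re-measuring after each round of added hardware.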