In this chapter, you learn how to populate a Coherence cache with domain objects that are read from text files. You also learn how to measure and compare the efficiency of loading data into a cache sequentially (one entry at a time).
This chapter assumes that you have completed "Caching a Complex Object using Java Serialization" and "Caching a Complex Object using Coherence PortableObject".
This chapter also assumes that you are familiar with using java.io.BufferedReader to read text files, the java.lang.String.split method to parse text files, and java.text.SimpleDateFormat to parse dates.
The following examples provide brief illustrations of these classes.
The code in Example 4-1 illustrates how to use BufferedReader to open and close a text file called myFile.
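A minimal sketch of the open/read/close pattern, assuming a text file named myFile in the current working directory. The class name ReadMyFile and the countLines helper are illustrative only; the try/finally block guarantees that the file is closed even if reading fails.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

final class ReadMyFile {
    // Opens the file, reads it line by line, and always closes it.
    static int countLines(String fileName) throws IOException {
        BufferedReader in = new BufferedReader(new FileReader(fileName));
        try {
            int count = 0;
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // process each line here
                count++;
            }
            return count;
        } finally {
            in.close(); // release the file handle even if an exception occurs
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Read " + countLines("myFile") + " lines");
    }
}
```

On Java 7 and later, a try-with-resources statement achieves the same effect with less code.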
The sample code in Example 4-2 illustrates how to use the String.split method to split a string into an array of substrings based on a delimiter. In this case, the delimiter is a comma.
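A short sketch of splitting one comma-delimited record. The record contents and the class name SplitExample are made up for illustration.

```java
import java.util.Arrays;

final class SplitExample {
    // Splits one comma-delimited record into its fields.
    static String[] splitRecord(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        // A hypothetical record with seven comma-separated fields
        String line = "2008-12-22,95.00,96.10,94.50,95.90,1234500,95.90";
        String[] parts = splitRecord(line);
        System.out.println(parts.length + " fields: " + Arrays.toString(parts));
    }
}
```

Note that split takes a regular expression, and without a limit argument it drops trailing empty strings, so "a,b,," yields only two elements. Use line.split(",", -1) if empty trailing fields must be kept.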
The sample code in Example 4-3 illustrates how to parse a date expressed as a yyyy-MM-dd formatted string into a java.util.Date instance.
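A minimal sketch of parsing a yyyy-MM-dd string with SimpleDateFormat; the class name ParseDateExample is illustrative. Turning off lenient parsing is an optional choice that makes impossible dates fail instead of silently rolling over into the next month.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

final class ParseDateExample {
    // Parses text such as "2008-12-23" in the JVM's default time zone.
    static Date parseDate(String text) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setLenient(false); // reject impossible dates such as 2008-02-30
        return format.parse(text);
    }

    public static void main(String[] args) throws ParseException {
        Date date = parseDate("2008-12-23");
        // getTime() returns milliseconds since the epoch
        System.out.println(date + " = " + date.getTime() + " ms");
    }
}
```

SimpleDateFormat is not thread-safe, so create one instance per thread (or per method call) rather than sharing a single instance.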
This exercise demonstrates two ways to create a console application that populates a Coherence cache with domain objects. The application can use either the Java Serialization or the Coherence com.tangosol.io.pof.PortableObject implementation. The exercise also includes a stopwatch program so that you can observe the time savings that the PortableObject implementation provides.
To populate a cache with domain objects:
Create a new project called Lab5.
See "Creating and Caching Complex Objects" for information on creating a new project.
Download the samples.zip file from the Oracle Coherence Library main page: click the Tutorial Sample Data link to download it. The zip file contains the endofdaystocksummaries folder which, in turn, contains several text files with stock price data.
Unzip samples.zip and place the endofdaystocksummaries folder in the \home\oracle\labs\Lab5 directory.
Write a console application (Java class) called CacheLoading to load all of the end-of-day stock summaries contained in the endofdaystocksummaries folder into a single Coherence cache called "eodss".
Example 4-4 illustrates a possible solution.
Example 4-4 Sample Cache Loading Program
package com.oracle.coherence.handson;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class CacheLoading {

    /**
     * Reads the CSV file for one stock symbol and puts one
     * EndOfDayStockSummary per data line into the given cache.
     */
    public static void loadPricesFor(String symbol, NamedCache namedCache)
            throws IOException, NumberFormatException, ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        String fileName = "./endofdaystocksummaries/" + symbol + ".CSV";
        BufferedReader in = new BufferedReader(new FileReader(fileName));
        try {
            in.readLine(); // skip the header line
            String line = null;
            while ((line = in.readLine()) != null) {
                // parts[0] holds the date in yyyy-MM-dd format
                String[] parts = line.split(",");
                EndOfDayStockSummary eodss = new EndOfDayStockSummary(symbol,
                        dateFormat.parse(parts[0]).getTime(),
                        Double.parseDouble(parts[1]),
                        Double.parseDouble(parts[2]),
                        Double.parseDouble(parts[3]),
                        Double.parseDouble(parts[4]),
                        Double.parseDouble(parts[6]),
                        Long.parseLong(parts[5]));
                namedCache.put(eodss.getKey(), eodss); // one put per line
            }
        } finally {
            in.close(); // close the file even if parsing fails
        }
    }

    public static void main(String[] args)
            throws IOException, NumberFormatException, ParseException {
        NamedCache namedCache = CacheFactory.getCache("eodss");
        String[] symbols = {"AAPL", "BT", "DELL", "GOOG", "HPQ", "JAVA", "MSFT", "ORCL", "YHOO"};
        StopWatch st = new StopWatch();
        st.start();
        for (String symbol : symbols) {
            System.out.printf("Loading Symbol %s%n", symbol);
            loadPricesFor(symbol, namedCache);
        }
        st.stop();
        System.out.println("elapsed time is: " + st.getElapsedTime() + " ms");
    }
}
Create a Java class called EndOfDayStockSummary and copy the code from the EndOfDayStockSummary class from the previous exercise (see "Caching a Complex Object using Java Serialization").
Create a StopWatch Java class to determine how long it takes to put the summaries into the cache and how many can be put per second.
Example 4-5 illustrates a possible solution.
Example 4-5 Sample Stopwatch Program
package com.oracle.coherence.handson;

public class StopWatch {
    private long startTime;  // time in ms since the epoch
    private long finishTime; // time in ms since the epoch
    private boolean isRunning;

    public StopWatch() {
        this.startTime = -1;
        this.finishTime = -1;
        isRunning = false;
    }

    public synchronized void start() {
        if (!isRunning) {
            this.startTime = System.currentTimeMillis();
            isRunning = true;
        }
    }

    public synchronized void stop() {
        if (isRunning) {
            this.finishTime = System.currentTimeMillis();
            isRunning = false;
        }
    }

    public synchronized boolean isRunning() {
        return isRunning;
    }

    public synchronized long getElapsedTime() {
        if (isRunning) {
            return System.currentTimeMillis() - startTime;
        } else if (startTime != -1) {
            return finishTime - startTime;
        } else {
            throw new IllegalStateException("StopWatch hasn't been started as yet. Not possible to determine the elapsed time.");
        }
    }

    // count is the number of items processed; rate is items per second
    public String toString(int count) {
        return String.format("StopWatch{startTime=%d, finishTime=%d, elapsedTime=%d ms (%f sec), rate=%f per sec}",
                startTime, finishTime, getElapsedTime(), getElapsedTime() / 1000.0,
                (double) count * 1000.0 / getElapsedTime());
    }

    @Override
    public String toString() {
        return String.format("StopWatch{startTime=%d, finishTime=%d, elapsedTime=%d ms (%f sec)}",
                startTime, finishTime, getElapsedTime(), getElapsedTime() / 1000.0);
    }
}
Run your application using the following cluster configurations:
Run CacheLoading with no cache servers running.
Figure 4-1 Sample Program Run without a Cache Server
Run CacheLoading with a single cache server running (that is, run cache-server.cmd in a shell).
Figure 4-2 Sample Program Run with One Cache Server
Run CacheLoading with two cache servers running (that is, run cache-server.cmd in a second shell).
Figure 4-3 Sample Program Run with Two Cache Servers
Run the CacheLoading application configured with the following Java option and with both cache servers running:
-Dtangosol.coherence.distributed.localstorage=false
Figure 4-4 Sample Program Run with Two Cache Servers and Local Storage Set to False
Modify your application to use the PortableObject implementation of the EndOfDayStockSummary class. (You can copy the code from "Caching a Complex Object using Coherence PortableObject".)
Save the changes and rebuild your application.
Stop all running cache servers.
Edit the lab4-pof-config.xml file to comment out the user type Person.
...
<!--
<user-type>
  <type-id>1001</type-id>
  <class-name>com.oracle.coherence.handson.Person</class-name>
</user-type>
-->
...
Edit the CLASSPATH environment variable in pof-cache-server.cmd to add the \home\oracle\labs\Lab5\classes directory. Start pof-cache-server.cmd.
C:\oracle\product\coherence\bin>pof-cache-server.cmd
Edit the Run/Debug/Profile configuration. Change the Java process to be storage-disabled (-Dtangosol.coherence.distributed.localstorage=false) and also append the following option to the Java Options field:
-Dtangosol.coherence.cacheconfig=\home\oracle\labs\lab4-cache-config.xml
Add CLASSPATH entries to the existing project properties. Navigate to Tools > Project Properties > Libraries and Classpath > Add JAR/Directory. In your project, add \home\oracle\labs to your CLASSPATH. Execute the application. What effect does this have on throughput?
Figure 4-5 Sample Program Run with Pof Serialization
Use Table 4-1 to record your results. Observe that throughput is higher when using PortableObject than when using Java serialization.
Table 4-1 Throughput Calculations: Using PortableObject versus Serialization
CacheLoading Application Configuration | No Cache Servers: Throughput (per sec) | One Cache Server: Throughput (per sec) | Two Cache Servers: Throughput (per sec) |
---|---|---|---|
Using the Java serialization version of EndOfDayStockSummary | | | |
Using the Java serialization version of EndOfDayStockSummary with local storage disabled | N/A | | |
Using the PortableObject version of EndOfDayStockSummary | | | |
Using the PortableObject version of EndOfDayStockSummary with local storage disabled | N/A | | |
This exercise introduces the concept of querying and aggregating data in a cache. In this exercise, you:
Create a Java class to populate the cache with 10,000 Person objects
Query the cache for specific data
Aggregate information within the cache and observe the changes in query times when you add cache members (on dual-core machines only)
After putting complex objects in the named caches, you look at querying and aggregating information within the grid. The com.tangosol.util.QueryMap interface provides methods for querying the keys or entries within a cache. You can use filters to restrict your results. You can also define indexes to optimize your queries.
Because Coherence stores cache data in serialized form, querying incurs the overhead of deserializing entries. When indexes are added, the indexed values are kept in deserialized form and therefore offer quicker access. Some of the more useful methods in the QueryMap interface are:
Set entrySet(Filter filter): returns the set of entries in the map that satisfy the filter
void addIndex(ValueExtractor extractor, boolean fOrdered, Comparator comparator): adds an index
Set keySet(Filter filter): similar to entrySet, but returns the keys rather than the entries
Note that filtering occurs at the cache entry owner level. In a partitioned topology, filtering can be done in parallel because each storage-enabled member filters the primary partitions that it owns. The QueryMap interface uses the Filter classes; you can find more information on these classes in the API documentation for the com.tangosol.util.filter package.
All Coherence NamedCaches implement the com.tangosol.util.QueryMap interface. This allows NamedCaches to support searching for keys or entries in a cache that satisfy some condition. The condition can be represented as an object that implements the com.tangosol.util.Filter interface.
The com.tangosol.util.filter package contains several predefined classes that provide implementations of standard query expressions. Examples of these classes include GreaterFilter, GreaterEqualsFilter, LikeFilter, NotEqualsFilter, InFilter, and so on. You can use these filters to construct and compose object-based equivalents of almost all SQL WHERE clause expressions.
Note:
Coherence does not provide a SQLFilter because it is unlikely that the objects placed in a cache are modeled in a relational manner, that is, using rows and columns (as they are typically represented in a database). Additionally, objects placed in a cache are commonly difficult to model relationally, for example, large blobs.
The Filter classes use standard Java method reflection to represent test conditions. For example, the following filter represents the condition where the value returned from the getSymbol method on an object in a cache (for example, an EndOfDayStockSummary) is for Oracle (ORCL):
new EqualsFilter("getSymbol", "ORCL");
If the object tested with this filter does not have a getSymbol method, the test fails.
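To make the reflection idea concrete, here is a plain-Java sketch of how a condition such as "getSymbol() equals ORCL" can be evaluated by method name. This is not Coherence's EqualsFilter implementation; ReflectiveEquals and Quote are hypothetical classes, and in this sketch a missing method surfaces as an IllegalArgumentException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical equality test driven by a method name, in the spirit of EqualsFilter.
final class ReflectiveEquals {
    private final String methodName;
    private final Object expected;

    ReflectiveEquals(String methodName, Object expected) {
        this.methodName = methodName;
        this.expected = expected;
    }

    // Calls the named public no-argument method and compares its result.
    boolean evaluate(Object target) {
        try {
            Method m = target.getClass().getMethod(methodName);
            Object actual = m.invoke(target);
            return expected == null ? actual == null : expected.equals(actual);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    target.getClass().getName() + " has no method " + methodName + "()", e);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical value type used only for this illustration.
final class Quote {
    private final String symbol;

    Quote(String symbol) {
        this.symbol = symbol;
    }

    public String getSymbol() {
        return symbol;
    }
}
```

For example, new ReflectiveEquals("getSymbol", "ORCL").evaluate(new Quote("ORCL")) returns true, while passing a String (which has no getSymbol method) throws an exception.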
The following examples illustrate filter-based queries:
Return a set of people whose last name begins with Sm (LikeFilter uses % as a wildcard):
Set smPeople = people.entrySet(new LikeFilter("getLastName", "Sm%"));
Return a set containing all open trades:
Set openTrades = trades.entrySet(new EqualsFilter("isOpen", Boolean.TRUE));
In addition to the entrySet and keySet methods defined by the QueryMap interface, Coherence supports the definition of indexes, using the addIndex method, to improve query performance. Unlike relational database systems, where indexes are defined according to well-known and strictly enforced collections of named columns (that is, a schema), Coherence does not have a schema. Although the lack of a formal schema allows significant flexibility and polymorphism within applications, it means that an approach different from that of traditional database systems is required to define indexes and, therefore, to increase query performance.
To define the values that are to be indexed for each object placed in a cache, Coherence introduces the concept of a ValueExtractor. A com.tangosol.util.ValueExtractor is a simple interface that defines an extract method. Given an object parameter, a ValueExtractor returns some value based on that parameter.
A simple example of a ValueExtractor implementation is the com.tangosol.util.extractor.ReflectionExtractor, which uses reflection to return the result of a method call on an object. For example:
new ReflectionExtractor("getSymbol")
ValueExtractors may be used throughout the Coherence API. Typically, however, they are used to define indexes.
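To see why an extractor-based index speeds up queries, consider a deliberately simplified, single-JVM model of an inverse index: each value is passed through an extractor once, when it is indexed, and equality queries are then answered by looking up keys by extracted value instead of testing every entry. SimpleInverseIndex is a hypothetical class that uses java.util.function.Function (Java 8) in place of a ValueExtractor; it is not Coherence's index implementation and ignores updates, removals, ordering, and partitioning.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model: maps each extracted value to the keys whose values produced it.
final class SimpleInverseIndex<K, V, E> {
    private final Function<V, E> extractor; // plays the role of a ValueExtractor
    private final Map<E, Set<K>> keysByValue = new HashMap<>();

    SimpleInverseIndex(Function<V, E> extractor) {
        this.extractor = extractor;
    }

    // Extracts once at indexing time and records which key holds that value.
    void add(K key, V value) {
        keysByValue.computeIfAbsent(extractor.apply(value), v -> new HashSet<K>()).add(key);
    }

    // Answers an equality query from the index alone, without re-examining the values.
    Set<K> keysEqualTo(E extracted) {
        Set<K> keys = keysByValue.get(extracted);
        return keys == null ? new HashSet<K>() : new HashSet<K>(keys);
    }
}
```

For example, an index built with String::length over the values "ORCL", "MSFT", and "BT" answers keysEqualTo(4) with the keys of "ORCL" and "MSFT".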
To aggregate and query data in the cache:
Create a Java class to populate the cache with 10,000 random Person objects.
Create a JDeveloper project and call it Lab6.
Change the Java process to be storage-disabled when it is run. Right-click the new project and select Project Properties. In the Project Properties dialog box, select Run/Debug/Profile. In the Java Options field, enter these parameters:
-Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.log.level=3
Use the Person class that you created in the earlier lab (Lab4). If the Person class in Lab4 implements PortableObject, then modify Person.java in Lab4 to use Java serialization. Save and rebuild Lab4. Right-click Lab6 and select Project Properties. Select Libraries and Classpath. If you do not have \home\oracle\labs\Lab4\classes in your CLASSPATH, click Add Jar/Directory.
Figure 4-8 Adding Jars and Directories to Classpath
Select the \home\oracle\labs\Lab4\classes directory.
Figure 4-9 Adding Classes to the Classpath
Create a new Java class called PopulatePeople. Ensure that it has a main method.
Create the code in the new class to connect to the cache and create 10,000 random Person objects. Hint: Use java.util.Random to generate some random ages.
Random generator = new Random();
int age = generator.nextInt(100); // a value from 0 to 99
Example 4-6 illustrates a possible solution.
Example 4-6 Sample PopulatePeople Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.extractor.ReflectionExtractor;

import java.io.IOException;
import java.util.Random;

public class PopulatePeople {

    public PopulatePeople() {
    }

    public static void main(String[] args) throws IOException {
        NamedCache person = CacheFactory.getCache("person");

        // add indexes (uncomment these lines in the indexing step)
        //person.addIndex(new ReflectionExtractor("getGender"), true, null);
        //person.addIndex(new ReflectionExtractor("getAgeDouble"), false, null);

        Random generator = new Random();
        for (int i = 1; i <= 10000; i++) {
            // random age from 1 to 100 and random gender
            Person p = new Person(i, "Surname" + i, "Firstname" + i, "Address" + i,
                    generator.nextInt(100) + 1,
                    (generator.nextInt(2) == 1 ? Person.FEMALE : Person.MALE));
            person.put(p.getId(), p);
        }
        System.out.println("The entry set size is " + person.entrySet().size());
    }
}
Stop all other cache servers and run cache-server.cmd.
C:\oracle\product\coherence\bin>cache-server.cmd
Run PopulatePeople from the JDeveloper IDE.
You should see results similar to the following:
Figure 4-11 Results of Populating the Cache
Can you think of a more efficient way of performing the 10,000 puts? Hint: See the information on bulk loading in the "Pre-Loading the Cache" chapter of the Developer's Guide for Oracle Coherence.
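One approach in line with that hint is to group puts into batches and send each batch with a single putAll call, because each individual put on a distributed cache typically involves its own network round trip to the owning member. The sketch below assumes you pass your NamedCache as the target (NamedCache implements java.util.Map, so putAll is available); BufferedPutter and the batch size are hypothetical choices for you to tune, not values taken from this chapter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: buffers puts and writes them to the target map in batches.
final class BufferedPutter<K, V> {
    private final Map<K, V> target;
    private final int batchSize;
    private final Map<K, V> buffer = new HashMap<>();
    private int flushes;

    BufferedPutter(Map<K, V> target, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.target = target;
        this.batchSize = batchSize;
    }

    // Adds one entry to the buffer and sends the buffer once it is full.
    void put(K key, V value) {
        buffer.put(key, value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends any buffered entries with a single putAll call; call once more after the last put.
    void flush() {
        if (!buffer.isEmpty()) {
            target.putAll(buffer);
            buffer.clear();
            flushes++;
        }
    }

    int flushCount() {
        return flushes;
    }
}
```

In PopulatePeople, you could create new BufferedPutter<Object, Object>(person, 1000), call putter.put(p.getId(), p) inside the loop in place of person.put, and call putter.flush() once after the loop so that the final partial batch is written.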
Create a class to perform your queries.
Create a new Java class called QueryExample with a main method.
Use the entrySet method to get the number of males, and the number of males aged 35 and above. Use the size() method to get the number of records returned. You will use the aggregation functions to perform this more efficiently in a later practice.
Example 4-7 illustrates a possible solution.
Example 4-7 Sample QueryExample Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AndFilter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.GreaterEqualsFilter;

import java.util.Set;

public class QueryExample {

    public QueryExample() {
    }

    public static void main(String[] args) {
        NamedCache person = CacheFactory.getCache("person");

        // get the set of all males
        Set males = person.entrySet(
                new EqualsFilter("getGender", Person.MALE));

        // get the set of all males aged 35 and above
        Set malesOver35 = person.entrySet(
                new AndFilter(
                        new EqualsFilter("getGender", Person.MALE),
                        new GreaterEqualsFilter("getAge", 35)));

        System.out.println("Total number of males is " + males.size());
        System.out.println("Total number of males aged 35 and above is " + malesOver35.size());
    }
}
Stop all running cache servers. Navigate to the \oracle\product\coherence\bin directory. Edit the cache-server.cmd file and modify the -cp entry on the line beginning with $JAVAEXEC -server to remove \home\oracle\labs\Lab4\classes from the classpath. Restart your cache server. What happens when you try to run the QueryExample code? You should get an error similar to the following:
Exception in thread "main" (Wrapped: Failed request execution for DistributedCache service on Member(Id=1, Timestamp=2008-12-23 16:24:48.848, Address=130.35.99.248:8088, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:6024, Role=CoherenceServer) (Wrapped) readObject failed: java.lang.ClassNotFoundException: com.oracle.coherence.handson.Person
What happened and why? When you use a QueryMap method, that is, when you use the entrySet method to retrieve the set of cache entries that match your request, the request is processed on the storage-enabled members, and the results are then returned to the process that requested the set.
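What happens here is that the Coherence cache server does not know about the Person class that you created: to evaluate the filter, the storage-enabled cache server must deserialize the Person entries, which fails because the class is not on its classpath. You need to add the Person class to the CLASSPATH of the Coherence cache server.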
Edit the cache-server.cmd file and add the \home\oracle\labs\Lab4\classes entry to the $JAVAEXEC -server command. The command should look similar to the following:
$JAVAEXEC -server -showversion $JAVA_OPTS -cp "$COHERENCE_HOME/lib/coherence.jar:/home/oracle/labs/Lab4/classes" com.tangosol.net.DefaultCacheServer $1
Restart your cache server, execute PopulatePeople and QueryExample, and see what happens.
Figure 4-13 Results of Executing the Query
Create a class to perform aggregations on the data in the cache.
An EntryAggregator (com.tangosol.util.InvocableMap.EntryAggregator) enables you to perform an operation on all objects, or on a specific set of objects, and get an aggregated result. EntryAggregators are essentially agents that execute in parallel against the data within the cluster.
Aggregations are performed in parallel and can benefit from the addition of cluster members.
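Aggregations parallelize well because each storage-enabled member can aggregate only the entries it owns and send back a small partial result, which the caller then merges. The plain-Java sketch below shows that idea for an average, with cluster members simulated as lists of values. PartialAverage is a hypothetical class, not Coherence's DoubleAverage implementation. Merging (sum, count) pairs is what makes the result correct: for members holding [10, 20] and [30], averaging the per-member averages (15 and 30) would give 22.5 instead of the correct 20.

```java
import java.util.List;

// Hypothetical partial result for a distributed average.
final class PartialAverage {
    final double sum;
    final long count;

    PartialAverage(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Runs on each member: aggregates only the values that member owns.
    static PartialAverage ofPartition(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return new PartialAverage(sum, values.size());
    }

    // Runs once on the caller: merges the partial results into the final average.
    static double combine(List<PartialAverage> partials) {
        double sum = 0;
        long count = 0;
        for (PartialAverage p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? Double.NaN : sum / count; // NaN for an empty set in this sketch
    }
}
```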
There are two ways of aggregating: over a collection of keys, or by specifying a filter. Example 4-8 illustrates the methods that perform these aggregations.
Example 4-8 Methods to Aggregate Over Keys or by Specifying Filters
Object aggregate(Collection keys, InvocableMap.EntryAggregator agg)
Object aggregate(Filter filter, InvocableMap.EntryAggregator agg)
The following example uses a filter.
Create a new Java class in the Lab6 project called AggregationExample. Ensure that it has a main method.
Figure 4-14 Creating an Aggregation Class
Create a new method in the Person class that you created in the earlier lab (Lab4, Person.java) so that you can use the correct aggregators. Call the method getAgeDouble.
public double getAgeDouble() {
    return (double) this.age;
}
Figure 4-15 Creating the getAgeDouble Method
Stop and start the cache server after making this change.
In AggregationExample.java, write the code to get the following from the cache:
Average age of all males
Average age of all females
Average age of all people
Maximum age
Hint 1: The following code gets the average age for all males:
Double averageAgeMales = (Double)person.aggregate( new EqualsFilter("getGender",Person.MALE ), new DoubleAverage("getAgeDouble") );
Hint 2: To query all the objects in a named cache, you can use either (Filter)null or new AlwaysFilter().
Example 4-9 illustrates a possible solution.
Example 4-9 Sample Data Aggregation Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Filter;
import com.tangosol.util.aggregator.DoubleAverage;
import com.tangosol.util.aggregator.DoubleMax;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.EqualsFilter;

public class AggregationExample {

    public AggregationExample() {
    }

    public static void main(String[] args) {
        Double averageAgeMales = null;
        Double averageAgeFemales = null;
        Double maxAge = null;
        Double averageAge = null;
        int max = 100;
        NamedCache person = CacheFactory.getCache("person");
        long totalTime = 0;

        // run the four aggregations max times and time each iteration
        for (int i = 0; i < max; i++) {
            long startTime = System.currentTimeMillis();

            averageAgeMales = (Double) person.aggregate(
                    new EqualsFilter("getGender", Person.MALE),
                    new DoubleAverage("getAgeDouble"));

            averageAgeFemales = (Double) person.aggregate(
                    new EqualsFilter("getGender", Person.FEMALE),
                    new DoubleAverage("getAgeDouble"));

            // (Filter) null selects every entry in the cache
            maxAge = (Double) person.aggregate(
                    (Filter) null,
                    new DoubleMax("getAgeDouble"));

            // AlwaysFilter also selects every entry in the cache
            averageAge = (Double) person.aggregate(
                    new AlwaysFilter(),
                    new DoubleAverage("getAgeDouble"));

            long endTime = System.currentTimeMillis();
            System.out.println("Total time taken is " + (endTime - startTime) / 1000F + " seconds");
            totalTime += (endTime - startTime);
        }
        System.out.println("Average time is " + (totalTime / (float) max) / 1000F + " seconds");
        System.out.println("Average age of males is " + averageAgeMales);
        System.out.println("Average age of females is " + averageAgeFemales);
        System.out.println("Max age is " + maxAge);
        System.out.println("Average age is " + averageAge);
    }
}
Observe what happens when you add new cache members.
Modify AggregationExample to add timing information. Run the queries in a loop 100 times and compute the average time. Averaging over many runs reduces the influence of garbage collection pauses or normal machine load spikes on the results.
Hint: You can get the current time using the following code:
long timeStart = System.currentTimeMillis();
Example 4-9 calls System.currentTimeMillis() at the start and end of each loop iteration.
Run your code with one cache server and note the average time.
Figure 4-16 Aggregation Process Run with One Cache Server
Start up a second cache server, allow for the data to be redistributed, and rerun the code. What happens?
Figure 4-17 Aggregation Process Run with Two Cache Servers
Note:
If you do not have a dual-core machine, you may not see an improvement in aggregation times. Why do you think this happens? The following are some of the sample timings:
What happens if you increase the number of cache servers to three?
Figure 4-18 Aggregation Process Run with Three Cache Servers
Add indexes to the PopulatePeople class to improve performance. Hint: Find the addIndex method in the Javadoc for the QueryMap interface. When you add indexes, you should see a significant improvement in performance. Uncomment the index code lines and rerun your application.
// add indexes
person.addIndex(new ReflectionExtractor("getGender"), true, null);
person.addIndex(new ReflectionExtractor("getAgeDouble"), false, null);
Figure 4-19 Aggregation Process Run with One Cache Server and Indexing
Figure 4-20 Aggregation Process Run with Three Cache Servers and Indexing