13 Pre-Loading the Cache

This section describes different patterns you can use to pre-load the cache. The patterns include bulk loading and distributed loading.

13.1 Performing Bulk Loading and Processing

Example 13-5, PagedQuery.java, demonstrates techniques for efficiently bulk loading and processing items in a Coherence Cache.

13.1.1 Bulk Writing to a Cache

A common scenario when using Coherence is to pre-populate a cache before the application uses it. A simple way to do this is illustrated by the Java code in Example 13-1:

Example 13-1 Pre-Loading a Cache

public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;
    
    try
        {
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);
            cache.put(key, value);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }

This technique works, but each call to put may result in network traffic, especially for partitioned and replicated caches. Additionally, each call to put returns the object it just replaced in the cache (per the java.util.Map interface), which adds unnecessary overhead. Loading the cache is much more efficient with the ConcurrentMap.putAll method, which writes many entries in a single call. This is illustrated in Example 13-2:

Example 13-2 Pre-Loading a Cache Using ConcurrentMap.putAll

public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;
    Map       buffer = new HashMap();

    try
        {
        int count = 0;
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);
            buffer.put(key, value);

            // this loads 1000 items at a time into the cache; count is
            // incremented before the test so the first flush is a full batch
            if ((++count % 1000) == 0)
                {
                cache.putAll(buffer);
                buffer.clear();
                }
            }
        if (!buffer.isEmpty())
            {
            cache.putAll(buffer);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }
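The flush condition in a buffering loop is easy to get off by one. The following is a minimal sketch of the same buffering pattern, run against a plain java.util.Map so that it can be executed without a cluster. The class name BatchedPutSketch and the method loadInBatches are hypothetical names used only for illustration, and the target map merely stands in for a NamedCache. The sketch records the size of each putAll call so that the batch sizes can be inspected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchedPutSketch
    {
    /**
    * Copy all entries from source into target in batches of batchSize.
    * The target map is a stand-in for a NamedCache (an assumption made so
    * the sketch runs without Coherence).
    *
    * @return the number of entries passed to each putAll call, in order
    */
    static List<Integer> loadInBatches(Map<Integer, String> source,
            Map<Integer, String> target, int batchSize)
        {
        Map<Integer, String> buffer     = new HashMap<Integer, String>();
        List<Integer>        flushSizes = new ArrayList<Integer>();
        int                  count      = 0;

        for (Map.Entry<Integer, String> entry : source.entrySet())
            {
            buffer.put(entry.getKey(), entry.getValue());

            // increment before testing, so the first flush happens only
            // after batchSize entries have been buffered
            if ((++count % batchSize) == 0)
                {
                flushSizes.add(buffer.size());
                target.putAll(buffer);
                buffer.clear();
                }
            }

        // write the last partial batch (if any)
        if (!buffer.isEmpty())
            {
            flushSizes.add(buffer.size());
            target.putAll(buffer);
            }
        return flushSizes;
        }

    public static void main(String[] args)
        {
        Map<Integer, String> source = new HashMap<Integer, String>();
        for (int i = 0; i < 2500; i++)
            {
            source.put(i, "value-" + i);
            }
        Map<Integer, String> cacheStandIn = new HashMap<Integer, String>();
        System.out.println(loadInBatches(source, cacheStandIn, 1000));
        }
    }
```

With 2500 distinct keys and a batch size of 1000, the sketch prints [1000, 1000, 500]: two full batches followed by the final partial batch.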

13.1.2 Efficient Processing of Filter Results

Coherence provides the ability to query caches based on criteria by using the Filter API. Here is an example (given entries with integers as keys and strings as values):

Example 13-3 Using a Filter to Query a Cache

NamedCache c = CacheFactory.getCache("test");

// Search for entries whose values start with 'c' (ignoring case)
Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

// Perform query, return all entries that match
Set results = c.entrySet(query);
for (Iterator i = results.iterator(); i.hasNext();)
    {
    Map.Entry e = (Map.Entry) i.next();
    out("key: "+e.getKey() + ", value: "+e.getValue());
    }

This example works for small data sets, but it may encounter problems, such as running out of heap space, if the data set is too large. Example 13-4 illustrates a pattern to process query results in batches to avoid this problem:

Example 13-4 Processing Query Results in Batches

public static void performQuery()
    {
    NamedCache c = CacheFactory.getCache("test");

    // Search for entries whose values start with 'c' (ignoring case)
    Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

    // Perform query, return keys of entries that match
    Set keys = c.keySet(query);

    // The number of objects to process at a time
    final int BUFFER_SIZE = 100;

    // Object buffer
    Set buffer = new HashSet(BUFFER_SIZE);

    for (Iterator i = keys.iterator(); i.hasNext();)
        {
        buffer.add(i.next());

        if (buffer.size() >= BUFFER_SIZE)
            {
            // Bulk load BUFFER_SIZE number of objects from cache
            Map entries = c.getAll(buffer);

            // Process each entry
            process(entries);

            // Done processing these keys, clear buffer
            buffer.clear();
            }
        }
    // Handle the last partial chunk (if any)
    if (!buffer.isEmpty())
        {
        process(c.getAll(buffer));
        }
    }

public static void process(Map map)
    {
    for (Iterator ie = map.entrySet().iterator(); ie.hasNext();)
        {
        Map.Entry e = (Map.Entry) ie.next();
        out("key: "+e.getKey() + ", value: "+e.getValue());
        }
    }

In this example, all keys for entries that match the filter are returned, but only BUFFER_SIZE (in this case, 100) entries are retrieved from the cache at a time.

Note that LimitFilter can also be used to process results in parts, similar to the example above. However, LimitFilter is meant for scenarios where the results are paged, such as in a user interface; it is not an efficient way to process all of the data in a query result.
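The chunking loop can be exercised without a cluster. The following is a minimal sketch, assuming a plain java.util.Map as a stand-in for the cache. The class ChunkedGetSketch and its methods are hypothetical names for illustration only; in particular, the getAll helper is a local approximation written for this sketch and is not the Coherence implementation. The sketch returns the size of each retrieved chunk, which shows that no more than chunkSize values are held at one time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ChunkedGetSketch
    {
    /**
    * Local stand-in for a bulk read: returns the entries of the given map
    * whose keys appear in the key set. Keys that are absent are skipped.
    */
    static <K, V> Map<K, V> getAll(Map<K, V> cache, Set<K> keys)
        {
        Map<K, V> result = new HashMap<K, V>();
        for (K key : keys)
            {
            if (cache.containsKey(key))
                {
                result.put(key, cache.get(key));
                }
            }
        return result;
        }

    /**
    * Walk the key set and retrieve the matching values chunkSize keys at
    * a time.
    *
    * @return the number of entries retrieved for each chunk, in order
    */
    static <K, V> List<Integer> processInChunks(Map<K, V> cache, Set<K> keys,
            int chunkSize)
        {
        List<Integer> chunkSizes = new ArrayList<Integer>();
        Set<K>        buffer     = new HashSet<K>(chunkSize);

        for (K key : keys)
            {
            buffer.add(key);
            if (buffer.size() >= chunkSize)
                {
                // a real application would process the entries here
                chunkSizes.add(getAll(cache, buffer).size());
                buffer.clear();
                }
            }

        // handle the last partial chunk (if any)
        if (!buffer.isEmpty())
            {
            chunkSizes.add(getAll(cache, buffer).size());
            }
        return chunkSizes;
        }

    public static void main(String[] args)
        {
        Map<Integer, String> cacheStandIn = new HashMap<Integer, String>();
        for (int i = 0; i < 250; i++)
            {
            cacheStandIn.put(i, "value-" + i);
            }
        System.out.println(
                processInChunks(cacheStandIn, cacheStandIn.keySet(), 100));
        }
    }
```

For 250 keys and a chunk size of 100, the sketch prints [100, 100, 50]. The chunk size is a trade-off: larger chunks mean fewer round trips to the cache, while smaller chunks keep less data on the client heap at once.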

13.1.3 A Bulk Loading and Processing Example

Example 13-5 illustrates PagedQuery.java, a sample program that demonstrates the concepts described in the previous sections.

To run the example, follow these steps:

  1. Save the following Java file as com/tangosol/examples/PagedQuery.java

  2. Point the classpath to the Coherence libraries and the current directory

  3. Compile and run the example

Example 13-5 A Sample Bulk Loading Program

package com.tangosol.examples;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.NearCache;
import com.tangosol.util.Base;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.LikeFilter;

import java.io.Serializable;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.HashSet;


/**
* This sample application demonstrates the following:
* <ul>
* <li>
* <b>Obtaining a back cache from a near cache for populating a cache.</b>
* Since the near cache holds a limited subset of the data in a cache it is
* more efficient to bulk load data directly into the back cache instead of
* the near cache.
* </li>
* <li>
* <b>Populating a cache in bulk using <tt>putAll</tt>.</b>
* This is more efficient than <tt>put</tt> for a large number of entries.
* </li>
* <li>
* <b>Executing a filter against a cache and processing the results in bulk.</b>
* This sample issues a query against the cache using a filter.  The result is
* a set of keys that represent the query results.  Instead of iterating
* through the keys and loading each item individually with a <tt>get</tt>,
* this sample loads entries from the cache in bulk using <tt>getAll</tt> which
* is more efficient.
* </li>
* </ul>
*
* @author cp
*/
public class PagedQuery
        extends Base
    {
    /**
    * Command line execution entry point.
    */
    public static void main(String[] asArg)
        {
        NamedCache cacheContacts = CacheFactory.getCache("contacts",
                Contact.class.getClassLoader());

        populateCache(cacheContacts);

        executeFilter(cacheContacts);

        CacheFactory.shutdown();
        }

    // ----- populate the cache ---------------------------------------------
    
    /**
    * Populate the cache with test data.  This example shows how to populate
    * the cache a chunk at a time using {@link NamedCache#putAll} which is more
    * efficient than {@link NamedCache#put}.
    *
    * @param cacheDirect  the cache to populate. Note that this should <b>not</b>
    *                     be a near cache since that will thrash the cache
    *                     if the load size exceeds the near cache max size.  
    */
    public static void populateCache(NamedCache cacheDirect)
        {
        if (cacheDirect.isEmpty())
            {
            Map mapBuffer = new HashMap();
            for (int i = 0; i < 100000; ++i)
                {
                // make up some fake data
                Contact contact = new Contact();
                contact.setName(getRandomName() + ' ' + getRandomName());
                contact.setPhone(getRandomPhone());
                mapBuffer.put(new Integer(i), contact);

                // this loads 1000 items at a time into the cache; testing
                // (i + 1) keeps the first flush from firing at i == 0
                if (((i + 1) % 1000) == 0)
                    {
                    out("Adding "+mapBuffer.size()+" entries to cache");
                    cacheDirect.putAll(mapBuffer);
                    mapBuffer.clear();
                    }
                }
            if (!mapBuffer.isEmpty())
                {
                cacheDirect.putAll(mapBuffer);
                }
            }
        }

    /**
    * Creates a random name.
    *
    * @return  a random string between 4 and 10 chars long
    */
    public static String getRandomName()
        {
        Random rnd = getRandom();
        int    cch = 4 + rnd.nextInt(7);
        char[] ach = new char[cch];
        ach[0] = (char) ('A' + rnd.nextInt(26));
        for (int of = 1; of < cch; ++of)
            {
            ach[of] = (char) ('a' + rnd.nextInt(26));
            }
        return new String(ach);
        }

    /**
    * Creates a random phone number.
    *
    * @return  a phone number string of the form "(nnn) nnn-nnnn"
    */
    public static String getRandomPhone()
        {
        Random rnd = getRandom();
        return "("
            + toDecString(100 + rnd.nextInt(900), 3)
            + ") "
            + toDecString(100 + rnd.nextInt(900), 3)
            + "-"
            + toDecString(10000, 4);
        }

    // ----- process the cache ----------------------------------------------

    /**
    * Query the cache and process the results in batches.  This example
    * shows how to load a chunk at a time using {@link NamedCache#getAll}
    * which is more efficient than {@link NamedCache#get}.
    *
    * @param cacheDirect  the cache to issue the query against
    */
    private static void executeFilter(NamedCache cacheDirect)
        {
        Filter query = new LikeFilter("getName", "C%");

        // Let's say we want to process 100 entries at a time
        final int CHUNK_COUNT = 100;

        // Start by querying for all the keys that match
        Set setKeys = cacheDirect.keySet(query);

        // Create a collection to hold the "current" chunk of keys
        Set setBuffer = new HashSet();

        // Iterate through the keys
        for (Iterator iter = setKeys.iterator(); iter.hasNext(); )
            {
            // Collect the keys into the current chunk
            setBuffer.add(iter.next());

            // handle the current chunk when it gets big enough
            if (setBuffer.size() >= CHUNK_COUNT)
                {
                // Instead of retrieving each object with a get,
                // retrieve a chunk of objects at a time with a getAll.
                processContacts(cacheDirect.getAll(setBuffer));
                setBuffer.clear();
                }
            }

        // Handle the last partial chunk (if any)
        if (!setBuffer.isEmpty())
            {
            processContacts(cacheDirect.getAll(setBuffer));
            }
        }

    /**
    * Process the map of contacts. In a real application some sort of
    * processing for each map entry would occur. In this example each
    * entry is logged to output. 
    *
    * @param map  the map of contacts to be processed
    */
    public static void processContacts(Map map)
        {
        out("processing chunk of " + map.size() + " contacts:");
        for (Iterator iter = map.entrySet().iterator(); iter.hasNext(); )
            {
            Map.Entry entry = (Map.Entry) iter.next();
            out("  [" + entry.getKey() + "]=" + entry.getValue());
            }
        }

    // ----- inner classes --------------------------------------------------

    /**
    * Sample object used to populate cache
    */
    public static class Contact
            extends Base
            implements Serializable
        {
        public Contact() {}

        public String getName()
            {
            return m_sName;
            }
        public void setName(String sName)
            {
            m_sName = sName;
            }

        public String getPhone()
            {
            return m_sPhone;
            }
        public void setPhone(String sPhone)
            {
            m_sPhone = sPhone;
            }

        public String toString()
            {
            return "Contact{"
                    + "Name=" + getName()
                    + ", Phone=" + getPhone()
                    + "}";
            }

        public boolean equals(Object o)
            {
            if (o instanceof Contact)
                {
                Contact that = (Contact) o;
                return equals(this.getName(), that.getName())
                    && equals(this.getPhone(), that.getPhone());
                }
            return false;
            }

        public int hashCode()
            {
            int result;
            result = (m_sName != null ? m_sName.hashCode() : 0);
            result = 31 * result + (m_sPhone != null ? m_sPhone.hashCode() : 0);
            return result;
            }

        private String m_sName;
        private String m_sPhone;
        }
    }

Example 13-6 illustrates the terminal output from Coherence when you compile and run the example:

Example 13-6 Terminal Output from the Bulk Loading Program

$ export COHERENCE_HOME=[Coherence install directory]

$ export CLASSPATH=$COHERENCE_HOME/lib/coherence.jar:.

$ javac com/tangosol/examples/PagedQuery.java

$ java com.tangosol.examples.PagedQuery

2008-09-15 12:19:44.156 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational configuration from
 resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational overrides from
resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml"
2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <D5> (thread=main, member=n/a): Optional configuration override
"/tangosol-coherence-override.xml" is not specified

Oracle Coherence Version 3.4/405
 Grid Edition: Development mode
Copyright (c) 2000-2008 Oracle. All rights reserved.

2008-09-15 12:19:44.812 Oracle Coherence GE 3.4/405 <D5> (thread=Cluster, member=n/a): Service Cluster joined the cluster
with senior service member n/a
2008-09-15 12:19:48.062 Oracle Coherence GE 3.4/405 <Info> (thread=Cluster, member=n/a): Created a new cluster with
Member(Id=1, Timestamp=2008-09-15 12:19:44.609, Address=xxx.xxx.x.xxx:8088, MachineId=26828, Edition=Grid Edition,
Mode=Development, CpuCount=2, SocketCount=1) UID=0xC0A800CC00000112B9BC9B6168CC1F98
Adding 1024 entries to cache
Adding 1024 entries to cache

...repeated many times...

Adding 1024 entries to cache
Adding 1024 entries to cache
Adding 1024 entries to cache
processing chunk of 100 contacts:
  [25827]=Contact{Name=Cgkyleass Kmknztk, Phone=(285) 452-0000}
  [4847]=Contact{Name=Cyedlujlc Ruexrtgla, Phone=(255) 296-0000}
...repeated many times
  [33516]=Contact{Name=Cjfwlxa Wsfhrj, Phone=(683) 968-0000}
  [71832]=Contact{Name=Clfsyk Dwncpr, Phone=(551) 957-0000}
processing chunk of 100 contacts:
  [38789]=Contact{Name=Cezmcxaokf Kwztt, Phone=(725) 575-0000}
  [87654]=Contact{Name=Cuxcwtkl Tqxmw, Phone=(244) 521-0000}
...repeated many times
  [96164]=Contact{Name=Cfpmbvq Qaxty, Phone=(596) 381-0000}
  [29502]=Contact{Name=Cofcdfgzp Nczpdg, Phone=(563) 983-0000}
...
processing chunk of 80 contacts:
  [49179]=Contact{Name=Czbjokh Nrinuphmsv, Phone=(140) 353-0000}
  [84463]=Contact{Name=Cyidbd Rnria, Phone=(571) 681-0000}
...
  [2530]=Contact{Name=Ciazkpbos Awndvrvcd, Phone=(676) 700-0000}
  [9371]=Contact{Name=Cpqo Rmdw, Phone=(977) 729-0000}

13.2 Performing Distributed Bulk Loading

When pre-populating a Coherence partitioned cache with a large data set, it may be more efficient to distribute the work to Coherence cluster members. Distributed loading will allow for higher data throughput rates to the cache by leveraging the aggregate network bandwidth and CPU power of the cluster. When performing a distributed load, the application will need to decide on the following:

  • which cluster members will perform the load

  • how to divide the data set among the members

The application should consider the load that will be placed on the underlying data source (such as a database or file system) when selecting members and dividing work. For example, a single database can easily be overwhelmed if too many members execute queries concurrently.
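One simple way to limit the pressure on the data source is to cap the number of members that load concurrently and then divide the work among only those members. The following is a minimal sketch of that idea. The class LoaderSelectionSketch, the method selectLoaders, and the maxLoaders parameter are hypothetical names introduced for illustration, and plain strings stand in for cluster members.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LoaderSelectionSketch
    {
    /**
    * Return at most maxLoaders of the candidate members, preserving their
    * original order. The cap is chosen by the application according to
    * what the data source can tolerate.
    */
    static <M> List<M> selectLoaders(List<M> candidates, int maxLoaders)
        {
        if (maxLoaders < 1)
            {
            throw new IllegalArgumentException("maxLoaders must be at least 1");
            }
        int count = Math.min(maxLoaders, candidates.size());
        return new ArrayList<M>(candidates.subList(0, count));
        }

    public static void main(String[] args)
        {
        List<String> members = Arrays.asList(
                "member-1", "member-2", "member-3", "member-4");

        // allow only two members to query the data source at once
        System.out.println(selectLoaders(members, 2));
        }
    }
```

The sketch prints [member-1, member-2]. Taking the first members in iteration order is only one possible policy; an application might instead prefer members on particular machines or members closest to the data source.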

13.2.1 A Distributed Bulk Loading Example

This section outlines the general steps to perform a simple distributed load. The example assumes that the data is stored in files and will be distributed to all storage-enabled members of a cluster.

  1. Retrieve the set of storage-enabled members. For example, the following method uses the getStorageEnabledMembers method to retrieve the storage-enabled members of a distributed cache.

    Example 13-7 Retrieving Storage-Enabled Members of the Cache

    protected Set getStorageMembers(NamedCache cache)
            {
            return ((DistributedCacheService) cache.getCacheService())
                    .getStorageEnabledMembers();
            }
    
  2. Divide the work among the storage-enabled cluster members. For example, the following routine returns a map, keyed by member, containing a list of files assigned to that member.

    Example 13-8 Routine to Get a List of Files Assigned to a Cache Member

    protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames)
            {
            Iterator i = members.iterator();
            Map<Member, List<String>> mapWork = new HashMap<Member, List<String>>(members.size());
            for (String sFileName : fileNames)
                {
                Member member = (Member) i.next();
                List<String> memberFileNames = mapWork.get(member);
                if (memberFileNames == null)
                    {
                    memberFileNames = new ArrayList<String>();
                    mapWork.put(member, memberFileNames);
                    }
                memberFileNames.add(sFileName);
    
                // recycle through the members
                if (!i.hasNext())
                    {
                    i = members.iterator();
                    }
                }
            return mapWork;
            }
    
  3. Launch a task that will perform the load on each member. For example, use Coherence's InvocationService to launch the task. In this case, the implementation of LoaderInvocable will need to iterate through memberFileNames and process each file, loading its contents into the cache. The cache operations normally performed on the client will need to be executed through the LoaderInvocable.

    Example 13-9 Class to Load Each Member of the Cache

    public void load()
            {
            NamedCache cache = getCache();
    
            Set members = getStorageMembers(cache);
    
            List<String> fileNames = getFileNames();
    
            Map<Member, List<String>> mapWork = divideWork(members, fileNames);
    
            InvocationService service = (InvocationService)
                    CacheFactory.getService("InvocationService");        
    
            for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())
                {
                Member member = entry.getKey();
                List<String> memberFileNames = entry.getValue();
    
                LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName());
                service.execute(task, Collections.singleton(member), this);
                }
            }
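The round-robin assignment used to divide the work can be checked in isolation. The following is a minimal sketch that applies the same idea to plain strings, so that it runs without a cluster. The class RoundRobinSketch and the member labels are hypothetical; the file names are the ones listed in the sample output in Example 13-10. Unlike the routine in the steps above, this sketch also returns an empty map when there are no members, which is an added guard and not part of the sample application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch
    {
    /**
    * Assign file names to members in round-robin order.
    *
    * @return a map, keyed by member, of the files assigned to that member
    */
    static Map<String, List<String>> divideWork(List<String> members,
            List<String> fileNames)
        {
        Map<String, List<String>> mapWork =
                new LinkedHashMap<String, List<String>>();
        if (members.isEmpty())
            {
            return mapWork; // no members to assign work to
            }

        Iterator<String> iter = members.iterator();
        for (String fileName : fileNames)
            {
            // recycle through the members
            if (!iter.hasNext())
                {
                iter = members.iterator();
                }
            String       member   = iter.next();
            List<String> assigned = mapWork.get(member);
            if (assigned == null)
                {
                assigned = new ArrayList<String>();
                mapWork.put(member, assigned);
                }
            assigned.add(fileName);
            }
        return mapWork;
        }

    public static void main(String[] args)
        {
        List<String> members = Arrays.asList(
                "member-1", "member-2", "member-3", "member-4");
        List<String> files = Arrays.asList(
                "AAPL.CSV", "BT.CSV", "DELL.CSV", "GOOG.CSV", "HPQ.CSV",
                "JAVA.CSV", "MSFT.CSV", "ORCL.CSV", "YHOO.CSV");

        for (Map.Entry<String, List<String>> entry
                : divideWork(members, files).entrySet())
            {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        }
    }
```

With nine files and four members, the first member receives three files (AAPL.CSV, HPQ.CSV, YHOO.CSV) and each of the others receives two, which matches the assignment reported in Example 13-10. Round-robin balances the number of files, not their size, so members may still finish at different times when file sizes differ.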
    

13.2.2 Running a Distributed Bulk Loading Example

The examples in the previous section are taken from DistributedLoader.java, which is included in the attached zip file, coherence-example-distributedload.zip. This sample application uses the InvocationService to distribute the task of loading data into a cache. Each storage-enabled member of the cluster will be responsible for loading a portion of the data. The data in this example will come from several CSV files and the unit of work is one file. All storage-enabled nodes must have access to all of the data files.

To build and run the example, you must have the following software installed:

  • J2SE SDK 1.4 or later

  • Apache Ant

  • Oracle Coherence

13.2.2.1 Building the Sample Application

  1. Extract the contents of coherence-example-distributedload.zip into your file system.

  2. Update the bin\set-env.cmd file to reflect your system environment.

  3. Open a command prompt and execute the following command in the bin directory to build the samples:

    C:\distributedLoad\bin\ant.cmd build
    

    After running the samples, you can completely remove all build artifacts from your file system by running the clean command:

    C:\distributedLoad\bin\ant.cmd clean
    

13.2.2.2 Running the Sample Application

  1. Start multiple cache servers (from the bin directory):

    C:\distributedLoad\bin\server.cmd
    
  2. Run the client loader (from the bin directory):

    C:\distributedLoad\bin\load.cmd
    

After entering load.cmd on the command line, you will see messages indicating that the various members are joining the services. Then you will see messages that indicate that the data is being distributed among the members. In this example, four cache servers were started.

Example 13-10 Server Response from the Sample Distributed Loading Application

...
  Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
  Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
  Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
  Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
  )
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Loading stock file names from '..\data'
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Files to load: [..\data\AAPL.CSV, ..\data\BT.CSV, ..\data\DELL.CSV, ..\data\GOOG.CSV, ..\data\HPQ.CSV, ..\data\JAVA.CSV, ..\data\MSFT.CSV, ..\data\ORCL.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Invoking load on member(Id=2) for files [..\data\BT.CSV, ..\data\JAVA.CSV]
2008-09-15 16:51:00.640/4.937 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Invoking load on member(Id=3) for files [..\data\DELL.CSV, ..\data\MSFT.CSV]
2008-09-15 16:51:00.750/5.047 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Invoking load on member(Id=4) for files [..\data\GOOG.CSV, ..\data\ORCL.CSV]
2008-09-15 16:51:00.781/5.078 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Invoking load on member(Id=1) for files [..\data\AAPL.CSV, ..\data\HPQ.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:27.500/31.797 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
2008-09-15 16:51:31.640/35.937 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
2008-09-15 16:51:32.812/37.109 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
2008-09-15 16:51:37.750/42.047 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
2008-09-15 16:51:37.796/42.093 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Load finished in 37.20 secs
2008-09-15 16:51:37.812/42.109 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Final cache size: 47131

C:\distributedload\bin>