36 Working with Partitions

You can use data affinity with Coherence and also change the default partition setup. The instructions are specific to distributed caches.


Specifying Data Affinity

Learn how data affinity works in Coherence and how to configure it, and review some data affinity examples.


Overview of Data Affinity

Data affinity describes the concept of ensuring that a group of related cache entries is contained within a single cache partition. This ensures that all relevant data is managed on a single primary cache node (without compromising fault-tolerance).

Affinity may span multiple caches (if they are managed by the same cache service, which generally is the case). For example, in a master-detail pattern such as an Order-LineItem, the Order object may be co-located with the entire collection of LineItem objects that are associated with it.

There are two benefits to using data affinity. First, only a single cache node is required to manage queries and transactions against a set of related items. Second, all concurrency operations are managed locally, which avoids the need for clustered synchronization.

Several standard Coherence operations can benefit from affinity. These include cache queries, InvocableMap operations, and the getAll, putAll, and removeAll methods.

Note:

Data affinity is specified in terms of entry keys (not values). As a result, the association information must be present in the key class. Similarly, the association logic applies to the key class, not the value class.

Affinity is specified in terms of a relationship to a partitioned key. In the Order-LineItem example above, the Order objects would be partitioned normally, and the LineItem objects would be associated with the appropriate Order object.

The association does not have to be directly tied to the actual parent key; it only must be a functional mapping of the parent key. It could be a single field of the parent key (even if it is non-unique), or an integer hash of the parent key. All that matters is that all child keys return the same associated key; it does not matter whether the associated key is an actual key (it is simply a "group id"). This fact may help minimize the size impact on child key classes that do not contain the parent key information (because the associated key is derived data, its size can be decided explicitly, and it does not affect the behavior of the key). Note that making the association too general (having too many keys associated with the same "group id") can cause a "lumpy" distribution: if all child keys return the same association key regardless of what the parent key is, the child keys are all assigned to a single partition and are not spread across the cluster.
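The effect of an associated key can be sketched without Coherence itself. The following self-contained illustration assumes a simplified placement rule (the hash of the associated key modulo the partition count). Coherence's real key-to-partition mapping is internal and differs from this rule, and the names AffinitySketch, LineItemKey, and partitionFor are hypothetical.

```java
import java.util.Objects;

// Illustrative only: a simplified stand-in for key-to-partition placement.
final class AffinitySketch {
    static final int PARTITION_COUNT = 257; // default count cited in this chapter

    // Hypothetical child key: identifies a line item and carries its parent order id.
    static final class LineItemKey {
        final long orderId;
        final int lineNumber;

        LineItemKey(long orderId, int lineNumber) {
            this.orderId = orderId;
            this.lineNumber = lineNumber;
        }

        // Plays the role of KeyAssociation.getAssociatedKey(): returns the "group id".
        Object getAssociatedKey() {
            return orderId;
        }
    }

    // Assumed placement rule: hash the associated key, not the child key itself.
    static int partitionFor(Object associatedKey) {
        return Math.floorMod(Objects.hashCode(associatedKey), PARTITION_COUNT);
    }

    public static void main(String[] args) {
        // Every line item of order 1234 maps to the same partition as the order.
        for (int line = 1; line <= 3; line++) {
            LineItemKey key = new LineItemKey(1234L, line);
            System.out.println("order 1234, line " + line + " -> partition "
                    + partitionFor(key.getAssociatedKey()));
        }
    }
}
```

Under this rule, a getAssociatedKey() implementation that returned the same constant for every key would place every line item in one partition, which is the "lumpy" distribution described above.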

Specifying Data Affinity with a KeyAssociation

For application-defined keys, the class (of the cache key) can implement com.tangosol.net.cache.KeyAssociation as follows:

Example 36-1 Creating a Key Association

import com.tangosol.net.cache.KeyAssociation;

public class LineItemId implements KeyAssociation
   {
   // {...}

   public Object getAssociatedKey()
       {
       return getOrderId();
       }

   // {...}
   }

Specifying Data Affinity with a KeyAssociator

Applications may also provide a class that implements the KeyAssociator interface:

Example 36-2 A Custom KeyAssociator

import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.KeyAssociator;

public class LineItemAssociator implements KeyAssociator
    {
    public Object getAssociatedKey(Object oKey)
        {
        if (oKey instanceof LineItemId)
            {
            return ((LineItemId) oKey).getOrderId();
            }
        else if (oKey instanceof OrderId)
            {
            return oKey;
            }
        else
            {
            return null;
            }
        }

    public void init(PartitionedService service)
        {
        }
    }

The key associator is configured for a NamedCache in the <distributed-scheme> element that defines the cache:

Example 36-3 Configuring a Key Associator

<distributed-scheme>
    ...
    <key-associator>
        <class-name>LineItemAssociator</class-name>
    </key-associator>
</distributed-scheme>

Deferring the Key Association Check

Key association can be implemented either on the cluster or on the extend client. When using extend clients, the best practice is to implement key association on the client, which provides the best performance by processing the keys before they are sent to the cluster. Key association is processed on the client by default. Existing client implementations that rely on key association on the cluster must set the defer-key-association-check parameter in order to force the processing of key classes on the cluster.

To force key association processing to be done on the cluster side instead of by the extend client, set the <defer-key-association-check> element, within a <remote-cache-scheme> element, in the client-side cache configuration to true. For example:

<remote-cache-scheme>
   ...
   <defer-key-association-check>true</defer-key-association-check>
</remote-cache-scheme>

Note:

If the parameter is set to true, a key class implementation must be found on the cluster even if key association is not being used.

See Implementing a Java Version of a .NET Object and Implementing a Java Version of a C++ Object in Developing Remote Clients for Oracle Coherence for more information on deferring key association with .NET and C++ clients, respectively.

Example of Using Affinity

Example 36-4 illustrates how to use affinity to create a more efficient query (NamedCache.entrySet(Filter)) and cache access (NamedCache.getAll(Collection)).

Example 36-4 Using Affinity for a More Efficient Query

import java.util.Map;
import java.util.Set;

import com.tangosol.util.Filter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;

OrderId orderId = new OrderId(1234);

// this Filter is applied to all LineItem objects to fetch those
// for which getOrderId() returns the specified order identifier
// "select * from LineItem where OrderId = :orderId"
Filter filterEq = new EqualsFilter("getOrderId", orderId);

// this Filter directs the query to the cluster node that currently owns
// the Order object with the given identifier
Filter filterAsc = new KeyAssociatedFilter(filterEq, orderId);

// run the optimized query to get the LineItem keys
Set setLineItemKeys = cacheLineItems.keySet(filterAsc);

// get all the LineItem entries immediately (getAll returns a Map of key to value)
Map mapLineItems = cacheLineItems.getAll(setLineItemKeys);

// Or remove all immediately
cacheLineItems.keySet().removeAll(setLineItemKeys);

Changing the Number of Partitions

The default partition count for a distributed cache service is 257 partitions.

Each cache server in the cluster that hosts a distributed cache service manages a balanced number of the partitions. For example, each cache server in a cluster of four cache servers manages approximately 64 of the default 257 partitions. The default partition count is typically acceptable for clusters containing up to 16 cache servers. However, larger clusters require more partitions to ensure optimal performance.

All members of the same service must have the same partition count. Changing the partition count with active persistence is not supported out-of-the-box. See Workarounds to Migrate a Persistent Service to a Different Partition Count in Administering Oracle Coherence.


Define the Partition Count

To change the number of partitions for a distributed cache service, edit the cache configuration file and add a <partition-count> element, within the <distributed-scheme> element, that includes the number of partitions to use for the service. For example:

<distributed-scheme>
   <scheme-name>distributed</scheme-name>
   <service-name>DistributedCache</service-name>
   <partition-count>1181</partition-count>
   ...
</distributed-scheme>

Deciding the Number of Partitions

There is no exact formula for selecting a partition count. An ideal partition count balances the number of partitions on each cluster member with the amount of data each partition holds. Use the following guidelines when selecting a partition count and always perform tests to verify that the partition count is not adversely affecting performance.

  • The partition count should always be a prime number. A list of primes can be found at http://primes.utm.edu/lists/.

  • The number of partitions must be large enough to support a balanced distribution without each member managing too few partitions. For example, a partition count that results in only two partitions on each member is too constraining.

  • The number of partitions must not be so large that network bandwidth is wasted on transfer overhead and bookkeeping for many partition transfers (a unit of transfer is a partition). For example, transferring thousands of partitions to a new cache server member requires a greater amount of network resources and can degrade cluster performance, especially during startup.

  • The amount of data a partition manages must not be too large (the more data a partition manages, the higher the partition promotion and transfer costs). The amount of data a partition manages is only limited by the amount of available memory on the cache server. A partition limit of 50MB typically ensures good performance. A partition limit between 50MB and 100MB (even higher with 10GbE or faster) can be used for larger clusters. Larger limits can be used with the understanding that there will be a slight increase in transfer latency and that larger heaps with more overhead space are required.

As an example, consider a cache server that is configured with a 4G heap and stores approximately 1.3G of primary data not including indexes (leaving 2/3 of the heap for backup and scratch space). If the decided partition limit is a conservative 25MB, then a single cache server can safely use 53 partitions (1365M/25M rounded down to the previous prime). Therefore, a cluster that contains 20 cache servers can safely use 1051 partitions (53*20 rounded down to the previous prime) and stores approximately 25G of primary data. A cluster of 100 cache servers can safely use 5297 partitions and can store approximately 129G of primary data.
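The arithmetic in the example above can be reproduced with a short program. This is only a sketch of the sizing guideline, not a Coherence API; the class and method names (PartitionSizing, previousPrime, partitionsPerServer, partitionsForCluster) are hypothetical, and sizes are expressed in whole megabytes.

```java
// Sketch of the sizing arithmetic from the example above; not a Coherence API.
final class PartitionSizing {
    // Trial-division primality test; adequate for partition-count-sized inputs.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Largest prime less than or equal to n ("rounded down to the previous prime").
    static int previousPrime(int n) {
        for (int p = n; p >= 2; p--) {
            if (isPrime(p)) {
                return p;
            }
        }
        throw new IllegalArgumentException("no prime <= " + n);
    }

    // Partitions one cache server can hold, given its primary data size and
    // the chosen per-partition limit (both in MB).
    static int partitionsPerServer(int primaryDataMb, int partitionLimitMb) {
        return previousPrime(primaryDataMb / partitionLimitMb);
    }

    // Cluster-wide count: per-server count times server count, rounded down to a prime.
    static int partitionsForCluster(int perServer, int servers) {
        return previousPrime(perServer * servers);
    }

    public static void main(String[] args) {
        int perServer = partitionsPerServer(1365, 25);            // 1365 / 25 = 54 -> 53
        System.out.println(perServer);
        System.out.println(partitionsForCluster(perServer, 20));  // 1060 -> 1051
        System.out.println(partitionsForCluster(perServer, 100)); // 5300 -> 5297
    }
}
```

Treat the result as a starting point for testing rather than a final value, since the guidelines above also depend on network capacity and heap overhead.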

Changing the Partition Distribution Strategy

Partition distribution defines how partitions are assigned to storage-enabled cluster members. Coherence uses centralized partition distribution to allow global distribution decisions to be carried out by each storage-enabled member. Centralized distribution allows for expressive distribution algorithms and uses a complete, global view of the service. In addition, custom centralized distribution strategies can be created by implementing the com.tangosol.net.partition.PartitionAssignmentStrategy interface.


Specifying a Partition Assignment Strategy

The following predefined partition assignment strategies are available:

  • simple – (default) The simple assignment strategy attempts to balance partition distribution while ensuring machine-safety.

  • mirror:<service-name> – The mirror assignment strategy attempts to co-locate the service's partitions with the partitions of the specified service. This strategy is used to increase the likelihood that key-associated, cross-service cache access remains local to a member.

  • custom – A class that implements the com.tangosol.net.partition.PartitionAssignmentStrategy interface.

To configure a partition assignment strategy for a specific partitioned cache service, add a <partition-assignment-strategy> element within a distributed cache definition:

<distributed-scheme>
   ...        
   <partition-assignment-strategy>mirror:MyService</partition-assignment-strategy>
   ...
</distributed-scheme>

To configure the partition assignment strategy for all instances of the distributed cache service type, override the partitioned cache service's partition-assignment-strategy initialization parameter in an operational override file. For example:

<?xml version='1.0'?>

<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config
   coherence-operational-config.xsd">
   <cluster-config>
      <services>
         <service id="3">
            <init-params>
               <init-param id="21">
                  <param-name>partition-assignment-strategy</param-name>
                  <param-value>mirror:MyService</param-value>
               </init-param>
            </init-params>
         </service>
      </services>
   </cluster-config>
</coherence>

Enabling a Custom Partition Assignment Strategy

To specify a custom partition assignment strategy, include an <instance> subelement within the <partition-assignment-strategy> element and provide a fully qualified class name that implements the com.tangosol.net.partition.PartitionAssignmentStrategy interface. A custom class can also extend the com.tangosol.net.partition.SimpleAssignmentStrategy class. See instance. The following example enables a partition assignment strategy that is implemented in the MyPAStrategy class.

<distributed-scheme>
   ...        
   <partition-assignment-strategy>
      <instance>
         <class-name>package.MyPAStrategy</class-name>
      </instance>
   </partition-assignment-strategy>
   ...
</distributed-scheme>

As an alternative, the <instance> element supports the use of a <class-factory-name> element to use a factory class that is responsible for creating PartitionAssignmentStrategy instances, and a <method-name> element to specify the static factory method on the factory class that performs object instantiation. The following example gets a strategy instance using the getStrategy method on the MyPAStrategyFactory class.

<distributed-scheme>
   ...        
   <partition-assignment-strategy>
      <instance>
         <class-factory-name>package.MyPAStrategyFactory</class-factory-name>
         <method-name>getStrategy</method-name>
      </instance>
   </partition-assignment-strategy>
   ...
</distributed-scheme>

Any initialization parameters that are required for an implementation can be specified using the <init-params> element. The following example sets the iMaxTime parameter to 2000.

<distributed-scheme>
   ...        
   <partition-assignment-strategy>
      <instance>
         <class-name>package.MyPAStrategy</class-name>
         <init-params>
            <init-param>
               <param-name>iMaxTime</param-name>
               <param-value>2000</param-value>
            </init-param>
         </init-params>
      </instance>
   </partition-assignment-strategy>
   ...
</distributed-scheme>

Logging Partition Events

The most commonly used service in Coherence to store and access data is the distributed / partitioned service. This service offers partitioned access to store and retrieve data, and thus provides scalability, in addition to redundancy by ensuring that replicas are in sync.

The concept of partitioning can be entirely opaque to you (the end user) as Coherence transparently maps keys to partitions and partitions to members. As ownership members join and leave the partitioned service, the partitions are redistributed across the new or remaining members, thus avoiding an entire rehash of the data. Coherence also designates replicas, providing them with an initial snapshot followed by a journal of updates as they occur on the primary member.

These partition lifecycle events (members joining and leaving the service) result in partitions being blocked. Therefore, Coherence attempts to reduce this time of unavailability as much as possible. Until now, there have been minimal means to track this partition unavailability. Logging provides insight into these partition lifecycle events and highlights when they start and end. This feature helps you correlate increased response times with these lifecycle events.


Data Availability

To preserve data integrity when partition events occur, for example, partition movements between members, the read and write access to data is temporarily blocked. Data gets blocked when re-distribution takes place or indices are built. The time involved is usually short, but can add up if the cache contains a significant amount of data.

Table 36-1 Types of Events

Event Type Description

Redistribution

When a server joins or leaves a cluster, many events occur on each member in the cluster, both existing and new, which correspond to the movement of data and backup partitions according to the partitioning strategy. This scheme helps determine the names of members who own partitions and the members who own the backups.

Restoring from backup

After the primary partitions are lost, the backups are moved into the primary storage in the members where they are located. These partitions are locked until the event is complete.

Recovery from persistence

Persistence maintenance, such as snapshot creation and recovery from persistence, will cause the affected partitions to be unavailable.

Index building

If an application needs to have data indexed, it is typically done by calling addIndex on a NamedCache. If the cache already contains a significant amount of data, or the cost of computing the index per entry (the ValueExtractor) is high, this operation can take some time during which any submitted queries will be blocked, waiting for the index data structures to be populated.

Note: Regular index maintenance, such as adding or deleting elements, does not incur the same unavailability penalty.

Using Partition Events Logs

By default, logging of the times when partitions are unavailable is turned off because it generates a significant volume of log output. To enable logging of partition events, set the coherence.distributed.partition.events property to log and set the log level value to 8 or greater.

For example:
-Dcoherence.distributed.partition.events=log
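
For applications that start Coherence from their own main method, the same settings can be applied programmatically, provided that they are set before any Coherence class is initialized. The sketch below assumes that the coherence.log.level system property is what controls the log level in your deployment; the class and method names are hypothetical.

```java
// Hypothetical helper: applies the logging settings as JVM system properties.
final class PartitionEventLogging {
    static void enable() {
        // Property named in this section; the value "log" turns partition event logging on.
        System.setProperty("coherence.distributed.partition.events", "log");
        // Assumption: coherence.log.level sets the log level; this section requires 8 or greater.
        System.setProperty("coherence.log.level", "8");
    }

    public static void main(String[] args) {
        enable();
        // Start Coherence only after this point so that the properties take effect.
        System.out.println(System.getProperty("coherence.distributed.partition.events"));
    }
}
```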

Logging Events

The events listed in the table below are logged, one per partition, except during the initial assignment of partitions. Along with each event, the owning member and the length of time for which the partition was unavailable are also logged.

Table 36-2 List of Events Logged

Event Type Description

ASSIGN

The partition is, either initially or because of losing the primary and all backups, assigned to a cluster member.

PRIMARY_TRANSFER_OUT

The partition is transferred to a different member.

BACKUP_TRANSFER_OUT

The primary partition owner transfers a snapshot of the partition and all its content to the targeted member as it will be in the chain of backup replicas.

PRIMARY_TRANSFER_IN

This member receives a partition and all related data for primary ownership. This event occurs due to the PRIMARY_TRANSFER_OUT event from the existing owner of the partition.

RESTORE

The loss of primary partitions results in backup owners (replicas) restoring data from the backup storage to the primary member for the affected partitions.

INDEX_BUILD

Index data structures are populated for the relevant partitions. This event affects queries that use the indices but does not block the key-based data access or mutation.

PERSISTENCE

The relevant partitions are made unavailable due to persistence maintenance operations. These operations include recovery from persistence and snapshot creation.

Note:

The times logged for these events are in milliseconds.

Example 36-5 Logging Events

On Member 1:

At startup:

2021-06-11 09:26:10.159/5.522 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedTopicDedicated:0x000A:5, member=1): PartitionSet{0..256}, Owner: 1, Action: ASSIGN, UnavailableTime: 0
...
Application calls addIndex() on a cache:
2021-06-11 09:28:36.872/152.234 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedCacheDedicated:0x000B:152, member=1): PartitionId: 43, Owner: 1, Action: INDEX_BUILD, UnavailableTime: 3
...
The partitions listed are transferred to another member, along with backups (note that backups and partitions are not the same):
2021-06-11 09:28:45.573/160.935 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 132, Owner: 1, Action: BACKUP_TRANSFER_OUT, UnavailableTime: 1
2021-06-11 09:28:45.678/161.040 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 133, Owner: 1, Action: BACKUP_TRANSFER_OUT, UnavailableTime: 1
...
2021-06-11 09:28:49.911/165.273 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 2, Owner: 1, Action: PRIMARY_TRANSFER_OUT, UnavailableTime: 5
2021-06-11 09:28:50.017/165.379 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 3, Owner: 1, Action: PRIMARY_TRANSFER_OUT, UnavailableTime: 3
...

On Member 2:

Partitions are received. If the partitions have indices, they are rebuilt immediately after they are received:
2021-06-11 09:28:49.805/8.033 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=2): PartitionId: 1, Owner: 2, Action: PRIMARY_TRANSFER_IN, UnavailableTime: 1
2021-06-11 09:28:49.806/8.034 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedCacheDedicated:0x000B:8, member=2): PartitionId: 1, Owner: 2, Action: INDEX_BUILD, UnavailableTime: 0

Member 2 stops, back on Member 1:

Partitions are restored from backup, and the indices related to them are rebuilt:
2021-06-11 10:29:19.041/3794.322 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 0, Owner: 1, Action: RESTORE, UnavailableTime: 109
2021-06-11 10:29:19.041/3794.322 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=DistributedCache:PartitionedCache, member=1): PartitionId: 1, Owner: 1, Action: RESTORE, UnavailableTime: 109
...
2021-06-11 10:29:19.062/3794.343 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedCacheDedicated:0x000E:3794, member=1): PartitionId: 1, Owner: 1, Action: INDEX_BUILD, UnavailableTime: 12
2021-06-11 10:29:19.066/3794.347 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedCacheDedicated:0x000D:3794, member=1): PartitionId: 0, Owner: 1, Action: INDEX_BUILD, UnavailableTime: 16
2021-06-11 10:29:19.067/3794.349 Oracle Coherence GE 14.1.1.2206.1 <D8> (thread=PartitionedCacheDedicated:0x000E:3794, member=1): PartitionId: 2, Owner: 1, Action: INDEX_BUILD, UnavailableTime: 5
...
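
Log lines in the format shown in Example 36-5 can be summarized with a small program. The following sketch assumes that per-partition events always end with the PartitionId, Owner, Action, and UnavailableTime fields in that order, as in the sample output; the class and method names are hypothetical. Lines that report a PartitionSet instead of a PartitionId (such as the initial ASSIGN) do not match the pattern and are skipped.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that totals partition unavailability per event type.
final class PartitionEventLogSummary {
    // Matches the fields of a per-partition event line as shown in Example 36-5.
    private static final Pattern EVENT = Pattern.compile(
            "PartitionId: (\\d+), Owner: (\\d+), Action: (\\w+), UnavailableTime: (\\d+)");

    // Sums UnavailableTime (milliseconds) per action; non-matching lines are ignored.
    static Map<String, Long> totalUnavailableByAction(Iterable<String> lines) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = EVENT.matcher(line);
            if (m.find()) {
                totals.merge(m.group(3), Long.parseLong(m.group(4)), Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Sample lines shortened to the fields that the pattern reads.
        List<String> sample = Arrays.asList(
                "PartitionId: 0, Owner: 1, Action: RESTORE, UnavailableTime: 109",
                "PartitionId: 1, Owner: 1, Action: RESTORE, UnavailableTime: 109",
                "PartitionId: 1, Owner: 1, Action: INDEX_BUILD, UnavailableTime: 12");
        System.out.println(totalUnavailableByAction(sample));
    }
}
```

Grouping by action gives a quick view of which lifecycle event contributes most to unavailability; the same loop could group by partition or by owner instead.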

Note:

The Coherence VisualVM Plug-in provides an option to analyze the logs that are generated with events logging enabled. See Coherence VisualVM Plugin Release Notes.