Managing Consistency

Setting Consistency Policies
Time Consistency Policies
Commit Point Consistency Policies

In a traditional stand-alone transactional application, consistency means that a transaction takes the database from one consistent state to another. What defines a consistent state is application-specific. This transition is made atomically, that is, either all the operations that constitute the transaction are performed, or none of them are. JE HA supports this type of transactional consistency both on the Master, as well as on the Replicas as the replication stream is replayed. That is, in the absence of failures, the Replicas will see exactly the same sequence of transitions, from one consistent state to another, as the Master.

A JE HA application must additionally concern itself with the data consistency of the Replica with respect to the Master. In a distributed system like JE HA, the changes made at the Master are not always instantaneously available at every Replica, although they eventually will be. For example, consider a three node group where a change is made on the Master and the transaction is committed with a durability policy requiring acknowledgments from a simple majority of nodes. After a successful commit of this transaction, the changes will be available at the Master and at one other Replica, thus satisfying the requirement for a simple majority of acknowledgments. The state of the Master and the acknowledging Replica will be consistent with each other after the transaction has been committed, but the transaction commit makes no guarantees about the state of the third Replica after the commit.

In general, Replicas not directly involved in contributing to the acknowledgment of a transaction commit will lag in the replay of the replication stream because they do not synchronize their commits with the Master. As a consequence, their state, on an instantaneous basis, may not be current with respect to the Master. However, in the absence of further updates, all Replicas will eventually catch up and reflect the instantaneous state of the Master. This means that a Replica which is not consistent with the Master simply reflects an earlier locally consistent state at the Master because transaction updates on the Replica are always applied, atomically and in order. From the application's perspective, the environment on the Replica goes through exactly the same sequence of changes to its persistent state as the Master.

A Replica may similarly lag behind the Master if it has been down for some period of time and was unable to communicate with the Master. Such a Replica will catch up, when it is brought back up and will eventually become consistent with the Master.

Given the distributed nature of a JE HA application, and the fact that some nodes might lag behind the Master, the question you have to ask yourself is how long will it take for that node to be consistent relative to the Master. More to the point: how far behind the Master are you willing to allow the node to lag?

This should be one of your biggest concerns when it comes to architecting a JE HA application.

You define how current the nodes in your replication group must be by defining a consistency policy. You define your consistency policy using an implementation of the ReplicaConsistencyPolicy interface. This interface allows you to define how current the Replica must be before a transaction can be started on the Replica. (Remember that all read operations are performed within a transaction.) If the Replica is not current enough, then the start of that transaction is delayed until that level of consistency has been reached. This means that Replicas that are not current enough will block read operations until they are brought up to date.

Obviously your consistency policy can have an affect on your Replica's read performance by increasing the latency experienced by read transactions. This is because transactions may have to wait to either begin or commit until the consistency policy can be satisfied. If the consistency policy is so stringent that it cannot be satisfied using the available resources, the Replica's availability for reads may deteriorate as transactions timeout. A Durability.SyncPolicy.SYNC policy on the Replica can slow down write operations on the Replica, making it harder for the Replica to meet its consistency guarantee. Conversely, a Durability.SyncPolicy.NO_SYNC policy on the Replica makes it easy for the Replica to keep up, which means you can have a stronger consistency guarantee.

One of three interface implementations are available for you to use when defining your consistency policy:

Setting Consistency Policies

You set a consistency policy by using ReplicationConfig.setConsistencyPolicy(). For example:

   EnvironmentConfig envConfig = new EnvironmentConfig();
   envConfig.setAllowCreate(true);
   envConfig.setTransactional(true);

   // Require no synchronization for transactional commit on the 
   // Master, but full synchronization on the Replicas. Also,
   // wait for acknowledgements from a simple majority of Replicas.
   Durability durability =
          new Durability(Durability.SyncPolicy.NO_SYNC,
                         Durability.SyncPolicy.SYNC,
                         Durability.ReplicaAckPolicy.SIMPLE_MAJORITY);

   envConfig.setDurability(durability);

   // Identify the node
   ReplicationConfig repConfig = 
        new ReplicationConfig("PlanetaryRepGroup",
                              "Jupiter",
                              "jupiter.example.com:5002");
 
   // Use the node at mercury.example.com:5001 as a helper to find the rest
   // of the group.
   repConfig.setHelperHosts("mercury.example.com:5001");

   // Turn off consistency policies. Transactions can occur
   // regardless of how consistent the Replica is relative
   // to the Master.
   NoConsistencyRequiredPolicy ncrp =
        new NoConsistencyRequiredPolicy();
   repConfig.setConsistencyPolicy(ncrp);

   ReplicatedEnvironment repEnv =
      new ReplicatedEnvironment(home, repConfig, envConfig); 

Note that the consistency policy is set on a node-by-node basis. There is no requirement that you set the same policy for every node in your replication group.

You can also set consistency policies on a transaction-by-transaction basis when you begin the transaction:

   // Turn off consistency policies. The transactions can
   // be performed regardless of how consistent the Replica is 
   // relative to the Master.
   NoConsistencyRequiredPolicy ncrp =
        new NoConsistencyRequiredPolicy();

   TransactionConfig tc = new TransactionConfig();
   tc.setConsistencyPolicy(ncrp);
   // env is a ReplicatedEnvironment handle
   env.beginTransaction(null, tc); 

Time Consistency Policies

A time consistency policy is a time-oriented policy that defines how far back in time the Replica is permitted to lag the Master. It does so by comparing the time associated with the latest transaction committed on the Master with the current time. If the Replica lags by an amount greater than the permissible lag, it will hold back the start of the transaction until the Replica has replayed enough of the replication stream to narrow the lag to within the permissible lag.

Use of a time based consistency policy requires that nodes in a replication group have their clocks reasonably synchronized. This can be easily achieved using a daemon like NTPD.

You implement a time-based consistency policy by using the TimeConsistencyPolicy class. To instantiate this class, you provide it with the following:

  • A number representing the permissible lag.

  • A TimeUnit constant indicating the units of time that the permissible lag represents.

  • A number representing the timeout period during which a transaction will wait for the Replica to catch up so that the consistency policy can be met. If the transaction waits more than the timeout period, a ReplicaConsistencyException is thrown.

  • A TimeUnit constant indicating the units of time in use for the timeout value.

For example:

   EnvironmentConfig envConfig = new EnvironmentConfig();
   envConfig.setAllowCreate(true);
   envConfig.setTransactional(true);

   // Require no synchronization for transactional commit on the 
   // Master, but full synchronization on the Replicas. Also,
   // wait for acknowledgements from a simple majority of Replicas.
   Durability durability =
          new Durability(Durability.SyncPolicy.NO_SYNC,
                         Durability.SyncPolicy.SYNC,
                         Durability.ReplicaAckPolicy.SIMPLE_MAJORITY);

   envConfig.setDurability(durability);

   // Identify the node
   ReplicationConfig repConfig = 
        new ReplicationConfig("PlanetaryRepGroup",
                              "Jupiter",
                              "jupiter.example.com:5002");
 
   // Use the node at mercury.example.com:5001 as a helper to find the rest
   // of the group.
   repConfig.setHelperHosts("mercury.example.com:5001");

   // Set consistency policy for replica.
   TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy
       (1, TimeUnit.SECONDS, /* 1 sec of lag */
       10, TimeUnit.SECONDS /* Wait up to 10 sec */);
   repConfig.setConsistencyPolicy(consistencyPolicy);

   ReplicatedEnvironment repEnv =
      new ReplicatedEnvironment(home, repConfig, envConfig); 

Commit Point Consistency Policies

A commit point consistency policy defines consistency in terms of the commit of a specific transaction. This policy can be used to ensure that a Replica is at least current enough to have the changes made by a specific transaction. Because transactions are applied serially, by ensuring a Replica has a specific commit applied to it, you know that all transaction commits occurring prior to the specified transaction have also been applied to the Replica.

As is the case with a time consistency policy, if the Replica is not current enough relative to the Master, all attempts to begin a transaction will be delayed until the Replica has caught up. If the Replica does not catch up within a specified timeout period, the transaction will throw a ReplicaConsistencyException.

In order to specify a commit point consistency policy, you must provide a CommitToken that is used to identify the transaction that the Replica must have in order to be current enough. Because the commit point that you care about will change from transaction to transaction, you do not specify commit point consistency policies on an environment-wide basis. Instead, you specify them when you begin a transaction.

For example, suppose the application is a web application where a replicated group is implemented within a load balanced web server group. Each request to the web server consists of an update operation followed by read operations (say from the same client), The read operations naturally expect to see the data from the updates executed by the same request. However, the read operations might have been routed to a node that did not execute the update.

In such a case, the update request would generate a CommitToken, which would be resubmitted by the browser, along with subsequent read requests. The read request could be directed at any one of the available web servers by a load balancer. The node which executes the read request would create a CommitPointConsistencyPolicy with that CommitToken and use it at transaction begin. If the environment at the web server was already current enough, it could immediately execute the transaction and satisfy the request. If not, the "transaction begin" would stall until the Replica replay had caught up and the change was available at that web server.

You obtain a commit token using the Transaction.getCommitToken() method. Use this method after you have successfully committed the transaction that you want to base a CommitPointConsistencyPolicy upon.

For example:

Database myDatabase = null;
Environment myEnv = null;
CommitToken ct = null;
try {
    ...
    // Environment and database setup removed for brevity
    ...

    Transaction txn = myEnv.beginTransaction(null, null);

    try {
        myDatabase.put(txn, key, data);
        txn.commit();
        ct = txn.getCommitToken();
        if (ct != null) {
            // Do something with the commit token to
            // forward it to the Replica where you
            // want to use it.
        }
    } catch (Exception e) {
        if (txn != null) {
            txn.abort();
            txn = null;
        }
    }

} catch (DatabaseException de) {
    // Exception handling goes here
} 

To create your commit point token consistency policy, transfer the commit token to the Replica performing a read using whatever mechanism that makes sense for your HA application, and then create the policy for that specific transaction handle: Note that CommitToken implements Serializable, so you can use the standard Java serialization mechanisms when passing the commit token between processes.

Database myDatabase = null;
Environment myEnv = null;
CommitToken ct = null;
try {
    ...
    // Environment and database setup removed for brevity
    ...

    CommitPointConsistencyPolicy cpcp = 
        new CommitPointConsistencyPolicy(ct,      // The commit token
                           10, TimeUnit.SECONDS); // Timeout value

    TransactionConfig txnConfig = new TransactionConfig();
    txnConfig.setConsistencyPolicy(cpcp);


    Transaction txn = myEnv.beginTransaction(null, txnConfig);

    try {
        // Perform your database read here using the transaction
        // handle, txn.
        txn.commit();
    } catch (Exception e) {
        // There are quite a lot of different exceptions that can be
        // seen at this level, including the LockConflictException.
        // We just catch Exception for this example for simplicity's 
        // sake.
        if (txn != null) {
            txn.abort();
            txn = null;
        }
    }

} catch (ReplicaConsistencyException rce) {
        // Deal with this timeout error here. It is thrown by the
        // beginTransaction operation if the consistency policy 
        // cannot be met within the timeout time.
} catch (DatabaseException de) {
    // Database exception handling goes here.
} catch (Exception ee) {
    // General exception handling goes here.
}