Example Run Transaction Class

Using JE HA requires you to handle a number of HA-specific exceptions. Some of these are specific to the Master and others to a Replica, but your code may still need to handle both. This is because HA applications commonly use the same classes to perform database access, regardless of whether the node is currently in the Master state or the Replica state.

The following class is an example class that can be used to perform transactional reads and writes in an HA application. This class is used by the on-disk HA examples that you can find in your JE distribution (see Replication Examples for more information). However, we think this particular example class is important enough that we also describe it here.

RunTransaction Class

The RunTransaction abstract class is used to implement a utility class that performs database access for HA applications. It provides all the transaction error handling and retry framework that is required for database access in an HA environment.

Because RunTransaction is a class that is meant to be used by different example HA applications, it does not actually implement the database operations. Instead, it provides an abstract method that must be implemented by the HA application that uses RunTransaction.

We begin by importing the classes that RunTransaction uses.

package je.rep.quote;

import java.io.PrintStream;

import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.OperationFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientAcksException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;

Then we define a series of private data members that identify how our HA transactions are going to behave in the event of an error condition.

abstract class RunTransaction {

    /* The maximum number of times to retry the transaction. */
    private static final int TRANSACTION_RETRY_MAX = 10;

    /*
     * The number of seconds to wait between retries when a sufficient
     * number of replicas are not available for a transaction.
     */
    private static final int INSUFFICIENT_REPLICA_RETRY_SEC = 1;

    /* Amount of time to wait to let a replica catch up before 
     * retrying. 
     */
    private static final int CONSISTENCY_RETRY_SEC = 1;

    /* Amount of time to wait after a lock conflict. */
    private static final int LOCK_CONFLICT_RETRY_SEC = 1;

    private final ReplicatedEnvironment env;
    private final PrintStream out; 

Then we implement our class constructor, which is very simple because all the heavy lifting is done by whatever application calls this utility class.

    RunTransaction(ReplicatedEnvironment repEnv, 
                   PrintStream printStream) {
        env = repEnv;
        out = printStream;
    } 

Now we implement our run() method. This is what actually performs all the error checking and retry work for the class.

The run() method catches and handles the exceptions most likely to occur as we read and write the database, but it can also throw InterruptedException and EnvironmentFailureException.

InterruptedException can be thrown if the thread calling this method is sleeping and some other thread interrupts it. The exception is possible because this method calls Thread.sleep in the retry cycle.

EnvironmentFailureException can occur both when beginning a transaction and also when committing a transaction. It means that there is something significantly wrong with the node's environment.

The readOnly parameter for this method is used to indicate that the transaction will only perform database reads. When that happens, the durability guarantee for the transaction is changed to Durability.READ_ONLY_TXN because that policy does not call for any acknowledgements. This eliminates the possibility of an InsufficientReplicasException being thrown from the Environment.beginTransaction() operation.

    public void run(boolean readOnly)
        throws InterruptedException, EnvironmentFailureException { 

Now we begin our retry loop and define our sleep cycle between retries. Initially, we do not actually sleep before retrying the transaction. However, some of the error conditions caught by this method will cause the thread to sleep before the operation is retried. After every sleep operation, the sleep time is returned to 0 because usually putting the thread to sleep is of no benefit.

        OperationFailureException exception = null;
        boolean success = false;
        long sleepMillis = 0;
        final TransactionConfig txnConfig = readOnly ?
         new TransactionConfig().setDurability(Durability.READ_ONLY_TXN) :
         null;

        for (int i = 0; i < TRANSACTION_RETRY_MAX; i++) {
            /* Sleep before retrying. */
            if (sleepMillis != 0) {
                Thread.sleep(sleepMillis);
                sleepMillis = 0;
             } 
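
As an aside, the retry-and-sleep cycle just described can be sketched on its own, without any JE dependency. This is only an illustration of the pattern, not the JE example code: RetrySketch, RetryableException, Work, and runWithRetry() are hypothetical names, and the 10 ms pause is an arbitrary assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {

    /* Hypothetical stand-in for a retryable HA exception; not a JE class. */
    static class RetryableException extends RuntimeException {
        final long sleepMillis;

        RetryableException(long sleepMillis) {
            super("retry requested");
            this.sleepMillis = sleepMillis;
        }
    }

    /* Hypothetical callback for the work performed on each attempt. */
    interface Work {
        void run();
    }

    /* Returns the number of attempts used, or -1 if every attempt failed. */
    static int runWithRetry(int maxRetries, Work work)
        throws InterruptedException {

        long sleepMillis = 0;
        for (int i = 0; i < maxRetries; i++) {
            /* Sleep only if the previous failure asked for it. */
            if (sleepMillis != 0) {
                Thread.sleep(sleepMillis);
                sleepMillis = 0;
            }
            try {
                work.run();
                return i + 1;
            } catch (RetryableException e) {
                /* Remember how long to pause before the next attempt. */
                sleepMillis = e.sleepMillis;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger calls = new AtomicInteger();
        /* Fails twice, asking for a 10 ms pause each time, then succeeds. */
        int used = runWithRetry(10, () -> {
            if (calls.incrementAndGet() < 3) {
                throw new RetryableException(10);
            }
        });
        System.out.println("Succeeded after " + used + " attempts");
    }
}
```

Because the sleep time is reset to 0 after each pause, a failure that does not request a delay is retried immediately.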

Now we create our transaction, perform the database operations, and then commit. The doTransactionWork() method is an abstract method that must be implemented by the application using this class. Otherwise, this is standard transaction begin/commit code that should hold no surprises for you.

            Transaction txn = null;
            try {
                txn = env.beginTransaction(null, txnConfig);
                doTransactionWork(txn); /* CALL APP-SPECIFIC CODE */
                txn.commit();
                success = true;
                return; 

The first error case that we check for is InsufficientReplicasException. This exception means that the Master is not in contact with enough Electable Replicas to successfully commit the transaction. It is possible that Replicas are still starting up after an application restart, so we put the thread to sleep before attempting the transaction again.

InsufficientReplicasException can be thrown by Environment.beginTransaction() or by Transaction.commit(), so we have to perform the entire transaction again.

            } catch (InsufficientReplicasException insufficientReplicas) {

                /*
                 * Retry the transaction.  Give replicas a chance to 
                 * contact this master, in case they have not had a 
                 * chance to do so following an election.
                 */
                exception = insufficientReplicas;
                out.println(insufficientReplicas.toString());
                sleepMillis = INSUFFICIENT_REPLICA_RETRY_SEC * 1000;
                continue; 

Next we check for InsufficientAcksException. This exception means that the transaction has successfully committed on the Master, but not enough Electable Replicas have acknowledged the commit within the allowed period of time. Whether you consider this to be a successful commit depends on your durability policy.

As provided here, the code considers this situation to be an unsuccessful commit. But if you have a lot of Electable Replicas and you have a strong durability guarantee on the Master, then you might be able to still consider this to be a successful commit. If so, you should set success = true; before returning from the method.

For more information on this error case, see Managing Acknowledgement Timeouts.

            } catch (InsufficientAcksException insufficientAcks) {

                /*
                 * Transaction has been committed at this node. The 
                 * other acknowledgments may be late in arriving, 
                 * or may never arrive because the replica just 
                 * went down.
                 */

                /*
                 * INSERT APP-SPECIFIC CODE HERE: For example, repeat
                 * idempotent changes to ensure they went through.
                 *
                 * Note that 'success' is false at this point, although
                 * some applications may consider the transaction to be 
                 * complete.
                 */
                out.println(insufficientAcks.toString());
                /*
                 * Clear the handle so that the finally block does not
                 * try to abort the already-committed transaction.
                 */
                txn = null;
                return; 

Next we check for ReplicaWriteException. This happens when a write operation is attempted on a Replica. Any number of responses are possible, ranging from reporting the problem to the application that attempted the write and then aborting, to forwarding the write request to the Master. This particular method responds to the condition according to how the onReplicaWrite() method is implemented.

For more information on how to handle this exception, see Managing Write Requests at a Replica.

            } catch (ReplicaWriteException replicaWrite) {

                /*
                 * Attempted a modification while in the Replica 
                 * state.
                 *
                 * CALL APP-SPECIFIC CODE HERE: Cannot accomplish 
                 * the changes on this node, redirect the write to 
                 * the new master and retry the transaction there.  
                 * This could be done by forwarding the request to 
                 * the master here, or by returning an error to the
                 * requester and retrying the request at a higher 
                 * level.
                 */
                onReplicaWrite(replicaWrite);
                return; 

Now we check for LockConflictException, which is thrown whenever a transaction experiences a lock conflict with another thread. Note that by catching this exception, we are also catching the LockPreemptedException, which happens whenever the underlying HA code "steals" a lock from an application transaction. The most common cause of this is when the HA replication stream is updating a Replica, and the Replica is holding a read lock that the replication stream requires.

Here, it is useful to sleep for a period of time before retrying the transaction.

            } catch (LockConflictException lockConflict) {

                /*
                 * Retry the transaction.  Note that LockConflictException
                 * covers the HA LockPreemptedException.
                 */
                exception = lockConflict;
                out.println(lockConflict.toString());
                sleepMillis = LOCK_CONFLICT_RETRY_SEC * 1000;
                continue; 
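
The point about catching a superclass can be illustrated in isolation. The following is a minimal sketch in plain Java, not JE code: ConflictException and PreemptedException are hypothetical stand-ins that only mirror the relationship between LockConflictException and its subclass LockPreemptedException.

```java
class CatchSuperclassSketch {

    /* Hypothetical stand-in for LockConflictException; not a JE class. */
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    /* Hypothetical stand-in for LockPreemptedException; not a JE class. */
    static class PreemptedException extends ConflictException {
        PreemptedException(String message) {
            super(message);
        }
    }

    /* Reports how a single catch clause for the superclass reacts. */
    static String classify(Runnable work) {
        try {
            work.run();
            return "ok";
        } catch (ConflictException e) {
            /* Reached for ConflictException and for any subclass of it. */
            return "retry: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        /* Prints "retry: PreemptedException". */
        System.out.println(classify(() -> {
            throw new PreemptedException("lock stolen");
        }));
    }
}
```

If an application needed to treat the preemption case differently, it could add a catch clause for the subclass ahead of the clause for the superclass.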

The last error we check for is ReplicaConsistencyException. This exception can be thrown when the transaction begins. It means that the beginTransaction() method has waited too long for the Replica to catch up relative to the Master. This situation does not really represent a failed transaction because the transaction never had a chance to proceed in the first place.

In any case, the proper thing to do is to put the thread to sleep for a period of time so that the Replica has the chance to meet its consistency requirements. Then we retry the transaction.

Note that at this point in time, the transaction handle is in whatever state it was in when beginTransaction() was called. If the handle was in the null state before attempting the operation, then it will still be in the null state. The important thing to realize here is that the transaction does not have to be aborted, because the transaction never began in the first place.

For more information on consistency policies, see Managing Consistency.

            } catch (ReplicaConsistencyException replicaConsistency) {

                /*
                 * Retry the transaction. The timeout associated with 
                 * the ReplicaConsistencyPolicy may need to be 
                 * relaxed if it's too stringent.
                 */
                exception = replicaConsistency;
                out.println(replicaConsistency.toString());
                sleepMillis = CONSISTENCY_RETRY_SEC * 1000;
                continue; 

Finally, we abort our transaction and loop again as needed. onRetryFailure() is called if the transaction has been retried too many times (as defined by TRANSACTION_RETRY_MAX). It provides the option to log the situation.

            } finally {

                if (!success) {
                    if (txn != null) {
                        txn.abort();
                    }

                    /*
                     * INSERT APP-SPECIFIC CODE HERE: Perform any 
                     * app-specific cleanup.
                     */
                }
            }
        }

        /*
         * CALL APP-SPECIFIC CODE HERE: 
         * Transaction failed, despite retries.
         */
        onRetryFailure(exception);
    } 
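
The cleanup rule used in the finally clause (abort only when a transaction actually began and did not commit) can be sketched separately from JE. This is an illustration under stated assumptions: FakeTxn is a hypothetical stand-in for a transaction handle, and IllegalStateException merely simulates a failure in the transactional work.

```java
class AbortOnFailureSketch {

    /* Hypothetical stand-in for a transaction handle; not a JE class. */
    static class FakeTxn {
        boolean committed;
        boolean aborted;

        void commit() {
            committed = true;
        }

        void abort() {
            aborted = true;
        }
    }

    /*
     * Runs one attempt. If failWork is true, the simulated work throws
     * before the commit is reached.
     */
    static FakeTxn attempt(boolean failWork) {
        FakeTxn txn = null;
        boolean success = false;
        try {
            txn = new FakeTxn();
            if (failWork) {
                throw new IllegalStateException("simulated failure");
            }
            txn.commit();
            success = true;
        } catch (IllegalStateException e) {
            /* A real retry loop would record the exception here. */
        } finally {
            /* Abort only if a transaction began and was not committed. */
            if (!success && txn != null) {
                txn.abort();
            }
        }
        return txn;
    }

    public static void main(String[] args) {
        System.out.println("failed attempt aborted: " +
                           attempt(true).aborted);
        System.out.println("good attempt aborted: " +
                           attempt(false).aborted);
    }
}
```

The null check matters for failures that occur before a handle exists, which is the situation described earlier for ReplicaConsistencyException.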

With that done, the class is almost complete. What remains is to define a couple of methods, one of which is an abstract method that must be implemented by the application that uses this class.

doTransactionWork() is an abstract method where the actual database operations are performed.

onReplicaWrite() is a method that should be overridden by the HA application that uses this class. It is used to define whatever action the Replica should take if a write is attempted on it. For examples of how this is used, see the next section.

For this implementation of the class, we simply throw the ReplicaWriteException that got us here in the first place.

    abstract void doTransactionWork(Transaction txn);

    void onReplicaWrite(ReplicaWriteException replicaWrite) {
        throw replicaWrite;
    } 
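
The division of labor between an abstract work method and an overridable hook with a rethrowing default can be shown in a few lines of plain Java. This sketch does not use JE: HookSketch, Runner, WriteRejectedException, and onWriteRejected() are hypothetical names, and adding the message to a list is only an assumed placeholder for forwarding the request to the Master.

```java
import java.util.ArrayList;
import java.util.List;

class HookSketch {

    /* Hypothetical stand-in for ReplicaWriteException; not a JE class. */
    static class WriteRejectedException extends RuntimeException {
        WriteRejectedException(String message) {
            super(message);
        }
    }

    abstract static class Runner {

        /* Application-specific work; must be supplied by the subclass. */
        abstract void doWork();

        /* Default hook: rethrow the exception to the caller. */
        void onWriteRejected(WriteRejectedException e) {
            throw e;
        }

        void run() {
            try {
                doWork();
            } catch (WriteRejectedException e) {
                onWriteRejected(e);
            }
        }
    }

    public static void main(String[] args) {
        final List<String> forwarded = new ArrayList<>();
        new Runner() {
            @Override
            void doWork() {
                throw new WriteRejectedException("write attempted on replica");
            }

            @Override
            void onWriteRejected(WriteRejectedException e) {
                /* Assumption: a real application would forward the write. */
                forwarded.add(e.getMessage());
            }
        }.run();
        System.out.println("Queued for forwarding: " + forwarded);
    }
}
```

A subclass that leaves the hook alone gets the rethrowing default, so the caller of run() sees the exception.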

Finally, we implement onRetryFailure(), which is what this class does if the transaction retry loop goes through too many iterations. Here, we simply print the error to the console. A more robust application should probably write the error to the application logs.

    void onRetryFailure(OperationFailureException lastException) {
        out.println("Failed despite retries." +
                    ((lastException == null) ?
                      "" :
                      " Encountered exception:" +
                      lastException));
    }
}

Using RunTransaction

Having implemented the RunTransaction class, it is fairly easy to use. Essentially, you only have to implement the RunTransaction.doTransactionWork() method so that it performs whatever database access you want.

For example, the following method performs a read on an EntityStore used by the StockQuotes example HA application. Notice that instantiating the class, implementing doTransactionWork(), and calling RunTransaction.run() all happen in one place. This makes for fairly easy maintenance of the code.

    private void printStocks(final PrintStream out)
        throws InterruptedException {

        new RunTransaction(repEnv, out) {

            @Override
            void doTransactionWork(Transaction txn) {

                // dao is a DataAccessor class used to access
                // an entity store.
                final EntityCursor<Quote> quotes =
                    dao.quoteById.entities(txn, null);
                try {
                    out.println("\tSymbol\tPrice");
                    out.println("\t======\t=====");

                    int count = 0;
                    for (Quote quote : quotes) {
                        out.println("\t" +  quote.stockSymbol +
                                    "\t" + quote.lastTrade);
                        count++;
                    }
                    out.println("\n\t" + count + " stock"
                                + ((count == 1) ? "" : "s") +
                                " listed.\n");
                } finally {
                    quotes.close();
                }
            }
        }.run(true /*readOnly*/);

        /* Output local indication of processing. */
        System.out.println("Processed print request");
    } 

In the previous example, we do not bother to override the RunTransaction.onReplicaWrite() method because this transaction performs read-only access to the database. Regardless of whether the transaction is run on a Master or a Replica, ReplicaWriteException cannot be raised here, so we can safely use the default implementation.

However, if we were running a transaction that performs a database write, then we should probably do something with onReplicaWrite() other than merely re-throwing the exception.

The following is an example usage of RunTransaction that is also used in the StockQuotes example.

    void updateStock(final String line, final PrintStream printStream)
        throws InterruptedException {

        // QuoteUtil is a utility class used to parse a line of input
        // obtained from the console.
        final Quote quote = QuoteUtil.parseQuote(line);
        if (quote == null) {
            return;
        }

        new RunTransaction(repEnv, printStream) {

            @Override
            void doTransactionWork(Transaction txn) {
                // dao is a Data Accessor class used to perform access
                // to the entity store.
                dao.quoteById.put(txn, quote);
                /* Output local indication of processing. */
                System.out.println("Processed update request: " + line);
            }

            // For this example, we simply log the error condition.
            // In a more robust example, some other action might be
            // taken; for example, log the situation and then route
            // the write request to the Master.
            @Override
            void onReplicaWrite(ReplicaWriteException replicaWrite) {
                /* Attempted a modification while in the replica state. */
                printStream.println
                    (repEnv.getNodeName() +
                     " is not currently the master.  Perform the update" +
                     " at the node that's currently the master.");
            }
        }.run(false /*not readOnly */);
    }