Transaction Example

The following code provides a fully functional example of a multi-threaded transactional BDB XML application. For improved portability across platforms, this example uses pthreads to provide threading support.

The example creates multiple threads, each of which creates a set number of XML documents that it then writes to the container. Each thread creates and writes 10 documents under a single transaction before committing and writing another 10 documents. This activity is repeated 50 times.

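From the command line, you can tell the program to vary:

  • The number of threads that write to the container.

  • The number of nodes in each document.

  • Whether the container is a node container or a Wholedoc container.

  • Whether READ COMMITTED isolation is used for the transactions.

As we will see in Runtime Analysis, each of these variables plays a role in the number of deadlocks the program encounters during its run time.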

Of course, each writer thread performs deadlock detection as described in this manual. In addition, normal recovery is performed when the environment is opened.

We start with our normal include directives and other housekeeping necessities:

// File TxnGuide.cpp

// We assume an ANSI-compatible compiler
#include "dbxml/DbXml.hpp"
#include <cstdlib>
#include <iostream>
#include <pthread.h>
#include <sstream>

#ifdef _WIN32
extern int getopt(int, char * const *, const char *);
#define PATHD '\\'
#else
#include <unistd.h>
#define PATHD '/'
#endif

using namespace DbXml;  

Next, we declare a few global variables. global_thread_num is used to assist in creating a portable thread ID for each thread in use by the program. global_num_deadlocks is a variable that we use to count the total number of deadlocks the program encounters during its runtime. Finally, we declare a couple of pthread mutex variables that we will use to lock these variables when they are in use.

// File TxnGuide.cpp

// Printing of pthread_t is implementation-specific, so we
// create our own thread IDs for reporting purposes.
int global_thread_num;
int global_num_deadlocks;
pthread_mutex_t thread_num_lock, thread_num_deadlocks;  

Next we perform a couple of forward function declarations. usage() provides our application's help text and writerThread is the function that will run for each thread.

We also declare a structure that we use to contain variables of local interest to our writer threads. We will pass this structure to each of our writerThread functions called by pthread_create().

// Forward declarations
int usage(void);
void *writerThread(void *);

struct ThreadVars {
    XmlContainer container;
    bool useReadCommitted;
    int numNodes;
};  

Next we implement our usage() function, which describes how to use our application.

// Usage function
int
usage()
{
    std::cerr << "\nThis program writes XML documents to a DB XML "
              << "container. The documents are written using any number\n"
              << "of threads that will perform writes "
              << "using 50 transactions. Each transaction writes \n"
              << "10 documents. You can choose to perform the "
              << "writes using default isolation, or using \n"
              << "READ COMMITTED isolation. If READ COMMITTED "
              << "is used, the application will see fewer deadlocks."
              << std::endl;
    std::cerr << "\nNote that you can vary the size of the documents "
              << "written to the container by defining the number of \n"
              << "nodes in the documents. Up to a point, and depending "
              << "on your system's performance, increasing the number \n"
              << "of nodes will increase the number of deadlocks that "
              << "your application will see." << std::endl;
    std::cerr << "Command line options are: " << std::endl;
    std::cerr << " -h <database_home_directory>" << std::endl;
    std::cerr << " [-t <number of threads>]" << std::endl;
    std::cerr << " [-n <number of nodes per document>]" << std::endl;
    std::cerr << " [-w]     (create a Wholedoc container)"   << std::endl;
    std::cerr << " [-2]     (use READ COMMITTED isolation)" << std::endl;
    return (EXIT_FAILURE);
}  

Now we implement our main() function. We start by declaring and initializing the local variables needed by the function. Notice that by default we will not use read committed isolation, we will use 5 threads, and our default container type is a node container.

int
main(int argc, char *argv[])
{

    DB_ENV *envp = NULL;
    XmlManager *mgrp = NULL;
    std::string containerName("txn.dbxml");

    ThreadVars threadInfo;
    threadInfo.useReadCommitted = false;
    threadInfo.numNodes = 1;    // Avoid reading an uninitialized
                                // value if -n is not given.

    // Initialize globals
    global_thread_num = 0;
    global_num_deadlocks = 0;

    int ch, i, dberr;
    int numThreads = 5;
    u_int32_t envFlags;
    XmlContainer::ContainerType containerType =
        XmlContainer::NodeContainer;
    const char *dbHomeDir;      // May point at a string literal

    // Application name
    const char *progName = "TxnGuide";  

Now we parse the command line options. See the usage() function above for a description of what each of these options does.

    // Parse the command line arguments
#ifdef _WIN32
    dbHomeDir = ".\\";
#else
    dbHomeDir = "./";
#endif
    while ((ch = getopt(argc, argv, "h:n:t:w2")) != -1)
        switch (ch) {
        case 'h':
            dbHomeDir = optarg;
            break;
        case 'n':
            threadInfo.numNodes = atoi(optarg);
            break;
        case 't':
            numThreads = atoi(optarg);
            break;
        case '2':
            threadInfo.useReadCommitted = true;
            break;
        case 'w':
            containerType = XmlContainer::WholedocContainer;
            break;
        case '?':
        default:
            return (usage());
        }  

As a final bit of plumbing, we enforce the minimum values passed to the application and issue informative text indicating how the program will run:

    // Find out how many nodes we'll write to the container
    threadInfo.numNodes = threadInfo.numNodes < 1 ? 1 :
                          threadInfo.numNodes;

    // Find out how many threads
    numThreads = numThreads < 1 ? 1 : numThreads;

    std::cout << "Number nodes per document:       "
              << threadInfo.numNodes << std::endl;
    std::cout << "Number of writer threads:        " << numThreads
              << std::endl;

    std::string msg = threadInfo.useReadCommitted ?
                        "Read Committed" :
                        "Default";
    std::cout << "Isolation level:                 " << msg
              << std::endl;

    msg = containerType == XmlContainer::WholedocContainer ?
                           "Wholedoc storage" : "Node storage";
    std::cout << "Container type:                  " << msg << "\n\n"
              << std::endl;  
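
The option handling and the minimum-value checks shown above can also be collected into a single function, which makes the defaults explicit. The following is a minimal sketch, assuming a POSIX getopt(); the Options structure and parseOptions() function are hypothetical names that do not appear in TxnGuide.cpp, and the default of one node per document is an assumption:

```cpp
#include <cstdlib>
#include <unistd.h>   // getopt, optarg, optind (POSIX)

// Hypothetical holder for the parsed settings; not part of TxnGuide.cpp.
struct Options {
    const char *homeDir;
    int numNodes;
    int numThreads;
    bool useReadCommitted;
    bool wholedoc;
    bool valid;           // false if an unrecognized option was seen
};

Options parseOptions(int argc, char *const argv[])
{
    // Set every default before parsing so that no field is ever
    // read uninitialized. One node per document is an assumed default.
    Options opts = {"./", 1, 5, false, false, true};
    int ch;

    optind = 1;           // start scanning at the first argument
    while ((ch = getopt(argc, argv, "h:n:t:w2")) != -1) {
        switch (ch) {
        case 'h': opts.homeDir = optarg;               break;
        case 'n': opts.numNodes = std::atoi(optarg);   break;
        case 't': opts.numThreads = std::atoi(optarg); break;
        case '2': opts.useReadCommitted = true;        break;
        case 'w': opts.wholedoc = true;                break;
        default:  opts.valid = false;                  break;
        }
    }

    // Enforce the same lower bounds as the listing above.
    if (opts.numNodes < 1)
        opts.numNodes = 1;
    if (opts.numThreads < 1)
        opts.numThreads = 1;
    return opts;
}
```

With the arguments -t 8 -n 0 -2, this sketch reports eight threads, raises the node count to the minimum of one, and turns on read committed isolation.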

Now that we know what it is that the program is supposed to do, we can start to do it. We begin by opening our environment, manager and container so that they support transactional processing.

Notice here that if our container already exists, we delete and then recreate it. This avoids document ID conflicts between runs. It also allows us to change the container type from one run of the program to the next, because the container type can only be set at container creation time.

Finally, notice that we set up deadlock detection here, and we choose to resolve deadlocks by picking the thread with the smallest number of write locks. The thread with the smallest number of write locks is the one that has performed the least amount of work. By choosing this thread for the abort/retry cycle, we minimize the amount of rework our application must perform due to a deadlock.

    // Env open flags
    envFlags =
      DB_CREATE     |  // Create the environment if it does not exist
      DB_RECOVER    |  // Run normal recovery.
      DB_INIT_LOCK  |  // Initialize the locking subsystem
      DB_INIT_LOG   |  // Initialize the logging subsystem
      DB_INIT_TXN   |  // Initialize the transactional subsystem.
      DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
      DB_THREAD;       // Cause the environment to be free-threaded

    dberr = db_env_create(&envp, 0);
    if (dberr) {
        std::cout << "Unable to create environment: " <<
            db_strerror(dberr) << std::endl;
        if (envp)
            envp->close(envp, 0);
        return (EXIT_FAILURE);
    }

    // Indicate that we want to internally perform deadlock 
    // detection.  Also indicate that the transaction with 
    // the fewest number of write locks will receive the 
    // deadlock notification in the event of a deadlock.
    envp->set_lk_detect(envp, DB_LOCK_MINWRITE);

    envp->open(envp, dbHomeDir, envFlags, 0);

    // Create and open a DB XML manager.
    mgrp = new XmlManager(envp,
                          DBXML_ADOPT_DBENV); // Close the env when 
                                              // the manager closes.
    try {
        // If we had utility threads (for running checkpoints or 
        // deadlock detection, for example) we would spawn those
        // here. However, for a simple example such as this,
        // that is not required.

        // If the container already exists, delete it. We don't want
        // naming conflicts if this program is run multiple times.
        if (mgrp->existsContainer(containerName) != 0)
            mgrp->removeContainer(containerName);

        XmlContainerConfig cconfig;
        cconfig.setTransactional(true);  // Container is transactional.
        cconfig.setThreaded(true);
        cconfig.setAllowCreate(true);    // Create the container if it
                                         // does not exist.
        cconfig.setContainerType(containerType);

        // Open the container
        threadInfo.container =
            mgrp->openContainer(containerName,
                cconfig);  
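
To make the DB_LOCK_MINWRITE policy chosen above more concrete, the following is a minimal sketch of the selection rule only. It is not how Berkeley DB implements its deadlock detector; the LockerInfo structure and the pickMinWriteVictim() function are hypothetical names introduced for illustration:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical snapshot of one transaction taking part in a deadlock.
struct LockerInfo {
    int id;           // illustrative transaction identifier
    int writeLocks;   // number of write locks currently held
};

// Return the id of the transaction holding the fewest write locks,
// or -1 if the list is empty. On a tie, the first such entry wins.
int pickMinWriteVictim(const std::vector<LockerInfo> &cycle)
{
    if (cycle.empty())
        return -1;

    std::size_t victim = 0;
    for (std::size_t k = 1; k < cycle.size(); ++k) {
        if (cycle[k].writeLocks < cycle[victim].writeLocks)
            victim = k;
    }
    return cycle[victim].id;
}
```

Given three deadlocked transactions holding 12, 3 and 7 write locks, the sketch selects the one holding 3, which is the transaction that would have the least work to redo after an abort.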

Next we initialize our mutexes and we start and join our writer threads. This is all standard pthread usage, so we present it here without much comment.

        // Initialize a pthread mutex. Used to help provide thread ids.
        (void)pthread_mutex_init(&thread_num_lock, NULL);
        // Initialize a pthread mutex. Used to count the number of
        // deadlocks encountered by the various threads in this example.
        (void)pthread_mutex_init(&thread_num_deadlocks, NULL);

        // Start the writer threads.
        pthread_t writerThreads[numThreads];
        for (i = 0; i < numThreads; i++)
            (void)pthread_create(
                &writerThreads[i], NULL,
                writerThread, (void *)&threadInfo);

        // Join the writers
        for (i = 0; i < numThreads; i++)
            (void)pthread_join(writerThreads[i], NULL);  

Of course we need to catch and handle any exceptions thrown during our application's runtime.

    } catch(XmlException &xe) {
        std::cerr << "Error opening XmlContainer: "
                  << xe.what() << std::endl;
        return (EXIT_FAILURE);
    } catch(std::exception &ee) {
        std::cerr << "Unknown error: "
                  << ee.what() << std::endl;
        return (EXIT_FAILURE);
    }  

Once all our writer threads complete, we need to clean up a little. Remember that containers automatically close when they go out of scope. Also, our manager adopted the environment, so when the manager closes, it will close the environment for us. And, of course, the manager also closes when the last handle to it either goes out of scope or is deleted.

Consequently, to close our application all we need to do is delete the XmlManager object.

    try {
        // Close our manager if it was opened.
        if (mgrp != NULL)
            delete mgrp;

        // We don't have to close our container or
        // environment handles. The container closes
        // when it goes out of scope. The environment
        // is closed when the manager is deleted, because
        // we specified DBXML_ADOPT_DBENV on the manager
        // open.

    } catch(XmlException &xe) {
        std::cerr << progName << ": Error closing manager and environment."
                  << std::endl;
        std::cerr << xe.what() << std::endl;
        return (EXIT_FAILURE);
    } catch(std::exception &ee) {
        std::cerr << progName << ": Error closing manager and environment."
                  << std::endl;
        std::cerr << ee.what() << std::endl;
        return (EXIT_FAILURE);
    }  

As a final bit of clean up, we issue a count of the deadlocks seen during this program runtime and then return from the main() function.

    // Final status message and return.

    std::cout << "I'm all done." << std::endl;
    std::cout << "I saw " << global_num_deadlocks
              << " deadlocks in this program run."
              << std::endl;
    return (EXIT_SUCCESS);
}  

The writerThread Function

To perform actual work, our application spawns a number of threads, each of which runs the writerThread() function. This function:

  • Runs 50 transactions.

  • Within each transaction, it creates and writes 10 XML documents to the container.

  • The size of each document is determined by information provided on the command line.

  • Documents can be written using read committed isolation, depending on information provided on the command line.

  • In the event of a deadlock, the function will abort the transaction and then retry. Note that the function will only retry a given transaction 30 times before giving up and moving on to the next transaction.

  • Upon completing its workload, the function increments the global deadlock counter with the number of deadlocks that it saw before exiting.

To begin, the function sets up the local variables that it needs in order to perform its work. Notice that we create the XmlUpdateContext object at the top of this function; there is no need for us to continually recreate that object as the function iterates over its workload.

// A function that performs a series of writes to a
// Berkeley DBXML container. 

// The mechanism of transactional commit/abort and
// deadlock detection is illustrated here.
void *
writerThread(void *args)
{
    int j, thread_num;
    int max_retries = 30;   // Max retry on a deadlock
    int num_deadlocks = 0;

    ThreadVars *threadInfo = (ThreadVars *)args;
    XmlContainer container = threadInfo->container;
    XmlManager myManager = container.getManager();
    XmlTransaction txn;
    XmlUpdateContext context = myManager.createUpdateContext();  

Next we determine our thread ID. Some pthread packages allow us to use a pthread_t variable, as returned by pthread_self(), for this purpose because for those packages a pthread_t is really just an integer type. However, this is not universally true – in some implementations pthread_t is a structure, for example – so we use a simple global counter for this purpose instead.

    // Get the thread number
    (void)pthread_mutex_lock(&thread_num_lock);
    global_thread_num++;
    thread_num = global_thread_num;
    (void)pthread_mutex_unlock(&thread_num_lock);  
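
The same counter technique can be exercised on its own. The following is a minimal sketch, assuming POSIX threads are available; the names next_thread_num, worker() and assignThreadIds() are hypothetical and do not appear in TxnGuide.cpp, and error checking is omitted for brevity:

```cpp
#include <pthread.h>
#include <algorithm>
#include <vector>

// Shared counter and the mutex that guards it (illustrative names).
static int next_thread_num = 0;
static pthread_mutex_t next_thread_num_lock = PTHREAD_MUTEX_INITIALIZER;

// Each worker takes the next number while holding the lock and
// stores it in the int slot that was passed to it.
static void *worker(void *arg)
{
    int *slot = static_cast<int *>(arg);
    pthread_mutex_lock(&next_thread_num_lock);
    *slot = ++next_thread_num;
    pthread_mutex_unlock(&next_thread_num_lock);
    return nullptr;
}

// Start n workers, join them all, and return the IDs they were
// given, sorted in ascending order.
std::vector<int> assignThreadIds(int n)
{
    std::vector<pthread_t> threads(n);
    std::vector<int> ids(n, 0);

    for (int k = 0; k < n; ++k)
        pthread_create(&threads[k], nullptr, worker, &ids[k]);
    for (int k = 0; k < n; ++k)
        pthread_join(threads[k], nullptr);

    std::sort(ids.begin(), ids.end());
    return ids;
}
```

The first call to assignThreadIds(4) yields the IDs 1 through 4 regardless of the order in which the threads happen to run. The sketch keeps its pthread_t handles in a std::vector because an array whose size is known only at run time is not standard C++.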

Having done that, we also initialize our random number generator. We use this to create random data for our XML documents so that they are not all identical.

    // Initialize the random number generator 
    srand(thread_num);  

Now we get to the main workload loop in our application. Here we begin the for loop that will perform the 50 transactions, and we begin the retry while loop.

    // Perform 50 transactions
    for (int i=0; i<50; i++) {
        bool retry = true;
        int retry_count = 0;
        // while loop is used for deadlock retries
        while (retry) {
            // try block used for deadlock detection and
            // general exception handling
            try {  

Now that we are inside our try block, we can create our transaction. Notice that we decide on a transaction-by-transaction basis whether read committed isolation is used. Lowering the isolation level for this workload results in less lock contention and therefore fewer deadlocks. See Runtime Analysis for more information.

                // Set this transaction to use READ COMMITTED isolation
                // if it is indicated by the command line switches.
                u_int32_t txnFlags =
                    threadInfo->useReadCommitted ? DB_READ_COMMITTED : 0;
                txn = myManager.createTransaction(txnFlags);  

Now we create and write the 10 documents for this transaction. Remember that the size of the document is determined by information provided on the command line. Again, the size of the document has a lot to do with the amount of lock contention the application will see.

Beyond that, this portion of the application is simply basic BDB XML library usage.

                // Perform the container writes for this transaction.
                for (j = 0; j < 10; j++) {

                    // Get a document ID. The separators keep IDs
                    // unique when a counter has more than one digit.
                    std::ostringstream docID;
                    docID << thread_num << "_" << i << "_" << j;

                    // Build the document
                    std::ostringstream theDoc;
                    theDoc << "<testDoc>\n";
                    for (int n = 0; n < threadInfo->numNodes; n++) {
                        int payload = rand() + n;
                        theDoc << "<payload>" << payload 
                               << "</payload>\n";
                    }
                    theDoc << "</testDoc>";

                    // Put the document
                    container.putDocument(txn,
                                          docID.str(),
                                          theDoc.str(),
                                          context,
                                          0);
                }  
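
The document construction can be tried out independently of the container. The following is a minimal sketch using only the standard library; buildTestDoc() and makeDocId() are hypothetical helper names, and the payload range is an arbitrary choice. A separator between the three counters keeps the IDs unambiguous: without one, thread 1, transaction 10, document 0 and thread 11, transaction 0, document 0 would both produce 1100.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>

// Build a test document with numNodes <payload> child elements,
// each carrying a pseudo-random number.
std::string buildTestDoc(int numNodes)
{
    std::ostringstream doc;
    doc << "<testDoc>\n";
    for (int n = 0; n < numNodes; n++)
        doc << "<payload>" << (std::rand() % 1000 + n) << "</payload>\n";
    doc << "</testDoc>";
    return doc.str();
}

// Hypothetical helper: join the thread, transaction and document
// counters with a separator so that distinct triples never collide.
std::string makeDocId(int threadNum, int txnNum, int docNum)
{
    std::ostringstream id;
    id << threadNum << '_' << txnNum << '_' << docNum;
    return id.str();
}
```

Here makeDocId(1, 10, 0) returns 1_10_0, while makeDocId(11, 0, 0) returns 11_0_0.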

Now that we are all done writing to the container, we can commit the transaction. If all has gone well, we are done with this particular transaction and we can iterate to the next transaction without retrying the current one.

                // commit
                txn.commit(0);
                retry = false;  

However, if an exception has been thrown, we must decide what to do about it. Our first concern, and the most likely cause of an exception given our workload, is that we have encountered a deadlock. So we begin by catching XmlException and testing to see if we have a deadlock situation.

If we do see a deadlock, we immediately abort the transaction. This releases our locks and allows the other deadlocked thread to make forward progress. We then must decide if we can retry the transaction; this is gated by the number of retry attempts we have made so far.

If we have caught an XmlException and it is not a deadlock situation, then we simply abort and give up on the current transaction. The function will then loop to the next transaction where, hopefully, we will not encounter any further unexpected exceptions.

            } catch (XmlException &xxe) {
                if (xxe.getDbErrno() == DB_LOCK_DEADLOCK) {
                    // First thing that we MUST do is abort the
                    // transaction.
                    txn.abort();

                    // Now we decide if we want to retry the operation.
                    // If we have retried less than max_retries,
                    // increment the retry count and goto retry.
                    if (retry_count < max_retries) {
                        retry_count++;
                        retry = true;
                    } else {
                        // Otherwise, just give up.
                        std::cerr << "Writer " << thread_num
                              << ": Got DB_LOCK_DEADLOCK and I'm out of "
                              << "retries. Giving up." << std::endl;
                        retry = false;
                    }
                    num_deadlocks++;
                } else {
                    std::cerr << "Caught an XmlException : "
                              << xxe.what() << std::endl;
                    txn.abort();
                    retry = false;
                }  

We also catch std::exception just for the sake of completeness. As is the case with a general XmlException event, here we abort and do not attempt to retry the current transaction.

            } catch (std::exception &ee) {
                std::cerr << "Unknown exception: " << ee.what() 
                          << std::endl;
                txn.abort();
                retry = false;
            }
        }
    }  
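
The control flow of the retry loop can be studied without a database. The following is a minimal sketch under stated assumptions: DeadlockError stands in for an XmlException whose getDbErrno() is DB_LOCK_DEADLOCK, the work callback stands in for the begin, write and commit steps, and RetryResult and runWithRetry() are hypothetical names:

```cpp
#include <functional>
#include <stdexcept>

// Stand-in for the deadlock condition reported by the library.
struct DeadlockError : std::runtime_error {
    DeadlockError() : std::runtime_error("deadlock") {}
};

struct RetryResult {
    bool committed;   // true if the work eventually succeeded
    int deadlocks;    // number of deadlocks seen along the way
};

// Run work() until it succeeds, a non-deadlock exception occurs,
// or maxRetries retries have been used up.
RetryResult runWithRetry(const std::function<void()> &work,
                         int maxRetries)
{
    RetryResult result = {false, 0};
    int retryCount = 0;
    bool retry = true;

    while (retry) {
        try {
            work();                    // writes and commit go here
            result.committed = true;
            retry = false;
        } catch (const DeadlockError &) {
            // The real code aborts the transaction at this point.
            result.deadlocks++;
            if (retryCount < maxRetries)
                retryCount++;          // loop around and try again
            else
                retry = false;         // out of retries, give up
        } catch (const std::exception &) {
            retry = false;             // unexpected error, give up
        }
    }
    return result;
}
```

A callback that deadlocks three times and then succeeds returns with committed set and a deadlock count of 3. A callback that always deadlocks, run with a limit of 2 retries, is attempted three times in total before the function gives up.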

Finally, we increment our global deadlock counter before exiting the function. This is used for reporting purposes when the application itself exits.

    (void)pthread_mutex_lock(&thread_num_deadlocks);
    global_num_deadlocks += num_deadlocks;
    (void)pthread_mutex_unlock(&thread_num_deadlocks);
    return (0);
}  

This completes our transactional example. If you would like to experiment with this code, you can find the example in the following location in your BDB XML distribution:

BDBXML_INSTALL/dbxml/examples/cxx/txn

In addition, please see Runtime Analysis for an analysis of the performance characteristics illustrated by this program.