Program Listing

Function: main()
Function: create_env()
Function: env_init()
Function: doloop()
Function: print_stocks()

Our example program builds on the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify host names and ports on the command line. This program also has additional error handling for replication errors.

When using replication with write forwarding, there are several benefits for your application code:

Function: main()

Our program begins with some additional include statements:

/*
 * File: rep_mgr_wrfor_gsg.c
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#ifdef _WIN32
#include <windows.h>
#define  sleep(s)        Sleep(1000 * (s))
#else /* !_WIN32 */
#include <unistd.h>
#endif

#include <db.h> 

#ifdef _WIN32
extern int getopt(int, char * const *, const char *);
#endif  

We then define a few values: the size of our cache, which we keep deliberately small for this example; the name of our database; and a sleep time, which sets how many seconds a site waits before it retries synchronizing with the master. We also provide a global variable that holds the name of our program; this is used for error reporting later on.

#define CACHESIZE   (10 * 1024 * 1024)
#define DATABASE    "quote.db"
#define SLEEPTIME   3
const char *progname = "ex_rep_gsg_wrfor";  

Then we perform a few forward declarations. The first two, create_env() and env_init(), are used to create and open our environment.

Next we declare doloop(). This function now takes an int as an argument, which we describe later.

Finally, we declare the print_stocks() function.

int create_env(const char *, DB_ENV **);
int env_init(DB_ENV *, const char *);
int doloop (DB_ENV *, int);
int print_stocks(DB *);  

Next we need our usage() function, which has additional options:

/* Usage function */
static void
usage()
{
    fprintf(stderr, "usage: %s ", progname);
    fprintf(stderr, "-h home -l|-L host:port [-r host:port]\n");
    fprintf(stderr, "where:\n");
    fprintf(stderr, "\t-h identifies the environment home directory ");
    fprintf(stderr, "(required).\n");
    fprintf(stderr, "\t-l identifies the host and port used by this ");
    fprintf(stderr, "site (required unless -L is specified).\n");
    fprintf(stderr, "\t-L identifies the local site as group creator.\n");
    fprintf(stderr, "\t-r identifies another site participating in ");
    fprintf(stderr, "this replication group\n");
    exit(EXIT_FAILURE);
}

where:

  • -h

    Identifies the environment home directory. You must specify this option.

  • -l

    Identifies the host and port used by this site. You must specify this option unless -L is specified.

  • -L

    Identifies the local site as group creator. You must specify this option unless -l is specified.

  • -r

    Optionally identifies another site participating in this replication group.

That completed, we can jump into our application's main() function. We begin by declaring and initializing some variables used to collect TCP/IP host and port information. We also declare a couple of flags that we use to make sure some required information is provided to this application:

int
main(int argc, char *argv[])
{
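    extern char *optarg;
    DB_ENV *dbenv;
    DB_SITE *dbsite;    /* Site handle used while parsing -l, -L and -r. */
    const char *home;
    char *host, *last_colon, *portstr;
    int ch;             /* int, because getopt() returns an int. */
    int is_group_creator, local_is_set, ret;
    u_int16_t port;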

    dbenv = NULL;

    ret = is_group_creator = local_is_set = 0;
    home = NULL;  

Now we create and configure our environment handle. We do this with our create_env() function, which we will show later in this example.

    if ((ret = create_env(progname, &dbenv)) != 0)
            goto err; 

Then we parse the command line arguments:

    while ((ch = getopt(argc, argv, "h:l:L:r:")) != EOF)
        switch (ch) {
        case 'h':
            home = optarg;
            break;
        case 'L':
            is_group_creator = 1; /* FALLTHROUGH */
        case 'l':
            host = optarg;
            /*
             * The final colon in host:port string is the
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(host, ':')) == NULL) {
                fprintf(stderr, "Bad local host specification.\n");
                goto err;
            }
            /*
             * Separate the host and port portions of the
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            port = (unsigned short)atoi(portstr);
            if ((ret =
              dbenv->repmgr_site(dbenv, host, port, &dbsite, 0)) != 0) {
                fprintf(stderr, "Could not set local address %s:%d.\n",
                  host, port);
                goto err;
            }
            dbsite->set_config(dbsite, DB_LOCAL_SITE, 1);
            if (is_group_creator)
                dbsite->set_config(dbsite, DB_GROUP_CREATOR, 1);

            if ((ret = dbsite->close(dbsite)) != 0) {
                dbenv->err(dbenv, ret, "DB_SITE->close");
                goto err;
            }
            local_is_set = 1;
            break;
        /* Identify another site in the replication group. */
        case 'r':
            host = optarg;
            /*
             * The final colon in host:port string is the
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(host, ':')) == NULL) {
                fprintf(stderr, "Bad remote host specification.\n");
                goto err;
            }
            /*
             * Separate the host and port portions of the
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            port = (unsigned short)atoi(portstr);
            if ((ret =
              dbenv->repmgr_site(dbenv, host, port, &dbsite, 0)) != 0) {
                dbenv->err(dbenv, ret, "DB_ENV->repmgr_site");
                goto err;
            }
            dbsite->set_config(dbsite, DB_BOOTSTRAP_HELPER, 1);
            if ((ret = dbsite->close(dbsite)) != 0) {
                dbenv->err(dbenv, ret, "DB_SITE->close");
                goto err;
            }
            break;
        case '?':
        default:
            usage();
        }

    /* Make sure the required options were provided. */
    if (home == NULL || !local_is_set)
        usage();
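The listing above converts the port with atoi(), which accepts input such as "60x" or an out-of-range number without complaint. As a minimal sketch of a stricter alternative, the helper below splits a host:port string at its final colon and validates the port with strtol(). The name split_host_port() is hypothetical and is not part of the example program or of the Berkeley DB API.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/*
 * split_host_port (hypothetical helper, not part of the example program):
 * split "host:port" in place at the final colon and validate the port.
 * Returns 0 on success, -1 if the specification is malformed.
 */
int
split_host_port(char *spec, char **hostp, unsigned short *portp)
{
    char *last_colon, *end;
    long val;

    assert(spec != NULL && hostp != NULL && portp != NULL);

    /* The final colon separates the host from the port. */
    if ((last_colon = strrchr(spec, ':')) == NULL)
        return (-1);
    /* Reject an empty host or an empty port. */
    if (last_colon == spec || last_colon[1] == '\0')
        return (-1);

    /* strtol reports trailing junk via 'end' and overflow via errno. */
    errno = 0;
    val = strtol(last_colon + 1, &end, 10);
    if (errno != 0 || *end != '\0' || val < 1 || val > 65535)
        return (-1);

    /* Only modify the caller's string once the input is known good. */
    *last_colon = '\0';
    *hostp = spec;
    *portp = (unsigned short)val;
    return (0);
}
```

With a helper like this, the -l, -L and -r cases could share one parsing path and pass the resulting host and port to repmgr_site().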

Now we can open our environment. We do this with the env_init() function, which we describe a little later in this chapter.

    if ((ret = env_init(dbenv, home)) != 0)
            goto err; 

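You can now configure and start Replication Manager with write forwarding. To configure write forwarding, call rep_set_config() with the DB_REPMGR_CONF_FORWARD_WRITES option. We then call repmgr_start(), asking for three message-processing threads and passing DB_REP_ELECTION so that the site takes part in an election to determine the master.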

    dbenv->rep_set_config(dbenv, DB_REPMGR_CONF_FORWARD_WRITES, 1); 
    if ((ret = dbenv->repmgr_start(dbenv, 3, DB_REP_ELECTION)) != 0)
            goto err; 

Now that we have opened the environment and configured and started Replication Manager with write forwarding, we can call our doloop() function.

    if ((ret = doloop(dbenv, is_group_creator)) != 0) {
        dbenv->err(dbenv, ret, "Application failed");
        goto err;
    }  

Finally, we provide our application shutdown code. Note, again, that in a traditional transactional application all databases would be closed here. In our replicated application, the database is opened and closed in the doloop() function, so here we only need to close the environment handle.

err: if (dbenv != NULL)
        (void)dbenv->close(dbenv, 0);

    return (ret);
}  

Function: create_env()

Having written our main() function, we now implement the usual create_env() function.

int
create_env(const char *progname, DB_ENV **dbenvp)
{
    DB_ENV *dbenv;
    int ret;

    if ((ret = db_env_create(&dbenv, 0)) != 0) {
        fprintf(stderr, "can't create env handle: %s\n",
            db_strerror(ret));
        return (ret);
    }

    dbenv->set_errfile(dbenv, stderr);
    dbenv->set_errpfx(dbenv, progname);

    *dbenvp = dbenv;
    return (0);
}  

Function: env_init()

Now we implement the env_init() function, which opens the environment with the DB_INIT_REP and DB_THREAD flags that are required for Replication Manager.

int
env_init(DB_ENV *dbenv, const char *home)
{
    u_int32_t flags;
    int ret;

    (void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0);
    (void)dbenv->set_flags(dbenv, DB_TXN_NOSYNC, 1);

    /* DB_INIT_REP and DB_THREAD are required for Replication Manager. */
    flags = DB_CREATE |
        DB_INIT_LOCK |
        DB_INIT_LOG |
        DB_INIT_MPOOL |
        DB_INIT_REP |
        DB_INIT_TXN |
        DB_RECOVER |
        DB_THREAD;

    if ((ret = dbenv->open(dbenv, home, flags, 0)) != 0)
        dbenv->err(dbenv, ret, "can't open environment");
    return (ret);
}  

Function: doloop()

Having written our main() function and utility functions, we now implement doloop(). This function provides a command prompt at which the user can enter a stock ticker value and a price for that value. This information is then written to the database.

To display the database, simply press Return at the prompt.

To begin, we declare a database pointer, several DBT variables, and the usual assortment of variables used for buffers and return codes. We also initialize all of this. Remember that doloop now takes is_group_creator as an additional argument.

#define BUFSIZE 1024
int
doloop(DB_ENV *dbenv, int is_group_creator)
{
    DB *dbp;
    DBT key, data;
    char buf[BUFSIZE], *rbuf;
    int ret;
    u_int32_t db_flags;

    dbp = NULL;
    memset(&key, 0, sizeof(key));
    memset(&data, 0, sizeof(data));
    ret = 0;  

Next, we begin the loop and we immediately open our database if it has not already been opened.

If -L was specified, is_group_creator is set and we add DB_CREATE to the open flags, because the database only needs to be created on the initial startup of the group creator. The database will be replicated to the other sites when they first start up. The database will already exist on each site for subsequent startups.

Note that if the database is not yet available (the open fails with ENOENT), the site waits SLEEPTIME seconds and retries, in case it needs time to synchronize with the master.

    for (;;) {
    
        if (dbp == NULL) {
            if ((ret = db_create(&dbp, dbenv, 0)) != 0)
                return (ret);

            db_flags = DB_AUTO_COMMIT;
            /*
             * Only need to create the database for the 
             * initial group creator startup.  The database 
             * will be replicated to the other sites when they
             * first start up.  The database will already exist on
             * each site for subsequent startups.
             */
            if (is_group_creator)
                db_flags |= DB_CREATE;
      
            if ((ret = dbp->open(dbp, NULL, DATABASE,
                NULL, DB_BTREE, db_flags, 0)) != 0) {
                /* Retry in case site needs time to synchronize with master. */
                if (ret == ENOENT) {
                    printf(
                      "No stock database yet available.\n");
                    /* The handle cannot be used after close, even on error. */
                    ret = dbp->close(dbp, 0);
                    dbp = NULL;
                    if (ret != 0) {
                        dbenv->err(dbenv, ret, "DB->close");
                        goto err;
                    }
                    sleep(SLEEPTIME);
                    continue;
                }
                dbenv->err(dbenv, ret, "DB->open");
                goto err;
            }
        }
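The loop above retries indefinitely for as long as the open returns ENOENT. As a sketch of one possible variation, the helper below puts an upper bound on the number of attempts. It is written against plain function pointers so that it compiles without Berkeley DB; retry_while_missing(), open_fn, delay_fn and fake_open() are hypothetical names introduced only for this illustration, and the attempt cap is our addition, not something the example program does.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* An operation that returns 0 on success or an error value. */
typedef int (*open_fn)(void *ctx);
/* A delay routine, for example a thin wrapper around sleep(). */
typedef void (*delay_fn)(unsigned int seconds);

/*
 * retry_while_missing (hypothetical helper): call op until it stops
 * returning ENOENT or until max_attempts calls have been made.
 * Returns the result of the last call to op.
 */
int
retry_while_missing(open_fn op, void *ctx, int max_attempts,
    unsigned int delay_secs, delay_fn delay)
{
    int attempt, ret;

    assert(op != NULL && max_attempts > 0);

    ret = ENOENT;
    for (attempt = 1; attempt <= max_attempts; attempt++) {
        /* Stop on success, or on an error that waiting will not fix. */
        if ((ret = op(ctx)) != ENOENT)
            break;
        /* Wait between attempts, but not after the final one. */
        if (attempt < max_attempts && delay != NULL)
            delay(delay_secs);
    }
    return (ret);
}

/* Stand-in for a database open: fails with ENOENT *ctx more times. */
int
fake_open(void *ctx)
{
    int *remaining = ctx;

    if (*remaining > 0) {
        (*remaining)--;
        return (ENOENT);
    }
    return (0);
}
```

In a real application, op would wrap the db_create() and DB->open() calls shown above, and delay would call sleep() with SLEEPTIME.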

Now we implement our command prompt. If the user enters the keywords exit or quit, the loop is exited and the application ends. If the user enters nothing and instead simply presses Return, the entire contents of the database are displayed. We use our print_stocks() function to display the database. (That implementation is shown next in this chapter.)

We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.

Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.

        printf("QUOTESERVER> ");
        fflush(stdout);

        if (fgets(buf, sizeof(buf), stdin) == NULL)
            break;
        if (strtok(&buf[0], " \t\n") == NULL) {
            switch ((ret = print_stocks(dbp))) {
            case 0:
                continue;
            case DB_REP_HANDLE_DEAD:
                /* Must close and reopen the handle, then can retry. */
                (void)dbp->close(dbp, 0);
                dbp = NULL;
                dbenv->errx(dbenv,
                    "Could not traverse data, retry operation");
                continue;       
            default:
                dbp->err(dbp, ret, "Error traversing data");
                goto err;
            }
        }
        rbuf = strtok(NULL, " \t\n");
        if (rbuf == NULL || rbuf[0] == '\0') {
            if (strncmp(buf, "exit", 4) == 0 ||
                strncmp(buf, "quit", 4) == 0)
                break;
            dbenv->errx(dbenv, "Format: TICKER VALUE");
            continue;
        }  
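The prompt logic above boils down to classifying one line of input. As a minimal sketch, the helper below performs the same tokenization with strtok() and returns which of the four cases applies. The names parse_command() and cmd_kind are hypothetical. Unlike the listing, the sketch compares the whole token with strcmp(), so a ticker such as "exits" is not treated as a quit request, and it returns the ticker token itself rather than the start of the buffer, so leading whitespace never becomes part of the key.

```c
#include <assert.h>
#include <string.h>

enum cmd_kind { CMD_SHOW, CMD_QUIT, CMD_BAD_FORMAT, CMD_UPDATE };

/*
 * parse_command (hypothetical helper): tokenize one input line in place.
 * On CMD_UPDATE, *tickerp and *valuep point into line.
 */
enum cmd_kind
parse_command(char *line, char **tickerp, char **valuep)
{
    char *ticker, *value;

    assert(line != NULL && tickerp != NULL && valuep != NULL);

    /* No tokens at all: the user just pressed Return. */
    if ((ticker = strtok(line, " \t\n")) == NULL)
        return (CMD_SHOW);

    /* A single token is either a quit keyword or a malformed update. */
    if ((value = strtok(NULL, " \t\n")) == NULL) {
        if (strcmp(ticker, "exit") == 0 || strcmp(ticker, "quit") == 0)
            return (CMD_QUIT);
        return (CMD_BAD_FORMAT);
    }

    *tickerp = ticker;
    *valuep = value;
    return (CMD_UPDATE);
}
```

Because strtok() keeps internal state, a helper like this is not safe to call from several threads at once; that is acceptable for a single interactive prompt.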

Now we assign data to the DBTs that we will use to write the new information to the database.

        key.data = buf;
        key.size = (u_int32_t)strlen(buf);

        data.data = rbuf;
        data.size = (u_int32_t)strlen(rbuf);  
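Because the sizes are computed with strlen(), the stored key and data do not include the terminating NUL byte. Code that later prints a record therefore has to rely on the recorded size rather than on NUL termination. The sketch below illustrates this with a stand-in structure, fake_dbt, that mimics only the data and size fields of a DBT so that the example compiles without <db.h>; fake_dbt, set_from_string() and format_record() are hypothetical names used only here.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Stand-in for the two DBT fields used here; the real DBT has more. */
struct fake_dbt {
    void *data;
    unsigned int size;
};

/* Point a record at a C string, excluding the NUL terminator. */
void
set_from_string(struct fake_dbt *dbt, char *s)
{
    assert(dbt != NULL && s != NULL);

    memset(dbt, 0, sizeof(*dbt));
    dbt->data = s;
    dbt->size = (unsigned int)strlen(s);
}

/*
 * Format "key value" into out using the explicit sizes. The "%.*s"
 * conversion reads at most 'size' bytes, so no terminator is needed.
 */
int
format_record(char *out, size_t outlen,
    const struct fake_dbt *key, const struct fake_dbt *data)
{
    assert(out != NULL && key != NULL && data != NULL);

    return (snprintf(out, outlen, "%.*s %.*s",
        (int)key->size, (const char *)key->data,
        (int)data->size, (const char *)data->data));
}
```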

Having done that, we can write the new information to the database. We do not need an explicit commit on this put operation: the database was opened with DB_AUTO_COMMIT and we pass a NULL transaction handle, so each put is automatically committed. Also, if a deadlock, timeout or permission error occurs, the application reports it and returns to the prompt so that the operation can be retried. A forwarded put operation can return a timeout error if the operation takes too long and a permission error if there is currently no master.

        if ((ret = dbp->put(dbp, NULL, &key, &data, 0)) != 0) {
            dbp->err(dbp, ret, "DB->put");
            switch (ret) {
            case DB_REP_HANDLE_DEAD:
                /* Must close and reopen the handle, then can retry. */
                (void)dbp->close(dbp, 0);
                dbp = NULL;
                /* FALLTHROUGH */
            case DB_LOCK_DEADLOCK:
            case DB_TIMEOUT:
            case EACCES:
                dbenv->errx(dbenv,
                    "Could not update data, retry operation");
                continue;
            default:
                dbp->err(dbp, ret, "Error updating data");
                goto err;
            }
        }
    }   /* End of the for (;;) loop. */

Finally, we close our database before returning from the function.

err:    if (dbp != NULL)
        (void)dbp->close(dbp, 0);

    return (ret);
}  

Function: print_stocks()

This function is unmodified from when we originally introduced it. For details on that function, see Function: print_stocks().