Program Listing

Class: RepWrforConfig
Class: RepQuoteWrforExampleGSG
Method: RepQuoteWrforExampleGSG.main()
Method: RepQuoteWrforExampleGSG.init()
Method: RepQuoteWrforExampleGSG.doloop()
Method: RepQuoteWrforExampleGSG.printStocks()

Our example program builds from the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify things like host names and ports from the command line. This program has additional error handling for replication errors.

When using replication with write forwarding, there are several benefits for your application code:

Class: RepWrforConfig

Before we begin, we present a class that we will use to maintain useful information for us.

The class that we create is called RepWrforConfig. First, we provide some declarations and definitions that are needed later in our example. One is the size of our cache, which we keep deliberately small for this example, and the other is the name of our database. Also, you can define a sleep time, which sets the time that a site waits before it retries synchronizing with the master. We also provide a global variable that is the name of our program; this is used for error reporting later on.

package db.repquote_gsg;

public class RepWrforConfig
{
    // Constant values used in the RepQuote application.
    public static final String progname = "RepQuoteWrforExampleGSG";
    public static final int CACHESIZE = 10 * 1024 * 1024;
    public static final int SLEEPTIME = 5000;

    // Member variables containing configuration information.
    // String specifying the home directory for rep files.
    public String home;
    // Stores an optional set of "other" hosts.
    private Vector otherHosts;
    // Priority within the replication group.
    public ReplicationManagerStartPolicy startPolicy;
    // The host address to listen to.
    private ReplicationManagerSiteConfig thisHost;

    // Member variables used internally.
    private int currOtherHost;
    private boolean gotListenAddress;

    public RepWrforConfig()
    {
        startPolicy = ReplicationManagerStartPolicy.REP_ELECTION;
        home = "TESTDIR";
        gotListenAddress = false;
        currOtherHost = 0;
        thisHost = new ReplicationManagerSiteConfig();
        otherHosts = new Vector();
    }

    public java.io.File getHome()
    {
        return new java.io.File(home);
    }

}  

Class: RepQuoteWrforExampleGSG

Our example will consist of a class, RepQuoteWrforExampleGSG, that performs all our work for us.

First, we provide the package declaration and then a few import statements that the class needs.

package db.repquote_gsg;

import java.io.FileNotFoundException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.Thread;
import java.lang.InterruptedException;

import com.sleepycat.db.Cursor;
import com.sleepycat.db.Database;
import com.sleepycat.db.DatabaseConfig;
import com.sleepycat.db.DatabaseEntry;
import com.sleepycat.db.DatabaseException;
import com.sleepycat.db.DeadlockException;
import com.sleepycat.db.DatabaseType;
import com.sleepycat.db.Environment;
import com.sleepycat.db.EnvironmentConfig;
import com.sleepycat.db.LockMode;
import com.sleepycat.db.OperationStatus;
import com.sleepycat.db.ReplicationConfig;
import com.sleepycat.db.ReplicationHandleDeadException;
import com.sleepycat.db.ReplicationHostAddress;
import com.sleepycat.db.ReplicationManagerSiteConfig;
import com.sleepycat.db.ReplicationManagerAckPolicy;

import db.repquote_gsg.RepWrforConfig;

public class RepQuoteWrforExampleGSG
{
    private RepWrforConfig repConfig;
    private Environment dbenv; 

Next, we provide our class constructor. This simply initializes our class data members.

    public RepQuoteWrforExampleGSG() 
         throws DatabaseException
    {
         repConfig = null;
         dbenv = null;
    }  

And then we provide our usage() method:

    public static void usage()
    {
        System.err.println("usage: " + RepWrforConfig.progname);
        System.err.println("-h home -l|-L host:port " +
            "[-r host:port]");

        System.err.println("\t -h home directory (required)\n" +
             "\t -l host:port (required unless -L is specified;" +
             " l stands for local)\n" + 
             "\t -L host:port (optional, L means group creator)\n" +
             "\t -r host:port (optional; r stands for remote; any " +
             "number of these\n" +
             "\t    may be specified)\n");

        System.exit(1);
    }  

where:

  • -h

    Identifies the environment home directory. You must specify this option.

  • -l

    Identifies the host and port used by this site. You must specify this option unless -L is specified.

  • -L

    Identifies the local site as group creator. You must specify this option unless -l is specified.

  • -r

    Optionally identifies another site participating in this replication group.

Method: RepQuoteWrforExampleGSG.main()

Having implemented our usage() method, we can jump directly into our main() method. This method begins by instantiating a RepWrforConfig object, and then collecting the command line arguments so that it can populate the object with the appropriate data:

    public static void main(String[] argv)
    throws Exception
{
    RepWrforConfig config = new RepWrforConfig();
    boolean isCreator = false;
    int tmpPort = 0;
    // Extract the command line parameters
    for (int i = 0; i < argv.length; i++)
    {
        if (argv[i].compareTo("-h") == 0) {
            // home is a string arg.
            i++;
            config.home = argv[i];
        } else if (argv[i].compareTo("-l") == 0 ||
          argv[i].compareTo("-L") == 0) {
            if (i == argv.length - 1)
                usage();
            if (argv[i].compareTo("-L") == 0)
                isCreator = true;
            // "local" should be host:port.
            i++;
            // Look for index of the last colon in the argv[i] string.
            int sep = argv[i].lastIndexOf(':');
            if (sep == -1 || sep == 0) {
                System.err.println(
                    "Invalid local host specification host:port needed.");
                usage();
            }
            try {
                tmpPort = Integer.parseInt(argv[i].substring(sep + 1));
            } catch (NumberFormatException nfe) {
                System.err.println("Invalid local host specification, " +
                    "could not parse port number.");
                usage();
            }
            config.setThisHost(argv[i].substring(0, sep), 
                                                     tmpPort, isCreator);
        } else if (argv[i].compareTo("-r") == 0) {
            i++;
            // Look for index of the last colon in the argv[i] string.
            int sep = argv[i].lastIndexOf(':');
            if (sep == -1 || sep == 0) {
                System.err.println(
                    "Invalid remote host specification host:port needed.");
                usage();
            }
            try {
                tmpPort = Integer.parseInt(argv[i].substring(sep + 1));
            } catch (NumberFormatException nfe) {
                System.err.println("Invalid remote host specification, " +
                    "could not parse port number.");
                usage();
            }
            config.addOtherHost(argv[i].substring(0, sep), tmpPort);
        } else {
            System.err.println("Unrecognized option: " + argv[i]);
            usage();
        }

    }  

And then perform a little sanity checking on the command line input:

    // Error check command line.
    if ((!config.gotListenAddress()) || config.home.length() == 0)
        usage();  

Now we perform the class' work. To begin, we initialize the object. The init() method actually opens our environment for us (shown in the next section).

    RepQuoteWrforExampleGSG runner = null;
    try {
        runner = new RepQuoteWrforExampleGSG();
        runner.init(config);  

And then we call our doloop() method. This method is where we perform all our database activity.

        runner.doloop();  

And then, finally terminate the application (which closes our environment handle) and end the method. Note, again, that in a traditional transactional application all databases would be closed here. In our replicated application, the database will usually be closed in the doloop() function, but we also conditionally close the database here to handle some error cases.

        runner.terminate();
    } catch (DatabaseException dbe) {
        System.err.println("Caught an exception during " +
            "initialization or processing: " + dbe.toString());
        if (runner != null)
            runner.terminate();
    }
        System.exit(0);
} // end main  

Method: RepQuoteWrforExampleGSG.init()

The RepQuoteWrforExampleGSG.init() method is used to open our environment handle. You can now configure and start Replication Manager with write forwarding. To configure write forwarding, use setReplicationConfig with the ReplicationConfig.FORWARD_WRITES option:

    public int init(RepWrforConfig config)
        throws DatabaseException
    {
        int ret = 0;
        repConfig = config;
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setErrorStream(System.err);
        envConfig.setErrorPrefix(RepWrforConfig.progname);

        envConfig.addReplicationManagerSite(repConfig.getThisHost());
        for (ReplicationHostAddress host = repConfig.getFirstOtherHost();
          host != null; host = repConfig.getNextOtherHost()){

            ReplicationManagerSiteConfig repmgrRemoteSiteConfig =
                new ReplicationManagerSiteConfig(host.host, host.port);
            repmgrRemoteSiteConfig.setBootstrapHelper(true);
            envConfig.addReplicationManagerSite(
                repmgrRemoteSiteConfig);
        }

        envConfig.setCacheSize(RepWrforConfig.CACHESIZE);
        envConfig.setTxnNoSync(true);

        envConfig.setAllowCreate(true);
        envConfig.setRunRecovery(true);
        envConfig.setThreaded(true);
        envConfig.setInitializeReplication(true);
        envConfig.setInitializeLocking(true);
        envConfig.setInitializeLogging(true);
        envConfig.setInitializeCache(true);
        envConfig.setTransactional(true);
        try {
            dbenv = new Environment(repConfig.getHome(), envConfig);
        } catch(FileNotFoundException e) {
            System.err.println("FileNotFound exception: " + e.toString());
            System.err.println(
                "Ensure that the environment directory is pre-created.");
            ret = 1;
        }

        // Configure Replication Manager write forwarding.
        dbenv.setReplicationConfig(ReplicationConfig.FORWARD_WRITES, true);

        // Start Replication Manager.
        dbenv.replicationManagerStart(3, repConfig.startPolicy);
        return ret;
    }  

Method: RepQuoteWrforExampleGSG.doloop()

We now implement our application's primary data processing method. This method provides a command prompt at which the user can enter a stock ticker value and a price for that value. This information is then entered to the database.

To display the database, simply enter return at the prompt.

To begin, we declare a database pointer:

    public int doloop()
        throws DatabaseException
    {
        Database db = null;  

Next, we begin the loop and we immediately open our database if it has not already been opened.

If -L is set, it specifies the need to create the database for the initial group creator startup. The database will be replicated to the other sites when they first start up. The database will already exist on each site for subsequent startups.

Note that there is some logic for a site to retry in case it needs time to synchronize with the master using SLEEPTIME.

        for (;;)
        {
            if (db == null) {
                DatabaseConfig dbconf = new DatabaseConfig();
                dbconf.setType(DatabaseType.BTREE);
                if (repConfig.getThisHost().getGroupCreator()) {
                    dbconf.setAllowCreate(true);
                }
                dbconf.setTransactional(true);

                try {
                    db = dbenv.openDatabase
                        (null, RepWrforConfig.progname, null, dbconf);
                } catch (java.io.FileNotFoundException e) {
                    System.err.println("No stock database available yet.");
                    if (db != null) {
                        db.close(true);
                        db = null;
                    }
                    try {
                        Thread.sleep(RepWrforConfig.SLEEPTIME);
                    } catch (InterruptedException ie) {}
                    continue;
                }
            }  

Now we implement our command prompt. If the user enters the keywords exit or quit, the loop is exited and the application ends. If the user enters nothing and instead simply presses return, the entire contents of the database is displayed. We use our printStocks() method to display the database. (That implementation is shown next in this chapter.)

We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.

Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.

            BufferedReader stdin =
                new BufferedReader(new InputStreamReader(System.in));

            // Listen for input, and add it to the database.
            System.out.print("QUOTESERVER");
            System.out.print("> ");
            System.out.flush();
            String nextline = null;
            try {
                nextline = stdin.readLine();
            } catch (IOException ioe) {
                System.err.println("Unable to get data from stdin");
                break;
            }
            String[] words = nextline.split("\\s");

            // A blank line causes the DB to be dumped to stdout.
            if (words.length == 0 || 
                (words.length == 1 && words[0].length() == 0)) {
                try {
                    printStocks(db);
                } catch (DeadlockException de) {
                    continue;
                // Dead replication handles are caused by an election
                // resulting in a previously committing read becoming
                // invalid.  Close the db handle and reopen.
                } catch (ReplicationHandleDeadException rhde) {
                    db.close(true); // close no sync.
                    db = null;
                    continue;
                } catch (DatabaseException e) {
                    System.err.println("Got db exception reading
                        replication" + "DB: " + e.toString());
                    break;
                }
                continue;
            }

            if (words.length == 1 &&
                (words[0].compareToIgnoreCase("quit") == 0 ||
                words[0].compareToIgnoreCase("exit") == 0)) {
                break;
            } else if (words.length != 2) {
                System.err.println("Format: TICKER VALUE");
                continue;
            }  

Now we assign data to the DatabaseEntry classes that we will use to write the new information to the database.

            DatabaseEntry key =
                    new DatabaseEntry(words[0].getBytes());
            DatabaseEntry data =
                    new DatabaseEntry(words[1].getBytes());  

Having done that, we can write the new information to the database. Here, the reason we do not need an explicit commit on this put operation is that it uses the implicit NULL txnid, so each one is automatically committed.

            db.put(null, key, data);  

Finally, we close our database before returning from the method.

        }
        if (db != null)
            db.close(true);
        return 0;
    }  

Method: RepQuoteWrforExampleGSG.printStocks()

This function is unmodified from when we originally introduced it. For details on that function, see Method: SimpleTxn.printStocks() .