7.12 Bulk Loading Using RDF Graph Support for Apache Jena

To load thousands to hundreds of thousands of RDF/OWL data files into an Oracle database, you can use the prepareBulk and completeBulk methods in the OracleBulkUpdateHandler Java class to simplify the task.

The addInBulk method in the OracleBulkUpdateHandler class can bulk load the triples of an RDF graph into an Oracle database. If the graph is a Jena in-memory graph, the operation is limited by the size of physical memory. The prepareBulk method bypasses Jena in-memory graphs: it takes a direct input stream to an RDF data file, parses the data, and loads the triples into an underlying staging table. If the staging table and an accompanying table for storing long literals do not already exist, they are created automatically.

The prepareBulk method can be invoked multiple times to load multiple data files into the same underlying staging table. It can also be invoked concurrently, assuming the hardware system is balanced and there are multiple CPU cores and sufficient I/O capacity.

Once all the data files are processed by the prepareBulk method, you can invoke completeBulk to load all the data into the RDF network.

Example 7-9 Loading Data into the Staging Table (prepareBulk)

Example 7-9 shows how to load all data files in directory dir_1 into the underlying staging table. Long literals are supported and are stored in a separate table. The data files can be compressed using GZIP to save storage space; the prepareBulk method automatically detects whether a data file is GZIP-compressed.

Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
 
PrintStream psOut = System.out;

String dirname = "dir_1";
File fileDir = new File(dirname);
String[] szAllFiles = fileDir.list();

// loop through all the files in a directory
for (int idx = 0; idx < szAllFiles.length; idx++) {
  String szIndFileName = dirname + File.separator + szAllFiles[idx];
  psOut.println("process to [ID = " + idx + " ] file " + szIndFileName);
  psOut.flush();
 
  try (InputStream is = new FileInputStream(szIndFileName)) {
    // try-with-resources closes the stream even if prepareBulk throws
    graph.getBulkUpdateHandler().prepareBulk(
        is,                    // input stream
        "http://example.com",  // base URI
        "RDF/XML",             // data file type: can be RDF/XML, N-TRIPLE, etc.
        "SEMTS",               // tablespace
        null,                  // flags
        null,                  // listener
        null                   // staging table name.
        );
  }
  catch (Throwable t) {
    psOut.println("Hit exception " + t.getMessage());
  }
}
 
graph.close();
oracle.dispose();
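
Example 7-9 relies on prepareBulk to recognize GZIP-compressed input. If you need the same check in your own code (for example, to report which files are compressed before loading), one common approach is to look for the two GZIP magic bytes at the start of the stream. The following is a minimal sketch of that approach only; GzipSniffer and its methods are hypothetical helpers, not part of the Oracle or Jena APIs, and this is not necessarily how prepareBulk performs its detection. It assumes Java 9 or later.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of the Oracle or Jena APIs.
final class GzipSniffer {

    // A GZIP stream starts with the two magic bytes 0x1f 0x8b.
    static boolean looksLikeGzip(byte[] head, int len) {
        return len >= 2 && (head[0] & 0xff) == 0x1f && (head[1] & 0xff) == 0x8b;
    }

    // Wraps the stream in a GZIPInputStream only when the magic bytes are present.
    static InputStream openMaybeGzipped(InputStream raw) throws IOException {
        BufferedInputStream in = new BufferedInputStream(raw);
        in.mark(2);                           // remember the start of the stream
        byte[] head = new byte[2];
        int len = in.readNBytes(head, 0, 2);  // Java 9+
        in.reset();                           // rewind so no data is lost
        return looksLikeGzip(head, len) ? new GZIPInputStream(in) : in;
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = "<a> <b> <c> .\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(plain);
        }
        // Read the plain and the compressed copy through the same helper.
        for (byte[] data : new byte[][] { plain, buf.toByteArray() }) {
            try (InputStream in = openMaybeGzipped(new ByteArrayInputStream(data))) {
                System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
    }
}
```

The main method reads the same N-Triples line once from a plain stream and once from a compressed one, so both iterations print identical text.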

The code in Example 7-9, starting from creating a new Oracle object and ending with disposing of the Oracle object, can be executed in parallel. Assume there is a quad-core CPU and enough I/O capacity on the database hardware system; you can divide up all the data files and save them into four separate directories: dir_1, dir_2, dir_3, and dir_4. Four Java threads or processes can then be started to work on those directories separately and concurrently. (For more information, see Using prepareBulk in Parallel (Multithreaded) Mode.)
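
The one-thread-per-directory arrangement described above can be sketched as follows. This is an illustration under stated assumptions, not code from the product: PerDirectoryRunner is a hypothetical helper, and the loadDirectory callback is a placeholder for the body of Example 7-9 (create the Oracle and GraphOracleSem objects, call prepareBulk for each file, then close and dispose). The sketch shows only the thread start and join logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper; loadDirectory stands in for the body of Example 7-9.
final class PerDirectoryRunner {

    // Starts one thread per directory and waits for all of them to finish.
    static void runAll(List<String> dirnames, Consumer<String> loadDirectory) {
        List<Thread> threads = new ArrayList<>();
        for (String dirname : dirnames) {
            Thread t = new Thread(() -> loadDirectory.accept(dirname), "loader-" + dirname);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        Set<String> done = ConcurrentHashMap.newKeySet();
        runAll(List.of("dir_1", "dir_2", "dir_3", "dir_4"), dirname -> {
            // In a real load, each thread would use its own Oracle and
            // GraphOracleSem objects here, as in Example 7-9.
            done.add(dirname);
        });
        System.out.println(done.size() + " directories processed");
    }
}
```

Each thread should own its connection and graph objects rather than share them, which is why the whole of Example 7-9 belongs inside the callback.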

Example 7-10 Loading Data from the Staging Table into the RDF Network (completeBulk)

After all data files are processed, you can invoke the completeBulk method, just once, to load the data from the staging table into the RDF network, as shown in Example 7-10. Triples with long literals are also loaded.

graph.getBulkUpdateHandler().completeBulk(
  null,  // flags for invoking SEM_APIS.bulk_load_from_staging_table
  null   // staging table name
);

The prepareBulk method can also take a Jena RDF graph as an input data source, in which case triples in that Jena RDF graph are loaded into the underlying staging table. For more information, see the Javadoc.

Example 7-11 Using prepareBulk with RDFa

In addition to loading triples from Jena RDF graphs and data files, the prepareBulk method supports RDFa, as shown in Example 7-11. (RDFa is explained in http://www.w3.org/TR/xhtml-rdfa-primer/.)

graph.getBulkUpdateHandler().prepareBulk(
  rdfaUrl,   // url to a web page using RDFa
  "SEMTS",   // tablespace
  null,      // flags
  null,      // listener
  null       // staging table name
);

To parse RDFa, the relevant java-rdfa libraries must be included in the classpath. No additional setup or API calls are required. (For information about java-rdfa, see http://www.rootdev.net/maven/projects/java-rdfa/ and the other topics there under Project Information.)

Note that if the rdfaUrl is located outside a firewall, you may need to set the following HTTP Proxy-related Java VM properties:

-Dhttp.proxyPort=...
-Dhttp.proxyHost=...

Example 7-12 Loading Quads into a DatasetGraph

The preceding examples in this section load triple data into a single graph. Loading quad data that may span multiple named graphs (such as data in NQUADS format) requires the use of the DatasetGraphOracleSem class. The DatasetGraphOracleSem class does not use the BulkUpdateHandler API, but it does provide a similar prepareBulk and completeBulk interface, as shown in Example 7-12.

Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);
 
// Can only create DatasetGraphOracleSem from an existing GraphOracleSem
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
DatasetGraphOracleSem dataset = DatasetGraphOracleSem.createFrom(graph);
 
// Don't need graph anymore, close it to free resources
graph.close();
 
try (InputStream is = new FileInputStream(szFileName)) {
    // Load the N-QUADS file into a staging table. The file can be gzipped.
    // try-with-resources closes the stream even if the load throws.
    dataset.prepareBulk(
        is,                // input stream
        "http://my.base/", // base URI
        "N-QUADS",         // data file type; can be "TRIG"
        "SEMTS",           // tablespace
        null,              // flags
        null,              // listener
        null,              // staging table name
        false              // truncate staging table before load
    );
    // Load quads from staging table into the dataset
    dataset.completeBulk(
        null, // flags; can be "PARSE PARALLEL_CREATE_INDEX PARALLEL=4 
              //                mbv_method=shadow" on a quad core machine
        null  // staging table name
    );
} 
catch (Throwable t) {
    System.out.println("Hit exception " + t.getMessage());
}
finally {
    dataset.close();
    oracle.dispose();
}

7.12.1 Using prepareBulk in Parallel (Multithreaded) Mode

Example 7-9 provided a way to load, sequentially, a set of files under a file system directory to an Oracle Database table (staging table). Example 7-13 loads, concurrently, a set of files to an Oracle table (staging table). The degree of parallelism is controlled by the input parameter iMaxThreads.

On a balanced hardware setup with 4 or more CPU cores, setting iMaxThreads to 8 (or 16) can significantly improve the speed of the prepareBulk operation when there are many data files to be processed.

Example 7-13 Using prepareBulk with iMaxThreads

public void testPrepareInParallel(String jdbcUrl, String user,
                       String password, String modelName,
                       String lang,
                       String tbs,
                       String dirname,
                       int iMaxThreads,
                       PrintStream psOut)
   throws SQLException, IOException,  InterruptedException
 {
   File dir = new File(dirname);
   File[] files = dir.listFiles();

   // create a set of physical Oracle connections and graph objects
   Oracle[] oracles = new Oracle[iMaxThreads];
   GraphOracleSem[] graphs = new GraphOracleSem[iMaxThreads];
   for (int idx = 0; idx < iMaxThreads; idx++) {
     oracles[idx] = new Oracle(jdbcUrl, user, password);
     graphs[idx] = new GraphOracleSem(oracles[idx], modelName);
   }

   PrepareWorker[] workers = new PrepareWorker[iMaxThreads];
   Thread[] threads = new Thread[iMaxThreads];
   for (int idx = 0; idx < iMaxThreads; idx++) {
     workers[idx] = new PrepareWorker(
         graphs[idx],
         files,
         idx,
         iMaxThreads,
         lang,
         tbs,
         psOut
         );
     threads[idx] = new Thread(workers[idx], workers[idx].getName());
     psOut.println("testPrepareInParallel: PrepareWorker " + idx + " running");
     threads[idx].start();
   }

   psOut.println("testPrepareInParallel: all threads started");

   for (int idx = 0; idx < iMaxThreads; idx++) {
     threads[idx].join();
   }
   for (int idx = 0; idx < iMaxThreads; idx++) {
     graphs[idx].close();
     oracles[idx].dispose();
   }
 }

 static class PrepareWorker implements Runnable
 {
   GraphOracleSem graph = null;
   int idx;
   int threads;
   File[] files = null;
   String lang = null;
   String tbs  = null;
   PrintStream psOut;

   public void run()
   {
     long lStartTime = System.currentTimeMillis();
     for (int idxFile = idx; idxFile < files.length; idxFile += threads) {
       File file = files[idxFile];
       try (FileInputStream fis = new FileInputStream(file)) {
         // try-with-resources closes the stream even if prepareBulk throws
         graph.getBulkUpdateHandler().prepareBulk(
             fis,
             "http://base.com/",
             lang,
             tbs,
             null,                   // flags
             new MyListener(psOut),  // listener
             null                    // table name
             );
       }
       catch (Exception e) {
         psOut.println("PrepareWorker: thread ["+getName()+"] error "+ e.getMessage());
       }
       psOut.println("PrepareWorker: thread ["+getName()+"] done to "
           + idxFile + ", file = " + file.toString()
           + " in (ms) " + (System.currentTimeMillis() - lStartTime));
     }
   }

   public PrepareWorker(GraphOracleSem graph,
                        File[] files,
                        int idx,
                        int threads,
                        String lang,
                        String tbs,
                        PrintStream psOut)
   {
     this.graph   = graph;
     this.files   = files;
     this.psOut   = psOut;
     this.idx     = idx;
     this.threads = threads;
     this.lang    = lang;
     this.tbs     = tbs;
   }

   public String getName()
   {
     return "PrepareWorker" + idx;
   }
 } 
 
 static class MyListener implements StatusListener 
 {
   PrintStream m_ps = null;
   public MyListener(PrintStream ps) { m_ps = ps; }
   long lLastBatch = 0;

   public void statusChanged(long count)
   {
     if (count - lLastBatch >= 10000) {
       m_ps.println("process to " + Long.toString(count));
       lLastBatch = count;
     }
   }

   public int illegalStmtEncountered(Node graphNode, Triple triple, long count)
   {
     m_ps.println("hit illegal statement with object " + triple.getObject().toString());
     return 0; // skip it
   }
 }
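
In Example 7-13, each PrepareWorker starts at the file whose index equals its own idx and then advances by the number of threads, so the files are dealt out to the workers in round-robin order. The following standalone sketch reproduces only that loop to show which file indexes each worker handles; StrideAssignment is a hypothetical class used for illustration, and it performs no loading.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: mirrors the loop bounds used by PrepareWorker.run().
final class StrideAssignment {

    // Returns the file indexes handled by worker idx out of 'threads' workers.
    static List<Integer> indexesFor(int idx, int threads, int fileCount) {
        List<Integer> result = new ArrayList<>();
        for (int idxFile = idx; idxFile < fileCount; idxFile += threads) {
            result.add(idxFile);
        }
        return result;
    }

    public static void main(String[] args) {
        int threads = 4;
        int fileCount = 10;
        for (int idx = 0; idx < threads; idx++) {
            System.out.println("worker " + idx + " -> "
                + indexesFor(idx, threads, fileCount));
        }
    }
}
```

With 4 workers and 10 files, worker 0 handles files 0, 4, and 8; worker 1 handles 1, 5, and 9; worker 2 handles 2 and 6; and worker 3 handles 3 and 7. Every file is processed exactly once. If iMaxThreads exceeds the number of files, the extra workers have nothing to do, although Example 7-13 still opens a connection for each of them.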

7.12.2 Handling Illegal Syntax During Data Loading

You can skip illegal triples and quads when using prepareBulk. This feature is useful if the source RDF data may contain syntax errors. In Example 7-14, a customized implementation of the StatusListener interface (defined in package oracle.spatial.rdf.client.jena) is passed as a parameter to prepareBulk. In this example, the illegalStmtEncountered method prints the object field of the illegal triple, and returns 0 so that prepareBulk can skip that illegal triple and move on.

Example 7-14 Skipping Triples with Illegal Syntax

....
 
Oracle oracle = new Oracle(jdbcUrl, user, password);
GraphOracleSem graph = new GraphOracleSem(oracle, modelName);
PrintStream psOut = System.err;
 
graph.getBulkUpdateHandler().prepareBulk(
  new FileInputStream(rdfDataFilename),
  "http://base.com/",    // base 
  lang,                  // data format, can be "N-TRIPLES" "RDF/XML" ...
  tbs,                   // tablespace name
  null,                  // flags
  new MyListener(psOut), // call back to show progress and also process illegal triples/quads
  null,                  // tableName, if null use default names
  false                  // truncate existing staging tables
  );
 
 graph.close();
 oracle.dispose();
 .... 
 
 // A customized StatusListener interface implementation
 public class MyListener implements StatusListener
 {
   PrintStream m_ps = null;
   public MyListener(PrintStream ps) { m_ps = ps; }
 
   public void statusChanged(long count)
   {
     // m_ps.println("process to " + Long.toString(count));
   }
 
   public int illegalStmtEncountered(Node graphNode, Triple triple, long count)
   {
     m_ps.println("hit illegal statement with object " + triple.getObject().toString());
     return 0; // skip it
   }
 }
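
When many statements are skipped, printing each one may be less useful than knowing how many were skipped and seeing a few samples. The following sketch shows one possible way to keep such a tally. SkipTally is a hypothetical helper, not part of the Oracle API; the only behavior taken from Example 7-14 is that the value 0 is returned so that the statement is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Oracle API.
final class SkipTally {
    private final int maxSamples;
    private final List<String> samples = new ArrayList<>();
    private long skipped = 0;

    SkipTally(int maxSamples) {
        this.maxSamples = maxSamples;
    }

    // Records one illegal statement and returns 0, the value that
    // Example 7-14 returns so that prepareBulk skips the statement.
    synchronized int record(String description) {
        skipped++;
        if (samples.size() < maxSamples) {
            samples.add(description);  // keep only the first few samples
        }
        return 0;
    }

    synchronized long skippedCount() {
        return skipped;
    }

    synchronized List<String> samples() {
        return new ArrayList<>(samples);
    }

    public static void main(String[] args) {
        SkipTally tally = new SkipTally(2);
        tally.record("bad object 1");
        tally.record("bad object 2");
        tally.record("bad object 3");
        System.out.println(tally.skippedCount() + " skipped, samples: " + tally.samples());
    }
}
```

Under these assumptions, a listener could hold a SkipTally field and have illegalStmtEncountered end with return tally.record(triple.getObject().toString()); the methods are synchronized so that one tally can be shared by listeners running in several threads.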