7.12 Bulk Loading Using RDF Graph Support for Apache Jena
To load thousands to hundreds of thousands of RDF/OWL data files into an Oracle database, you can use the prepareBulk and completeBulk methods in the OracleBulkUpdateHandler Java class to simplify the task.
The addInBulk method in the OracleBulkUpdateHandler class can load triples of an RDF graph into an Oracle database in bulk loading style. If the graph is a Jena in-memory graph, the operation is limited by the size of the physical memory. The prepareBulk method bypasses Jena in-memory graphs: it takes a direct input stream to an RDF data file, parses the data, and loads the triples into an underlying staging table. If the staging table and an accompanying table for storing long literals do not already exist, they are created automatically.
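The memory argument above can be illustrated with a small stand-alone sketch. It is not how prepareBulk is implemented (this section does not describe its internals); it only shows the general streaming idea: read statements from an input stream one line at a time and hand them to a sink in fixed-size batches, so memory use depends on the batch size rather than on the file size. StreamingStager and its batch sink are hypothetical names, and the sketch assumes N-Triples input with one statement per line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class StreamingStager {
    // Hypothetical helper: reads one statement per line and passes the statements
    // to `sink` in batches of at most `batchSize`. Returns the number of statements.
    static long stage(InputStream in, int batchSize, Consumer<List<String>> sink)
            throws IOException {
        long total = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                // skip blank lines and comment lines
                if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                    continue;
                }
                batch.add(trimmed);
                total++;
                if (batch.size() == batchSize) {
                    sink.accept(batch);                // e.g. write to a staging table
                    batch = new ArrayList<>(batchSize); // start a fresh batch
                }
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);                        // flush the final partial batch
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        String data = "<http://ex/a> <http://ex/p> \"1\" .\n"
                + "# a comment\n"
                + "<http://ex/b> <http://ex/p> \"2\" .\n"
                + "<http://ex/c> <http://ex/p> \"3\" .\n";
        List<Integer> batchSizes = new ArrayList<>();
        long n = stage(new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)),
                2, batch -> batchSizes.add(batch.size()));
        System.out.println(n + " statements in batches " + batchSizes);
    }
}
```

In a real loader the sink would write each batch to a staging table, for example with JDBC batch inserts; that part is left out here.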
The prepareBulk method can be invoked multiple times to load multiple data files into the same underlying staging table. It can also be invoked concurrently, provided the hardware system is balanced, with multiple CPU cores and sufficient I/O capacity.
After all the data files are processed by the prepareBulk method, you can invoke completeBulk to load all the data into the RDF network.
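The prepare-then-complete contract can be summarized in plain Java. In the sketch below, StagedLoader is a hypothetical interface standing in for whatever object exposes prepareBulk and completeBulk; it is not part of the Oracle or Jena APIs. The sketch runs the prepare step for each file on a fixed-size thread pool, waits for every task to finish, and only then calls the complete step once. Counting failures and completing anyway is a policy choice of this sketch, mirroring how Example 7-9 reports an exception and moves on to the next file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TwoPhaseLoad {
    // Hypothetical stand-in for an object exposing prepareBulk/completeBulk.
    interface StagedLoader {
        void prepare(String file) throws Exception; // may run concurrently
        void complete() throws Exception;           // run once, after all prepares
    }

    // Runs prepare() for every file on a pool of `threads` workers, waits for all
    // of them, then calls complete() exactly once. Returns the number of failures.
    static int loadAll(StagedLoader loader, List<String> files, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int failures = 0;
        try {
            List<Future<Object>> futures = new ArrayList<>();
            for (String file : files) {
                // Callable form (returns null) so prepare() may throw checked exceptions
                futures.add(pool.submit(() -> {
                    loader.prepare(file);
                    return null;
                }));
            }
            for (Future<Object> f : futures) {
                try {
                    f.get();            // block until this prepare task has finished
                } catch (ExecutionException e) {
                    failures++;         // report and move on, like Example 7-9
                }
            }
        } finally {
            pool.shutdown();
        }
        loader.complete();              // single complete step after all prepares
        return failures;
    }

    public static void main(String[] args) throws Exception {
        List<String> staged = Collections.synchronizedList(new ArrayList<>());
        StagedLoader demo = new StagedLoader() {
            public void prepare(String file) { staged.add(file); }
            public void complete() {
                System.out.println("complete after " + staged.size() + " files");
            }
        };
        int failed = loadAll(demo, List.of("a.nt", "b.nt", "c.nt"), 2);
        System.out.println("failed = " + failed);
    }
}
```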
Example 7-9 Loading Data into the Staging Table (prepareBulk)
Example 7-9 shows how to load all data files in directory dir_1 into the underlying staging table. Long literals are supported and are stored in a separate table. The data files can be compressed using GZIP to save storage space; the prepareBulk method automatically detects whether a data file is GZIP-compressed.
Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
PrintStream psOut = System.out;

String dirname = "dir_1";
File fileDir = new File(dirname);
String[] szAllFiles = fileDir.list();

// loop through all the files in a directory
for (int idx = 0; idx < szAllFiles.length; idx++) {
  String szIndFileName = dirname + File.separator + szAllFiles[idx];
  psOut.println("process to [ID = " + idx + " ] file " + szIndFileName);
  psOut.flush();

  // try-with-resources closes the stream even if prepareBulk throws
  try (InputStream is = new FileInputStream(szIndFileName)) {
    graph.getBulkUpdateHandler().prepareBulk(
        is,                    // input stream
        "http://example.com",  // base URI
        "RDF/XML",             // data file type: can be RDF/XML, N-TRIPLE, etc.
        "SEMTS",               // tablespace
        null,                  // flags
        null,                  // listener
        null                   // staging table name
    );
  }
  catch (Throwable t) {
    psOut.println("Hit exception " + t.getMessage());
  }
}

graph.close();
oracle.dispose();
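This section does not say how prepareBulk recognizes GZIP input. One common technique, sketched below purely as an illustration, is to peek at the first two bytes of the stream for the GZIP magic number (0x1f 0x8b, defined in RFC 1952) and wrap the stream in java.util.zip.GZIPInputStream only when it is present. GzipSniffer and its helpers are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipSniffer {
    // Returns a stream of uncompressed bytes whether or not `raw` is GZIP-compressed.
    static InputStream maybeDecompress(InputStream raw) throws IOException {
        BufferedInputStream in = new BufferedInputStream(raw);
        in.mark(2);                  // remember the start so we can rewind
        int b1 = in.read();
        int b2 = in.read();
        in.reset();                  // push the two peeked bytes back
        // a GZIP stream starts with the magic bytes 0x1f 0x8b (RFC 1952)
        if (b1 == 0x1f && b2 == 0x8b) {
            return new GZIPInputStream(in);
        }
        return in;
    }

    // Demo helper: reads a whole (small) stream as UTF-8 text.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Demo helper: GZIP-compresses a string.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String triple = "<http://ex/s> <http://ex/p> \"o\" .\n";
        InputStream plain = new ByteArrayInputStream(triple.getBytes(StandardCharsets.UTF_8));
        InputStream packed = new ByteArrayInputStream(gzip(triple));
        System.out.print(readAll(maybeDecompress(plain)));
        System.out.print(readAll(maybeDecompress(packed)));
    }
}
```

Because prepareBulk already performs this detection, a helper like this is mainly useful when other tools in your pipeline read the same files.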
The code in Example 7-9, from creating a new Oracle object to disposing of the Oracle object, can be executed in parallel. For example, assume there is a quad-core CPU and enough I/O capacity on the database hardware system. You can divide up all the data files and save them into four separate directories: dir_1, dir_2, dir_3, and dir_4. Four Java threads or processes can then be started to work on those directories separately and concurrently. (For more information, see Using prepareBulk in Parallel (Multithreaded) Mode.)
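How the files are divided among the four directories is left open. If file sizes vary widely, splitting by file count can leave one thread with most of the bytes. The sketch below shows one possible approach, a greedy longest-first heuristic that always gives the next-largest file to the group with the fewest bytes so far; it is an illustrative choice rather than an Oracle recommendation, and FilePartitioner is a hypothetical name. File sizes are passed in as a map so that the sketch does not touch the file system; in practice you could obtain them with File.length().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class FilePartitioner {
    // Splits files (name -> size in bytes) into `groups` lists, placing the largest
    // remaining file into the group with the fewest bytes so far (greedy heuristic).
    static List<List<String>> partition(Map<String, Long> sizes, int groups) {
        if (groups <= 0) {
            throw new IllegalArgumentException("groups must be positive");
        }
        List<List<String>> buckets = new ArrayList<>();
        long[] load = new long[groups];
        for (int i = 0; i < groups; i++) {
            buckets.add(new ArrayList<>());
        }
        // min-heap of group indices ordered by bytes assigned (ties: lower index first)
        PriorityQueue<Integer> heap = new PriorityQueue<>(
                Comparator.<Integer>comparingLong(i -> load[i]).thenComparingInt(i -> i));
        for (int i = 0; i < groups; i++) {
            heap.add(i);
        }
        // largest files first; ties broken by name so the result is deterministic
        List<Map.Entry<String, Long>> files = new ArrayList<>(sizes.entrySet());
        files.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        for (Map.Entry<String, Long> f : files) {
            int target = heap.poll();          // least-loaded group
            buckets.get(target).add(f.getKey());
            load[target] += f.getValue();      // safe: target is not in the heap now
            heap.add(target);                  // re-insert with its new load
        }
        return buckets;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("a.nt.gz", 100L);
        sizes.put("b.nt.gz", 90L);
        sizes.put("c.nt.gz", 10L);
        sizes.put("d.nt.gz", 10L);
        sizes.put("e.nt.gz", 5L);
        List<List<String>> groups = partition(sizes, 2);
        for (int i = 0; i < groups.size(); i++) {
            System.out.println("dir_" + (i + 1) + ": " + groups.get(i));
        }
    }
}
```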
Example 7-10 Loading Data from the Staging Table into the RDF Network (completeBulk)
After all data files are processed, you can invoke the completeBulk method, just once, to load the data from the staging table into the RDF network, as shown in Example 7-10. Triples with long literals are also loaded.
graph.getBulkUpdateHandler().completeBulk(
    null,  // flags for invoking SEM_APIS.bulk_load_from_staging_table
    null   // staging table name
);
The prepareBulk method can also take a Jena RDF graph as an input data source, in which case the triples in that Jena RDF graph are loaded into the underlying staging table. For more information, see the Javadoc.
Example 7-11 Using prepareBulk with RDFa
In addition to loading triples from Jena RDF graphs and data files, the prepareBulk method supports RDFa, as shown in Example 7-11. (RDFa is explained in http://www.w3.org/TR/xhtml-rdfa-primer/.)
graph.getBulkUpdateHandler().prepareBulk(
    rdfaUrl,  // url to a web page using RDFa
    "SEMTS",  // tablespace
    null,     // flags
    null,     // listener
    null      // staging table name
);
To parse RDFa, the relevant java-rdfa libraries must be included in the classpath. No additional setup or API calls are required. (For information about java-rdfa, see http://www.rootdev.net/maven/projects/java-rdfa/ and the other topics there under Project Information.)
Note that if the rdfaUrl is located outside a firewall, you may need to set the following HTTP proxy-related Java VM properties:
-Dhttp.proxyPort=... -Dhttp.proxyHost=...
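If changing the JVM command line is inconvenient, the same standard Java networking properties can also be set from code, as in this minimal sketch. The host name and port are placeholders, not values from this guide, and ProxyConfig is a hypothetical name; set the properties before the first HTTP connection is made.

```java
final class ProxyConfig {
    // Programmatic counterpart of -Dhttp.proxyHost=... -Dhttp.proxyPort=...
    // Call this before the first HTTP connection is opened.
    static void useHttpProxy(String host, int port) {
        System.setProperty("http.proxyHost", host);
        System.setProperty("http.proxyPort", Integer.toString(port));
    }

    public static void main(String[] args) {
        // placeholder values; substitute your site's proxy host and port
        useHttpProxy("proxy.example.com", 8080);
        System.out.println(System.getProperty("http.proxyHost") + ":"
                + System.getProperty("http.proxyPort"));
    }
}
```

For an https:// RDFa URL, the corresponding standard Java properties are https.proxyHost and https.proxyPort.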
Example 7-12 Loading Quads into a DatasetGraph
The preceding examples in this section load triple data into a single graph. Loading quad data that may span multiple named graphs (such as data in N-QUADS format) requires the DatasetGraphOracleSem class. The DatasetGraphOracleSem class does not use the BulkUpdateHandler API, but it does provide a similar prepareBulk and completeBulk interface, as shown in Example 7-12.
Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);

// Can only create DatasetGraphOracleSem from an existing GraphOracleSem
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
DatasetGraphOracleSem dataset = DatasetGraphOracleSem.createFrom(graph);

// Don't need graph anymore, close it to free resources
graph.close();

// try-with-resources closes the input stream once loading is done
try (InputStream is = new FileInputStream(szFileName)) {
  // load N-QUADS file into a staging table. This file can be GZIP-compressed.
  dataset.prepareBulk(
      is,                 // input stream
      "http://my.base/",  // base URI
      "N-QUADS",          // data file type; can be "TRIG"
      "SEMTS",            // tablespace
      null,               // flags
      null,               // listener
      null,               // staging table name
      false               // truncate staging table before load
  );

  // Load quads from staging table into the dataset
  dataset.completeBulk(
      null,  // flags; can be "PARSE PARALLEL_CREATE_INDEX PARALLEL=4
             // mbv_method=shadow" on a quad core machine
      null   // staging table name
  );
}
catch (Throwable t) {
  System.out.println("Hit exception " + t.getMessage());
}
finally {
  dataset.close();
  oracle.dispose();
}
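The fourth term of an N-QUADS statement is what makes a dataset necessary: it names the graph that the triple belongs to. The sketch below makes that concrete by splitting simplified N-QUADS lines and grouping the triples by graph name. It is deliberately naive and hypothetical (QuadGrouper is not part of any API): it assumes that no term contains whitespace, so literals with spaces are not supported, and it treats a statement without a fourth term as belonging to the default graph. Use a real RDF parser for actual data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QuadGrouper {
    static final String DEFAULT_GRAPH = "(default graph)";

    // Groups simplified N-QUADS lines by graph name, keeping first-seen order.
    // Naive: assumes no term contains whitespace.
    static Map<String, List<String>> groupByGraph(List<String> lines) {
        Map<String, List<String>> byGraph = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;                      // skip blanks and comments
            }
            if (!line.endsWith(".")) {
                throw new IllegalArgumentException("statement must end with '.': " + line);
            }
            String[] terms = line.substring(0, line.length() - 1).trim().split("\\s+");
            if (terms.length < 3 || terms.length > 4) {
                throw new IllegalArgumentException("expected 3 or 4 terms: " + line);
            }
            // the optional fourth term names the graph
            String graph = terms.length == 4 ? terms[3] : DEFAULT_GRAPH;
            String triple = terms[0] + " " + terms[1] + " " + terms[2];
            byGraph.computeIfAbsent(graph, g -> new ArrayList<>()).add(triple);
        }
        return byGraph;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "<http://ex/a> <http://ex/p> <http://ex/b> <http://ex/g1> .",
                "<http://ex/c> <http://ex/p> \"x\" <http://ex/g2> .",
                "<http://ex/d> <http://ex/p> <http://ex/e> .");
        groupByGraph(lines).forEach((g, triples) -> System.out.println(g + " -> " + triples));
    }
}
```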
7.12.1 Using prepareBulk in Parallel (Multithreaded) Mode
Example 7-9 provided a way to load a set of files under a file system directory sequentially into an Oracle Database table (the staging table). Example 7-13 loads a set of files into a staging table concurrently. The degree of parallelism is controlled by the input parameter iMaxThreads.
On a balanced hardware setup with 4 or more CPU cores, setting iMaxThreads to 8 (or 16) can significantly improve the speed of the prepareBulk operation when there are many data files to be processed.
Example 7-13 Using prepareBulk with iMaxThreads
public void testPrepareInParallel(String jdbcUrl, String user,
    String password, String modelName, String lang, String tbs,
    String dirname, int iMaxThreads, PrintStream psOut)
    throws SQLException, IOException, InterruptedException {
  File dir = new File(dirname);
  File[] files = dir.listFiles();
  if (files == null) {
    throw new IOException("not a readable directory: " + dirname);
  }

  // create a set of physical Oracle connections and graph objects
  Oracle[] oracles = new Oracle[iMaxThreads];
  GraphOracleSem[] graphs = new GraphOracleSem[iMaxThreads];
  for (int idx = 0; idx < iMaxThreads; idx++) {
    oracles[idx] = new Oracle(jdbcUrl, user, password);
    graphs[idx] = new GraphOracleSem(oracles[idx], modelName);
  }

  PrepareWorker[] workers = new PrepareWorker[iMaxThreads];
  Thread[] threads = new Thread[iMaxThreads];
  for (int idx = 0; idx < iMaxThreads; idx++) {
    workers[idx] = new PrepareWorker(graphs[idx], files, idx, iMaxThreads,
        lang, tbs, psOut);
    threads[idx] = new Thread(workers[idx], workers[idx].getName());
    psOut.println("testPrepareInParallel: PrepareWorker " + idx + " running");
    threads[idx].start();
  }

  psOut.println("testPrepareInParallel: all threads started");
  for (int idx = 0; idx < iMaxThreads; idx++) {
    threads[idx].join();
  }

  for (int idx = 0; idx < iMaxThreads; idx++) {
    graphs[idx].close();
    oracles[idx].dispose();
  }
}

static class PrepareWorker implements Runnable {
  GraphOracleSem graph = null;
  int idx;
  int threads;
  File[] files = null;
  String lang = null;
  String tbs = null;
  PrintStream psOut;

  public void run() {
    long lStartTime = System.currentTimeMillis();
    // worker idx handles files idx, idx + threads, idx + 2 * threads, ...
    for (int idxFile = idx; idxFile < files.length; idxFile += threads) {
      File file = files[idxFile];
      // try-with-resources closes the stream even if prepareBulk throws
      try (FileInputStream fis = new FileInputStream(file)) {
        graph.getBulkUpdateHandler().prepareBulk(
            fis,
            "http://base.com/",
            lang,
            tbs,
            null,                   // flags
            new MyListener(psOut),  // listener
            null                    // table name
        );
      }
      catch (Exception e) {
        psOut.println("PrepareWorker: thread [" + getName() + "] error " + e.getMessage());
      }
      psOut.println("PrepareWorker: thread [" + getName() + "] done to "
          + idxFile + ", file = " + file.toString()
          + " in (ms) " + (System.currentTimeMillis() - lStartTime));
    }
  }

  public PrepareWorker(GraphOracleSem graph, File[] files, int idx,
      int threads, String lang, String tbs, PrintStream psOut) {
    this.graph = graph;
    this.files = files;
    this.psOut = psOut;
    this.idx = idx;
    this.threads = threads;
    this.lang = lang;
    this.tbs = tbs;
  }

  public String getName() {
    return "PrepareWorker" + idx;
  }
}

static class MyListener implements StatusListener {
  PrintStream m_ps = null;
  long lLastBatch = 0;

  public MyListener(PrintStream ps) { m_ps = ps; }

  public void statusChanged(long count) {
    // report progress at most once per 10,000 statements
    if (count - lLastBatch >= 10000) {
      m_ps.println("process to " + Long.toString(count));
      lLastBatch = count;
    }
  }

  public int illegalStmtEncountered(Node graphNode, Triple triple, long count) {
    m_ps.println("hit illegal statement with object " + triple.getObject().toString());
    return 0; // skip it
  }
}
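The PrepareWorker loop in Example 7-13 assigns files to threads by striding: worker idx handles files idx, idx + iMaxThreads, idx + 2 * iMaxThreads, and so on. The small stand-alone sketch below isolates that indexing rule so you can check which files each worker receives for a given file count and thread count. StrideAssignment is a hypothetical name; only the loop bounds come from Example 7-13.

```java
import java.util.ArrayList;
import java.util.List;

final class StrideAssignment {
    // File indices handled by worker `idx` when `threads` workers share `fileCount`
    // files, using the same loop as PrepareWorker.run() in Example 7-13.
    static List<Integer> filesFor(int idx, int threads, int fileCount) {
        List<Integer> mine = new ArrayList<>();
        for (int i = idx; i < fileCount; i += threads) {
            mine.add(i);
        }
        return mine;
    }

    public static void main(String[] args) {
        int fileCount = 10;
        int threads = 4;
        for (int idx = 0; idx < threads; idx++) {
            System.out.println("PrepareWorker" + idx + " -> " + filesFor(idx, threads, fileCount));
        }
    }
}
```

Striding balances file counts, not bytes, and a worker whose idx is at least the number of files receives nothing. With 10 files and iMaxThreads set to 4, workers 0 and 1 get three files each and workers 2 and 3 get two.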
7.12.2 Handling Illegal Syntax During Data Loading
You can skip illegal triples and quads when using prepareBulk. This feature is useful if the source RDF data may contain syntax errors. In Example 7-14, a customized implementation of the StatusListener interface (defined in package oracle.spatial.rdf.client.jena) is passed as a parameter to prepareBulk. In this example, the illegalStmtEncountered method prints the object field of the illegal triple and returns 0, so that prepareBulk can skip that illegal triple and move on.
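The skip-and-continue contract can be illustrated without an Oracle connection. In the sketch below, IllegalStatementHandler is a hypothetical, simplified stand-in for StatusListener (it receives the raw text line rather than Jena Node and Triple objects), SkippingLoader is likewise hypothetical, and the legality check is deliberately naive. Returning 0 skips the statement, as in Example 7-14; treating any other return value as "abort" is an assumption made only for this sketch and does not describe how prepareBulk interprets non-zero values.

```java
import java.util.ArrayList;
import java.util.List;

final class SkippingLoader {
    // Hypothetical callback loosely modeled on StatusListener.illegalStmtEncountered:
    // return 0 to skip the statement; in this sketch any other value aborts the load.
    interface IllegalStatementHandler {
        int illegalStatement(String line, long lineNumber);
    }

    // Naive legality check for illustration only: a final '.' and at least 3 terms.
    static boolean looksLegal(String line) {
        if (!line.endsWith(".")) {
            return false;
        }
        String body = line.substring(0, line.length() - 1).trim();
        return !body.isEmpty() && body.split("\\s+").length >= 3;
    }

    // Returns the accepted statements; illegal ones are reported to the handler.
    static List<String> load(List<String> lines, IllegalStatementHandler handler) {
        List<String> accepted = new ArrayList<>();
        long lineNumber = 0;
        for (String raw : lines) {
            lineNumber++;
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            }
            if (looksLegal(line)) {
                accepted.add(line);
            } else if (handler.illegalStatement(line, lineNumber) != 0) {
                throw new IllegalStateException("load aborted at line " + lineNumber);
            }
            // handler returned 0: the illegal statement is skipped
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> ok = load(
                List.of("<a> <p> <b> .", "<a> <p>", "<c> <p> <d> ."),
                (line, n) -> {
                    System.err.println("hit illegal statement at line " + n + ": " + line);
                    return 0; // skip it
                });
        System.out.println(ok.size() + " statements accepted");
    }
}
```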
Example 7-14 Skipping Triples with Illegal Syntax
....
Oracle oracle = new Oracle(jdbcUrl, user, password);
GraphOracleSem graph = new GraphOracleSem(oracle, modelName);
PrintStream psOut = System.err;

// close the input stream, graph, and connection even if prepareBulk throws
try (InputStream is = new FileInputStream(rdfDataFilename)) {
  graph.getBulkUpdateHandler().prepareBulk(
      is,
      "http://base.com/",     // base
      lang,                   // data format, can be "N-TRIPLES" "RDF/XML" ...
      tbs,                    // tablespace name
      null,                   // flags
      new MyListener(psOut),  // call back to show progress and also process illegal triples/quads
      null,                   // tableName, if null use default names
      false                   // truncate existing staging tables
  );
}
finally {
  graph.close();
  oracle.dispose();
}
....

// A customized StatusListener interface implementation
public class MyListener implements StatusListener {
  PrintStream m_ps = null;

  public MyListener(PrintStream ps) { m_ps = ps; }

  public void statusChanged(long count) {
    // m_ps.println("process to " + Long.toString(count));
  }

  public int illegalStmtEncountered(Node graphNode, Triple triple, long count) {
    m_ps.println("hit illegal statement with object " + triple.getObject().toString());
    return 0; // skip it
  }
}