7.12 RDF Graph Support for Apache Jenaを使用したバルク・ロード
膨大な数のRDF/OWLデータ・ファイルをOracle Databaseに簡単にロードするには、OracleBulkUpdateHandler
JavaクラスのprepareBulk
メソッドとcompleteBulk
メソッドを使用します。
OracleBulkUpdateHandler
クラスのaddInBulk
メソッドは、RDFグラフのトリプルをOracle Databaseにバルクロード方式でロードできます。グラフがJenaインメモリー・グラフの場合、操作は物理メモリーのサイズによって制限されます。prepareBulk
メソッドは、Jenaのインメモリー・グラフをバイパスし、RDFデータ・ファイルへの直接入力ストリームを取得してデータを解析し、基礎となるステージング表にトリプルをロードします。ステージング表および長いリテラルを格納するための添付表が存在しない場合は、自動的に作成されます。
prepareBulk
メソッドは、複数のデータ・ファイルを同じ基礎となるステージング表へロードするために複数回コールできます。ハードウェア・システムがロードバランシングされており、複数のCPUコアによる十分なI/Oキャパシティが確保されている場合には、同時にコールすることもできます。
すべてのデータ・ファイルがprepareBulk
メソッドによって処理されると、completeBulk
を起動して、すべてのデータをRDFネットワークにロードできます。
例7-9 ステージング表へのデータのロード(prepareBulk)
例7-9に、ディレクトリdir_1
のすべてデータ・ファイルを、基礎となるステージング表へロードする方法を示します。長いリテラルがサポートされ、別々の表に格納されます。記憶域容量を節約するため、GZIPを使用してデータ・ファイルを圧縮することができ、そのデータ・ファイルがGZIPを使用して圧縮されているかどうかを、prepareBulk
メソッドで自動的に検出できます。
Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd); GraphOracleSem graph = new GraphOracleSem(oracle, szModelName); PrintStream psOut = System.out; String dirname = "dir_1"; File fileDir = new File(dirname); String[] szAllFiles = fileDir.list(); // loop through all the files in a directory for (int idx = 0; idx < szAllFiles.length; idx++) { String szIndFileName = dirname + File.separator + szAllFiles[idx]; psOut.println("process to [ID = " + idx + " ] file " + szIndFileName); psOut.flush(); try { InputStream is = new FileInputStream(szIndFileName); graph.getBulkUpdateHandler().prepareBulk( is, // input stream "http://example.com", // base URI "RDF/XML", // data file type: can be RDF/XML, N-TRIPLE, etc. "SEMTS", // tablespace null, // flags null, // listener null // staging table name. ); is.close(); } catch (Throwable t) { psOut.println("Hit exception " + t.getMessage()); } } graph.close(); oracle.dispose();
例7-9のコードは、新しいOracleオブジェクトの作成から、そのOracleオブジェクトを破棄して終了するまでをパラレルで実行できます。データベースのハードウェア・システム上に、クワッド-コアCPUと十分なI/O容量がある場合、すべてのデータ・ファイルを4つの別々のディレクトリ(dir_1
dir_2
、dir_3
およびdir_4
)に分割して保存することができます。4つのJavaプロセス・スレッドを起動し、それらのディレクトリで別々に同時に動作させることができます。(詳細は、「パラレル(マルチスレッド)モードでのprepareBulkの使用」を参照してください。)
例7-10 ステージング表からRDFネットワーク(completeBulk)へのデータのロード
すべてのデータ・ファイルが処理された後、1回だけcompleteBulk
メソッドを起動し、例7-10に示すとおり、ステージング表のデータをRDFネットワークにロードできます。長いリテラルを持つトリプルもロードされます。
graph.getBulkUpdateHandler().completeBulk( null, // flags for invoking SEM_APIS.bulk_load_from_staging_table null // staging table name );
また、prepareBulk
メソッドは、Jena RDFグラフも入力データ・ソースとして取得でき、この場合は、そのJena RDFグラフのトリプルが基礎となるステージング表にロードされます。詳細は、Javadocを参照してください。
例7-11 prepareBulkでのRDFaの使用
Jena RDFグラフとデータ・ファイルからトリプルをロードする以外に、prepareBulk
メソッドは、例7-11に示すとおり、RDFaをサポートします。(RDFaについては、http://www.w3.org/TR/xhtml-rdfa-primer/
を参照してください。)
graph.getBulkUpdateHandler().prepareBulk( rdfaUrl, // url to a web page using RDFa "SEMTS", // tablespace null, // flags null, // listener null // staging table name );
RDFaを解析するには、関連するjava-rdfa
ライブラリをクラスパスに含める必要があります。その他の設定やAPIコールは必要ありません。(java-rdfaの詳細は、http://www.rootdev.net/maven/projects/java-rdfa/
およびProject Informationの下のその他のトピックを参照してください。)
rdfaUrl
がファイアウォールの外側にある場合は、次のHTTPプロキシ関連のJava VMプロパティを設定する必要がある場合があります。
-Dhttp.proxyPort=... -Dhttp.proxyHost=...
例7-12 DatasetGraphへのクワッドのロード
この項の前述の例では、トリプル・データを単一のグラフにロードします。複数の名前付きグラフにわたる場合がある(NQUADS形式のデータなど)クワッド・データのロードには、DatasetGraphOracleSem
クラスの使用が必要です。例7-12
に示すとおり、DatasetGraphOracleSem
クラスはBulkUpdateHandler
APIを使用しませんが、同様のprepareBulk
インタフェースとcompleteBulkインタフェースを提供します。
Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd); // Can only create DatasetGraphOracleSem from an existing GraphOracleSem GraphOracleSem graph = new GraphOracleSem(oracle, szModelName); DatasetGraphOracleSem dataset = DatasetGraphOracleSem.createFrom(graph); // Don't need graph anymore, close it to free resources graph.close(); try { InputStream is = new FileInputStream(szFileName); // load NQUADS file into a staging table. This file can be gzipp'ed. dataset.prepareBulk( is, // input stream "http://my.base/", // base URI "N-QUADS", // data file type; can be "TRIG" "SEMTS", // tablespace null, // flags null, // listener null, // staging table name false // truncate staging table before load ); // Load quads from staging table into the dataset dataset.completeBulk( null, // flags; can be "PARSE PARALLEL_CREATE_INDEX PARALLEL=4 // mbv_method=shadow" on a quad core machine null // staging table name ); } catch (Throwable t) { System.out.println("Hit exception " + t.getMessage()); } finally { dataset.close(); oracle.dispose(); }
7.12.1 パラレル(マルチスレッド)・モードでのprepareBulkの使用
例7-9には、ファイル・システム・ディレクトリ下の一連のファイルを、Oracle Database表(ステージング表)へ、順番にロードする方法が示されていました。例7-13では、一連のファイルを同時にOracle表(ステージング表)にロードします。並列度は、入力パラメータiMaxThreads
によって制御されます。
4つ以上のCPUコアによる均等なハードウェア設定で、iMaxThreads
に8(または16)を設定すると、多数のデータ・ファイルが処理される場合に、prepareBulk
操作の速度を大幅に向上できます。
例7-13 iMaxThreadsによるprepareBulkの使用
public void testPrepareInParallel(String jdbcUrl, String user, String password, String modelName, String lang, String tbs, String dirname, int iMaxThreads, PrintStream psOut) throws SQLException, IOException, InterruptedException { File dir = new File(dirname); File[] files = dir.listFiles(); // create a set of physical Oracle connections and graph objects Oracle[] oracles = new Oracle[iMaxThreads]; GraphOracleSem[] graphs = new GraphOracleSem[iMaxThreads]; for (int idx = 0; idx < iMaxThreads; idx++) { oracles[idx] = new Oracle(jdbcUrl, user, password); graphs[idx] = new GraphOracleSem(oracles[idx], modelName); } PrepareWorker[] workers = new PrepareWorker[iMaxThreads]; Thread[] threads = new Thread[iMaxThreads]; for (int idx = 0; idx < iMaxThreads; idx++) { workers[idx] = new PrepareWorker( graphs[idx], files, idx, iMaxThreads, lang, tbs, psOut ); threads[idx] = new Thread(workers[idx], workers[idx].getName()); psOut.println("testPrepareInParallel: PrepareWorker " + idx + " running"); threads[idx].start(); } psOut.println("testPrepareInParallel: all threads started"); for (int idx = 0; idx < iMaxThreads; idx++) { threads[idx].join(); } for (int idx = 0; idx < iMaxThreads; idx++) { graphs[idx].close(); oracles[idx].dispose(); } } static class PrepareWorker implements Runnable { GraphOracleSem graph = null; int idx; int threads; File[] files = null; String lang = null; String tbs = null; PrintStream psOut; public void run() { long lStartTime = System.currentTimeMillis(); for (int idxFile = idx; idxFile < files.length; idxFile += threads) { File file = files[idxFile]; try { FileInputStream fis = new FileInputStream(file); graph.getBulkUpdateHandler().prepareBulk( fis, "http://base.com/", lang, tbs, null, // flags new MyListener(psOut), // listener null // table name ); fis.close(); } catch (Exception e) { psOut.println("PrepareWorker: thread ["+getName()+"] error "+ e.getMessage()); } psOut.println("PrepareWorker: thread ["+getName()+"] done to " + idxFile + ", file = " + file.toString() + " in (ms) " + (System.currentTimeMillis() - lStartTime)); } } public PrepareWorker(GraphOracleSem graph, File[] files, int idx, int threads, String lang, String tbs, PrintStream psOut) { this.graph = graph; this.files = files; this.psOut = psOut; this.idx = idx; this.threads = threads; this.files = files; this.lang = lang; this.tbs = tbs ; } public String getName() { return "PrepareWorker" + idx; } } static class MyListener implements StatusListener { PrintStream m_ps = null; public MyListener(PrintStream ps) { m_ps = ps; } long lLastBatch = 0; public void statusChanged(long count) { if (count - lLastBatch >= 10000) { m_ps.println("process to " + Long.toString(count)); lLastBatch = count; } } public int illegalStmtEncountered(Node graphNode, Triple triple, long count) { m_ps.println("hit illegal statement with object " + triple.getObject().toString()); return 0; // skip it } }
7.12.2 データ・ロード中の無効な構文の処理
prepareBulk
を使用すると、無効なトリプルとクワッドをスキップできます。この機能は、ソースのRDFデータに構文エラーが含まれることが考えられる場合に有用です。例7-14で、StatusListener
インタフェースのカスタマイズされた実装(パッケージoracle.spatial.rdf.client.jena
で定義される)が、prepareBulkへのパラメータとして渡されます。この例で、illegalStmtEncountered
メソッドは、prepareBulk
が無効なトリプルをスキップして進むことができるように、その無効なトリプルのオブジェクト・フィールドを出力し、0を戻します。
例7-14 無効な構文によるトリプルのスキップ
.... Oracle oracle = new Oracle(jdbcUrl, user, password); GraphOracleSem graph = new GraphOracleSem(oracle, modelName); PrintStream psOut = System.err; graph.getBulkUpdateHandler().prepareBulk( new FileInputStream(rdfDataFilename), "http://base.com/", // base lang, // data format, can be "N-TRIPLES" "RDF/XML" ... tbs, // tablespace name null, // flags new MyListener(psOut), // call back to show progress and also process illegal triples/quads null, // tableName, if null use default names false // truncate existing staging tables ); graph.close(); oracle.dispose(); .... // A customized StatusListener interface implementation public class MyListener implements StatusListener { PrintStream m_ps = null; public MyListener(PrintStream ps) { m_ps = ps; } public void statusChanged(long count) { // m_ps.println("process to " + Long.toString(count)); } public int illegalStmtEncountered(Node graphNode, Triple triple, long count) { m_ps.println("hit illegal statement with object " + triple.getObject().toString()); return 0; // skip it } }