7.12 RDF Graph Support for Apache Jenaを使用したバルク・ロード

膨大な数のRDF/OWLデータ・ファイルをOracle Databaseに簡単にロードするには、OracleBulkUpdateHandler JavaクラスのprepareBulkメソッドとcompleteBulkメソッドを使用します。

OracleBulkUpdateHandlerクラスのaddInBulkメソッドは、RDFグラフのトリプルをOracle Databaseにバルクロード方式でロードできます。グラフがJenaインメモリー・グラフの場合、操作は物理メモリーのサイズによって制限されます。prepareBulkメソッドは、Jenaのインメモリー・グラフをバイパスし、RDFデータ・ファイルへの直接入力ストリームを取得してデータを解析し、基礎となるステージング表にトリプルをロードします。ステージング表および長いリテラルを格納するための添付表が存在しない場合は、自動的に作成されます。

prepareBulkメソッドは、複数のデータ・ファイルを同じ基礎となるステージング表へロードするために複数回コールできます。ハードウェア・システムがロードバランシングされており、複数のCPUコアによる十分なI/Oキャパシティが確保されている場合には、同時にコールすることもできます。

すべてのデータ・ファイルがprepareBulkメソッドによって処理されると、completeBulkを起動して、すべてのデータをRDFネットワークにロードできます。

例7-9 ステージング表へのデータのロード(prepareBulk)

例7-9に、ディレクトリdir_1のすべてデータ・ファイルを、基礎となるステージング表へロードする方法を示します。長いリテラルがサポートされ、別々の表に格納されます。記憶域容量を節約するため、GZIPを使用してデータ・ファイルを圧縮することができ、そのデータ・ファイルがGZIPを使用して圧縮されているかどうかを、prepareBulkメソッドで自動的に検出できます。

Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
 
PrintStream psOut = System.out;

String dirname = "dir_1";
File fileDir = new File(dirname);
String[] szAllFiles = fileDir.list();

// loop through all the files in a directory
for (int idx = 0; idx < szAllFiles.length; idx++) {
  String szIndFileName = dirname + File.separator + szAllFiles[idx];
  psOut.println("process to [ID = " + idx + " ] file " + szIndFileName);
  psOut.flush();
 
  try {
    InputStream is = new FileInputStream(szIndFileName);
    graph.getBulkUpdateHandler().prepareBulk(
        is,                    // input stream
        "http://example.com",  // base URI
        "RDF/XML",             // data file type: can be RDF/XML, N-TRIPLE, etc.
        "SEMTS",               // tablespace
        null,                  // flags
        null,                  // listener
        null                   // staging table name.
        );
    is.close();
  }
  catch (Throwable t) {
    psOut.println("Hit exception " + t.getMessage());
  }
}
 
graph.close();
oracle.dispose();

例7-9のコードは、新しいOracleオブジェクトの作成から、そのOracleオブジェクトを破棄して終了するまでをパラレルで実行できます。データベースのハードウェア・システム上に、クワッド-コアCPUと十分なI/O容量がある場合、すべてのデータ・ファイルを4つの別々のディレクトリ(dir_1dir_2dir_3およびdir_4)に分割して保存することができます。4つのJavaプロセス・スレッドを起動し、それらのディレクトリで別々に同時に動作させることができます。(詳細は、「パラレル(マルチスレッド)モードでのprepareBulkの使用」を参照してください。)

例7-10 ステージング表からRDFネットワーク(completeBulk)へのデータのロード

すべてのデータ・ファイルが処理された後、1回だけcompleteBulkメソッドを起動し、例7-10に示すとおり、ステージング表のデータをRDFネットワークにロードできます。長いリテラルを持つトリプルもロードされます。

graph.getBulkUpdateHandler().completeBulk(
  null,  // flags for invoking SEM_APIS.bulk_load_from_staging_table
  null   // staging table name
);

また、prepareBulkメソッドは、Jena RDFグラフも入力データ・ソースとして取得でき、この場合は、そのJena RDFグラフのトリプルが基礎となるステージング表にロードされます。詳細は、Javadocを参照してください。

例7-11 prepareBulkでのRDFaの使用

Jena RDFグラフとデータ・ファイルからトリプルをロードする以外に、prepareBulkメソッドは、例7-11に示すとおり、RDFaをサポートします。(RDFaについては、http://www.w3.org/TR/xhtml-rdfa-primer/を参照してください。)

graph.getBulkUpdateHandler().prepareBulk(
  rdfaUrl,   // url to a web page using RDFa
  "SEMTS",   // tablespace
  null,      // flags
  null,      // listener
  null       // staging table name
);

RDFaを解析するには、関連するjava-rdfaライブラリをクラスパスに含める必要があります。その他の設定やAPIコールは必要ありません。(java-rdfaの詳細は、http://www.rootdev.net/maven/projects/java-rdfa/およびProject Informationの下のその他のトピックを参照してください。)

rdfaUrlがファイアウォールの外側にある場合は、次のHTTPプロキシ関連のJava VMプロパティを設定する必要がある場合があります。

-Dhttp.proxyPort=...
-Dhttp.proxyHost=...

例7-12 DatasetGraphへのクワッドのロード

この項の前述の例では、トリプル・データを単一のグラフにロードします。複数の名前付きグラフにわたる場合がある(NQUADS形式のデータなど)クワッド・データのロードには、DatasetGraphOracleSemクラスの使用が必要です。例7-12に示すとおり、DatasetGraphOracleSemクラスはBulkUpdateHandler APIを使用しませんが、同様のprepareBulkインタフェースとcompleteBulkインタフェースを提供します。

Oracle oracle = new Oracle(szJdbcURL, szUser, szPasswd);
 
// Can only create DatasetGraphOracleSem from an existing GraphOracleSem
GraphOracleSem graph = new GraphOracleSem(oracle, szModelName);
DatasetGraphOracleSem dataset = DatasetGraphOracleSem.createFrom(graph);
 
// Don't need graph anymore, close it to free resources
graph.close();
 
try {
    InputStream is = new FileInputStream(szFileName);
    // load NQUADS file into a staging table. This file can be gzipp'ed.
    dataset.prepareBulk(
        is,                // input stream
        "http://my.base/", // base URI
        "N-QUADS",         // data file type; can be "TRIG"
        "SEMTS",           // tablespace
        null,              // flags
        null,              // listener
        null,              // staging table name
        false              // truncate staging table before load
    );
    // Load quads from staging table into the dataset
    dataset.completeBulk(
        null, // flags; can be "PARSE PARALLEL_CREATE_INDEX PARALLEL=4 
              //                mbv_method=shadow" on a quad core machine
        null  // staging table name
    );
} 
catch (Throwable t) {
    System.out.println("Hit exception " + t.getMessage());
}
finally {
    dataset.close();
    oracle.dispose();
}

7.12.1 パラレル(マルチスレッド)・モードでのprepareBulkの使用

例7-9には、ファイル・システム・ディレクトリ下の一連のファイルを、Oracle Database表(ステージング表)へ、順番にロードする方法が示されていました。例7-13では、一連のファイルを同時にOracle表(ステージング表)にロードします。並列度は、入力パラメータiMaxThreadsによって制御されます。

4つ以上のCPUコアによる均等なハードウェア設定で、iMaxThreadsに8(または16)を設定すると、多数のデータ・ファイルが処理される場合に、prepareBulk操作の速度を大幅に向上できます。

例7-13 iMaxThreadsによるprepareBulkの使用

public void testPrepareInParallel(String jdbcUrl, String user,
                       String password, String modelName,
                       String lang,
                       String tbs,
                       String dirname,
                       int iMaxThreads,
                       PrintStream psOut)
   throws SQLException, IOException,  InterruptedException
 {
   File dir = new File(dirname);
   File[] files = dir.listFiles();

   // create a set of physical Oracle connections and graph objects
   Oracle[] oracles = new Oracle[iMaxThreads];
   GraphOracleSem[] graphs = new GraphOracleSem[iMaxThreads];
   for (int idx = 0; idx < iMaxThreads; idx++) {
     oracles[idx] = new Oracle(jdbcUrl, user, password);
     graphs[idx] = new GraphOracleSem(oracles[idx], modelName);
   }

   PrepareWorker[] workers = new PrepareWorker[iMaxThreads];
   Thread[] threads = new Thread[iMaxThreads];
   for (int idx = 0; idx < iMaxThreads; idx++) {
     workers[idx] = new PrepareWorker(
         graphs[idx],
         files,
         idx,
         iMaxThreads,
         lang,
         tbs,
         psOut
         );
     threads[idx] = new Thread(workers[idx], workers[idx].getName());
     psOut.println("testPrepareInParallel: PrepareWorker " + idx + " running");
     threads[idx].start();
   }

   psOut.println("testPrepareInParallel: all threads started");

   for (int idx = 0; idx < iMaxThreads; idx++) {
     threads[idx].join();
   }
   for (int idx = 0; idx < iMaxThreads; idx++) {
     graphs[idx].close();
     oracles[idx].dispose();
   }
 }

 static class PrepareWorker implements Runnable
 {
   GraphOracleSem graph = null;
   int idx;
   int threads;
   File[] files = null;
   String lang = null;
   String tbs  = null;
   PrintStream psOut;

   public void run()
   {
     long lStartTime = System.currentTimeMillis();
     for (int idxFile = idx; idxFile < files.length; idxFile += threads) {
       File file = files[idxFile];
       try {
         FileInputStream fis = new FileInputStream(file);
         graph.getBulkUpdateHandler().prepareBulk(
             fis,
             "http://base.com/",
             lang,
             tbs,
             null,                   // flags
             new MyListener(psOut),  // listener
             null                    // table name
             );
         fis.close();
       }
       catch (Exception e) {
         psOut.println("PrepareWorker: thread ["+getName()+"] error "+ e.getMessage());
       }
       psOut.println("PrepareWorker: thread ["+getName()+"] done to "
           + idxFile + ", file = " + file.toString()
           + " in (ms) " + (System.currentTimeMillis() - lStartTime));
     }
   }

   public PrepareWorker(GraphOracleSem graph,
                        File[] files,
                        int idx,
                        int threads,
                        String lang,
                        String tbs,
                        PrintStream psOut)
   {
     this.graph   = graph;
     this.files   = files;
     this.psOut   = psOut;
     this.idx     = idx;
     this.threads = threads;
     this.files   = files;
     this.lang    = lang;
     this.tbs     = tbs ;
   }

   public String getName()
   {
     return "PrepareWorker" + idx;
   }
 } 
 
 static class MyListener implements StatusListener 
 {
   PrintStream m_ps = null;
   public MyListener(PrintStream ps) { m_ps = ps; }
   long lLastBatch = 0;

   public void statusChanged(long count)
   {
     if (count - lLastBatch >= 10000) {
       m_ps.println("process to " + Long.toString(count));
       lLastBatch = count;
     }
   }

   public int illegalStmtEncountered(Node graphNode, Triple triple, long count)
   {
     m_ps.println("hit illegal statement with object " + triple.getObject().toString());
     return 0; // skip it
   }
 }

7.12.2 データ・ロード中の無効な構文の処理

prepareBulkを使用すると、無効なトリプルとクワッドをスキップできます。この機能は、ソースのRDFデータに構文エラーが含まれることが考えられる場合に有用です。例7-14で、StatusListenerインタフェースのカスタマイズされた実装(パッケージoracle.spatial.rdf.client.jenaで定義される)が、prepareBulkへのパラメータとして渡されます。この例で、illegalStmtEncounteredメソッドは、prepareBulkが無効なトリプルをスキップして進むことができるように、その無効なトリプルのオブジェクト・フィールドを出力し、0を戻します。

例7-14 無効な構文によるトリプルのスキップ

....
 
Oracle oracle = new Oracle(jdbcUrl, user, password);
GraphOracleSem graph = new GraphOracleSem(oracle, modelName);
PrintStream psOut = System.err;
 
graph.getBulkUpdateHandler().prepareBulk(
  new FileInputStream(rdfDataFilename),
  "http://base.com/",    // base 
  lang,                  // data format, can be "N-TRIPLES" "RDF/XML" ...
  tbs,                   // tablespace name
  null,                  // flags
  new MyListener(psOut), // call back to show progress and also process illegal triples/quads
  null,                  // tableName, if null use default names
  false                  // truncate existing staging tables
  );
 
 graph.close();
 oracle.dispose();
 .... 
 
 // A customized StatusListener interface implementation
  public class MyListener implements StatusListener
 {
   PrintStream m_ps = null;
   public MyListener(PrintStream ps) { m_ps = ps; }
 
   public void statusChanged(long count)
   {
     // m_ps.println("process to " + Long.toString(count));
   }
 
  public int illegalStmtEncountered(Node graphNode, Triple triple, long count)
  {
    m_ps.println("hit illegal statement with object " + triple.getObject().toString());
    return 0; // skip it
  }
 }