5.4.2.1 JDBCベースのデータのロード

JDBCベースのデータのロードでは、Java Database Connectivity (JDBC) APIを使用して、グラフ・データをOracle Databaseにロードします。このオプションでは、指定した入力ストリームの頂点(またはエッジ)は、スプリッタ・スレッドにより複数のチャンクの間に広がります。各チャンクは、JDBCバッチを使用して、チャンク内のすべての要素を一時作業表に挿入する、異なるローダー・スレッドにより処理されます。使用されるスプリッタおよびローダー・スレッドの数は、ユーザーが指定した並列度(DOP)によって決定されます。

すべてのグラフ・データが一時作業表にロードされた後、一時作業表に格納されたすべてのデータはプロパティ・グラフVT$およびGE$表にロードされます。

次の例は、並列度48で、JDBCベースのパラレル・データ・ロードを使用し、グラフ・データをOracle定義のフラット・ファイル形式の頂点およびエッジ・ファイルからロードします。

    String szOPVFile = "../../data/connections.opv"; 
    String szOPEFile = "../../data/connections.ope"; 
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    opgdl = OraclePropertyGraphDataLoader.getInstance(); 
    opgdl.loadData(opg, szOPVFile, szOPEFile, 48 /* DOP */, 1000 /* batch size */, true /* rebuild index flag */, "pddl=t,pdml=t" /* options */); 
);

データのロード操作のパフォーマンスを最適化するために、JDBCベースのデータのロードを呼び出すときに、フラグやヒントのセットを指定できます。これらのヒントとしては次のものがあります。

  • DOP: データのロード時に使用する並列度。このパラメータは、データをプロパティ・グラフVT$およびGE$表にロードするときに使用するローダー・スレッドの数と同様に、ファイルの分割時に生成するチャンクの数を決定します。

  • バッチ・サイズ:バッチ・モードでOracleのupdate文で使用するバッチ・サイズを指定する整数。JDBCベースのデータのロードに使用されるデフォルトのバッチ・サイズは1000です。

  • 索引の再作成: このフラグがtrueに設定されている場合、データ・ローダーは、データがロードされるプロパティ・グラフで定義されたすべての索引および制約を無効にします。すべてのデータがプロパティ・グラフにロードされた後、すべての索引および制約は再作成されます。

  • ロード・オプション: データのロード操作を最適化するための1つのオプション(またはカンマで区切った複数のオプション)。これらのオプションとしては次のものがあります。

    • NO_DUP=T: 入力データに無効な重複がないことを前提にしています。有効なプロパティ・グラフでは、各頂点(エッジ)は指定したプロパティ・キーに対し最大で1つの値を持つことができます。無効なプロパティ・グラフでは、各頂点(エッジ)は特定のキーに対し2つ以上の値を持つことがあります。例として、頂点vには2つのキー/値ペア、name/"John"およびname/"Johnny"があり、同じキーを共有しています。

    • PDML=T: データ・ローダーで使用されるデータベース・セッションでのDML操作のパラレル実行を有効にします。このヒントは、長時間実行するバッチ・ジョブのパフォーマンスを向上するのに使用されます。

    • PDDL=T: データ・ローダーで使用されるデータベース・セッションでのDDL操作のパラレル実行を有効にします。このヒントは、長時間実行するバッチ・ジョブのパフォーマンスを向上するのに使用されます。

    • KEEP_WORK_TABS=T: データのロードが完了した後の作業表のクリーニングおよび削除をスキップします。これはデバッグでの使用専用です。

    • KEEP_TMP_FILES=T: データのロードが完了した後の一時スプリッタ・ファイルの削除をスキップします。これはデバッグ専用です。

  • スプリッタ・フラグ: 分割フェーズで使用されるファイルまたはストリームのタイプを定義し、グラフのロード・フェーズで使用されるデータ・チャンクを生成する整数値です。一時ファイルは通常のファイル(0)、名前付きパイプ(1)、またはパイプされたストリーム(2)として作成できます。デフォルトでは、JDBCベースのデータのロードでは、次のものが使用されます

    中間データ・チャンクを処理するためのパイプされたストリーム。パイプされたストリームはJDBCベースのローダー専用です。これは完全にインメモリーで効率的であり、オペレーティング・システムにファイルを作成する必要がありません。

    名前付きパイプはローカルのオペレーティング・システムで空のファイルのように見えるのに対し、通常のファイルはローカルのオペレーティング・システムの領域を消費します。すべてのオペレーティング・システムが名前付きパイプをサポートするわけではないことに注意してください。

  • 分割ファイルの接頭辞: 分割フェーズがグラフのロード用のデータ・チャンクを生成しているときに、作成される一時ファイルまたはパイプに使用される接頭辞です。デフォルトで、接頭辞“OPG_Chunk”は通常のファイルに使用され、“OPG_Pipe”は名前付きパイプに使用されます。

  • 表領域: すべての一時作業表が作成される表領域の名前です。

サブトピック:

  • 複数のファイルを伴うJDBCベースのデータのロード

  • パーティションを伴うJDBCベースのデータのロード

  • ファインチューニングを使用したJDBCベースのパラレル・データ・ロード

複数のファイルを伴うJDBCベースのデータのロード

JDBCベースのデータのロードは、複数ファイルからの頂点およびエッジ、またはデータベースへの入力ストリームのロードもサポートします。次のコード・フラグメントは、パラレル・データ・ロードAPIを使用し、複数の頂点およびエッジ・ファイルをロードします。この例では、2つの文字列配列szOPVFilesおよびszOPEFilesが入力ファイルの保持に使用されます。

    String[] szOPVFiles = new String[] {"../../data/connections-p1.opv", 
                                        "../../data/connections-p2.opv"}; 
    String[] szOPEFiles = new String[] {"../../data/connections-p1.ope",                          
                                        "../../data/connections-p2.ope"}; 
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    opgdl = OraclePropertyGraphDataLoader.getInstance(); 
    opgdl.loadData(opg, szOPVFiles, szOPEFiles, 48 /* DOP */,
                   1000 /* batch size */, 
                   true /* rebuild index flag */, 
                   "pddl=t,pdml=t" /* options */); 

パーティションを伴うJDBCベースのデータのロード

数千から数十万の要素からのグラフ・データの処理中に、JDBCベースのデータのロードAPIにより、論理パーティション化を使用して、Oracleフラット・ファイル形式のグラフ・データをOracle Databaseにロードできます。

各パーティションは、グラフ・データ・ファイルの頂点(またはエッジ)のサブセットを表します。ファイルのサイズはほぼ、パーティションの数で分割されたファイル内の異なる要素IDの数です。各パーティションは[0, Number of partitions – 1]の範囲の、整数IDで識別されます。

パーティションでパラレル・データ・ロードを使用するには、使用する論理パーティションの合計数と、パーティション・オフセット(開始ID)を、loadData APIで使用される基本パラメータに加えて、指定する必要があります。グラフ・データ・ファイルまたは入力ストリームをデータベースに完全にロードするには、定義したパーティションの数だけ、データ・ロード操作を実行する必要があります。たとえば、2つのパーティションを使用してファイルからグラフ・データをロードするには、オフセット0および1を使用した2つのデータ・ロードAPIコールが必要です。データ・ローダーへの各コールは複数のスレッド、または1つのシステムまたは複数のシステムでの別個のJavaクライアントを使用して処理できます。

この方法は、1つの頂点ファイル(または入力ストリーム)および1つのエッジ・ファイル(または入力ストリーム)で使用されることを意図したものであることに注意してください。さらに、このオプションでは頂点およびエッジでの索引および制約を無効にすることが必要です。これらの索引および制約は、すべてのパーティションがロードされた後で再作成される必要があります。

次の例は、2つのパーティションを使用してグラフ・データをロードします。各パーティションは1つのJavaプロセスDataLoaderWorkerによってロードされます。複数のワーカーを調整するために、DataLoaderCoordinatorという名前のコーディネータ・プロセスが使用されます。この例では、次の操作を行います

  1. すべての索引および制約を無効にし、

  2. 一時作業表loaderProgressを作成します。これはデータのロード・プロセス(つまり、何人のワーカーが作業を終了したか)を記録するものです。作業表が作成されると、すべてのDataLoaderWorkerプロセスがデータのロードを開始します。

  3. 1ずつプロセスを増分します。

  4. すべてのDataLoaderWorkerプロセスが実行されるまで、プロセスのポーリングを継続します(DataLoaderCoordinatorプロセスを使用して)。

  5. すべての索引および制約を再作成します。

ノート: DataLoaderWorkerでは、フラグSKIP_INDEXtrueに、フラグrebuildIndxfalseに設定される必要があります。

// start DataLoaderCoordinator, set dop = 8 and number of partitions = 2
java DataLoaderCoordinator  jdbcUrl  user password pg 8 2
// start the first DataLoaderWorker, set dop = 8, number of partitions = 2, partition offset = 0
java DataLoaderWorker jdbcUrl user password pg  8 2 0
// start the first DataLoaderWorker, set dop = 8, number of partitions = 2, partition offset = 1
java DataLoaderWorker jdbcUrl user password pg  8 2 1

DataLoaderCoordinatorが最初にすべての索引および制約を無効にします。次に、loaderProgressという名前の表を作成し、列progress = 0の1行を挿入します。

public class DataLoaderCoordinator {
        public static void main(String[] szArgs) {
          String jdbcUrl = szArgs[0];
          String user = szArgs[1];
          String password = szArgs[2];
          String graphName = szArgs[3];
          int dop = Integer.parseInt(szArgs[4]);
          int numLoaders = Integer.parseInt(szArgs[5]);

          Oracle oracle = null;
          OraclePropertyGraph opg = null;
          try {
            oracle = new Oracle(jdbcUrl, user, password);
            OraclePropertyGraphUtils.dropPropertyGraph(oracle, graphName);
            opg = OraclePropertyGraph.getInstance(oracle, graphName);

            List<String> vIndices = opg.disableVertexTableIndices();
            List<String> vConstraints = opg.disableVertexTableConstraints();
            List<String> eIndices = opg.disableEdgeTableIndices();
            List<String> eConstraints = opg.disableEdgeTableConstraints();

            String szStmt = null;
            try {
              szStmt = "drop table loaderProgress";
              opg.getOracle().executeUpdate(szStmt);
            }
            catch (SQLException ex) {
              if (ex.getErrorCode() == 942) {
                // table does not exist. ignore
              }
              else {
                throw new OraclePropertyGraphException(ex);
              }
            }

            szStmt = "create table loaderProgress (progress integer)";
            opg.getOracle().executeUpdate(szStmt);
            szStmt = "insert into loaderProgress (progress) values (0)";
            opg.getOracle().executeUpdate(szStmt);
            opg.getOracle().getConnection().commit();
            while (true) {
              if (checkLoaderProgress(oracle) == numLoaders) {
                break;
              } else {
                Thread.sleep(1000);
              }
            }

            opg.rebuildVertexTableIndices(vIndices, dop, null);
            opg.rebuildVertexTableConstraints(vConstraints, dop, null);
            opg.rebuildEdgeTableIndices(eIndices, dop, null);
            opg.rebuildEdgeTableConstraints(eConstraints, dop, null);
          }
          catch (IOException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (SQLException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (InterruptedException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (opg != null) {
                opg.shutdown();
              }
              if (oracle != null) {
                oracle.dispose();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }

        }

        private static int checkLoaderProgress(Oracle oracle) {
          int result = 0;
          ResultSet rs = null;

          try {
            String szStmt = "select progress from loaderProgress";
            rs = oracle.executeQuery(szStmt);
            if (rs.next()) {
              result =  rs.getInt(1);
            }

          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
	    }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
          return result;
        }
}
     
public class DataLoaderWorker {

        public static void main(String[] szArgs) {
          String jdbcUrl = szArgs[0];
          String user = szArgs[1];
          String password = szArgs[2];
          String graphName = szArgs[3];
          int dop = Integer.parseInt(szArgs[4]);
          int numLoaders = Integer.parseInt(szArgs[5]);
          int offset = Integer.parseInt(szArgs[6]);

          Oracle oracle = null;
          OraclePropertyGraph opg = null;

          try {
            oracle = new Oracle(jdbcUrl, user, password);
            opg = OraclePropertyGraph.getInstance(oracle, graphName, 8, dop, null/*tbs*/, ",SKIP_INDEX=T,");
            OraclePropertyGraphDataLoader opgdal = OraclePropertyGraphDataLoader.getInstance();

            while (true) {
              if (checkLoaderProgress(oracle) == 1) {
                break;
              } else {
                Thread.sleep(1000);
              }
            }

            String opvFile = "../../../data/connections.opv";
            String opeFile = "../../../data/connections.ope";
            opgdal.loadData(opg, opvFile, opeFile, dop, numLoaders, offset, 1000, false, null, "pddl=t,pdml=t");

            updateLoaderProgress(oracle);
          }
          catch (SQLException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (InterruptedException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (opg != null) {
                opg.shutdown();
              }
              if (oracle != null) {
                oracle.dispose();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
        }

        private static int checkLoaderProgress(Oracle oracle) {
          int result = 0;
          ResultSet rs = null;

          try {
            String szStmt = "select count(*) from loaderProgress";
            rs = oracle.executeQuery(szStmt);
            if (rs.next()) {
              result = rs.getInt(1);
            }
          }
          catch (SQLException ex) {
            if (ex.getErrorCode() == 942) {
              // table does not exist. ignore
            } else {
              throw new OraclePropertyGraphException(ex);
            }
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
          return result;
        }

        private static void updateLoaderProgress(Oracle oracle) {
          ResultSet rs = null;

          try {
            String szStmt = "update loaderProgress set progress = progress + 1";
            oracle.executeUpdate(szStmt);
            oracle.getConnection().commit();
          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
        }
}

ファインチューニングを使用したJDBCベースのパラレル・データ・ロード

JDBCベースのデータ・ロードは、要素をプロパティ・グラフ・インスタンスへロードするときに使用されるIDオフセットと同様に、ロードされる行からのデータのサブセットのファインチューニングをサポートします。ファイルから読み取る行の最大数と、頂点およびエッジの両方のオフセット行番号(開始位置)を指定して、ファイルからロードするデータのサブセットを指定できます。この方法では、行の最大数が読み取られるまで、データはオフセット行番号からロードされます。最大行数が-1の場合、ロード・プロセスはファイルの終わりまでデータをスキャンします。

複数のグラフ・データ・ファイルにはいくつかのID衝突または重複が含まれることがあるので、JDBCベースのデータ・ロードでは、頂点およびエッジのIDオフセットを定義できます。この方法では、ロードされた各頂点のIDは、元の頂点IDおよび指定した頂点IDのオフセットの合計になります。同様に、ロードされた各エッジのIDは、元のエッジIDおよび指定したエッジIDのオフセットの合計から生成されます。頂点およびエッジ・ファイルは相関関係がある必要があります。ロードされたエッジに対する入出力頂点IDは指定した頂点IDオフセットに対して変更されるからです。この操作は、1つの論理パーティションを使用したデータのロードでのみサポートされます。

次のコード・フラグメントは、指定したグラフ・データ・ファイルから最初の100個の頂点およびエッジ行をロードします。この例では、IDオフセット0が使用され、これはID調整が行われないことを示します。

    String szOPVFile = "../../data/connections.opv"; 
    String szOPEFile = "../../data/connections.ope"; 
    // Run the data loading using fine tuning 
    long lVertexOffsetlines = 0; 
    long lEdgeOffsetlines = 0; 
    long lVertexMaxlines = 100; 
    long lEdgeMaxlines = 100;
    long lVIDOffset = 0;
    long lEIDOffset = 0;
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    OraclePropertyGraphDataLoader opgdl = OraclePropertyGraphDataLoader.getInstance();
    
    opgdl.loadData(opg, szOPVFile, szOPEFile, 
                   lVertexOffsetlines /* offset of lines to start loading from 
              partition, default 0 */, 
                   lEdgeOffsetlines /* offset of lines to start loading from 
                   partition, default 0 */, 
      lVertexMaxlines /* maximum number of lines to start loading from 
                        partition, default -1 (all lines in partition) */, 
      lEdgeMaxlines /* maximum number of lines to start loading from 
                       partition, default -1 (all lines in partition) */, 
      lVIDOffset /* vertex ID offset: the vertex ID will be original 
                    vertex ID + offset, default 0 */, 
      lEIDOffset /* edge ID offset: the edge ID will be original edge ID 
                    + offset, default 0 */, 
      4 /* DOP */, 
      1 /* Total number of partitions, default 1 */, 
      0 /* Partition to load: from 0 to totalPartitions - 1, default 0 */, 
      OraclePropertyGraphDataLoader.PIPEDSTREAM /* splitter flag */, 
      "chunkPrefix" /* prefix: the prefix used to generate split chunks 
                       for regular files or named pipes */, 
      1000 /* batch size: batch size of Oracle update in batching mode. 
              Default value is 1000 */, 
      true /* rebuild index */, 
      null /* table space name*/,
      "pddl=t,pdml=t" /* options: enable parallel DDL and DML */);