5.4.2 グラフ・データのパラレル取得

プロパティ・グラフのパラレル問合せは、頂点(またはエッジ)のパラレル・スキャンを実行するための単純なJava APIを提供します。パラレル取得は、表パーティション全体へのデータの配分を利用する最適化されたソリューションであり、パーティションは別個のデータベース接続を使用して問合せられます。

パラレル取得では、各要素が特定のパーティション(分割)からのすべての頂点(またはエッジ)を保持する配列が生成されます。問合せされたシャードのサブセットは、指定した開始分割IDと、指定した接続配列のサイズにより分離されます。この方法では、サブセットは[開始, 開始 - 1 + 接続の配列のサイズ]の範囲内の分割を考慮します。N分割を含む頂点表のすべての分割に、([0, N - 1]の範囲の)整数IDが割り当てられることに注意してください。

次のコードは、プロパティ・グラフをロードし、接続の配列をオープンし、オープンした接続を使用してパラレル問合せを実行してすべての頂点およびエッジを取得します。getVerticesPartitioned (getEdgesPartitioned)メソッドの呼出し数は、分割の合計数と使用される接続の数によって制御されます。

OraclePropertyGraph opg = OraclePropertyGraph.getInstance(args, szGraphName);

// Clear existing vertices/edges in the property graph 
opg.clearRepository(); 

String szOPVFile = "../../data/connections.opv";
String szOPEFile = "../../data/connections.ope";

// This object will handle parallel data loading
OraclePropertyGraphDataLoader opgdl = OraclePropertyGraphDataLoader.getInstance();
opgdl.loadData(opg, szOPVFile, szOPEFile, dop); 

// Create connections used in parallel query
Oracle[] oracleConns = new Oracle[dop];
Connection[]   conns = new Connection[dop];
for (int i = 0; i < dop; i++) { 
  oracleConns[i] = opg.getOracle().clone();
  conns[i] = oracleConns[i].getConnection();
}

long lCountV = 0;
// Iterate over all the vertices’ partitionIDs to count all the vertices
for (int partitionID = 0; partitionID < opg.getVertexPartitionsNumber(); 
     partitionID += dop) { 
  Iterable<Vertex>[] iterables 
        = opg.getVerticesPartitioned(conns /* Connection array */, 
                                     true /* skip store to cache */, 
                                     partitionID /* starting partition */); 
  lCountV += consumeIterables(iterables); /* consume iterables using 
                                             threads */
}

// Count all vertices
System.out.println("Vertices found using parallel query: " + lCountV);

long lCountE = 0;
// Iterate over all the edges’ partitionIDs to count all the edges
for (int partitionID = 0; partitionID < opg.getEdgeTablePartitionIDs(); 
     partitionID += dop) { 
  Iterable<Edge>[] iterables 
          = opg.getEdgesPartitioned(conns /* Connection array */, 
                                    true /* skip store to cache */, 
                                    partitionID /* starting partitionID */); 
  lCountE += consumeIterables(iterables); /* consume iterables using 
                                             threads */
}

// Count all edges
System.out.println("Edges found using parallel query: " + lCountE);

// Close the connections to the database after completed
for (int idx = 0; idx < conns.length; idx++) { 
   conns[idx].close();
}