5.4.2 Parallel Retrieval of Graph Data

The parallel property graph query provides a simple Java API for performing parallel scans on vertices (or edges). Parallel retrieval is an optimized approach that takes advantage of how the data is distributed across table partitions: each partition is queried using a separate database connection.

Parallel retrieval produces an array in which each element holds all the vertices (or edges) from one specific partition (split). The subset of splits queried is determined by the given start split ID and the size of the connections array provided: the query covers the splits in the range [start, start - 1 + size of connections array]. Note that each split in a vertex table with N splits is assigned an integer ID in the range [0, N - 1].
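
The range arithmetic above can be sketched in plain Java. This is only an illustration: the class SplitRanges and its methods are hypothetical helpers, not part of the Oracle API, and clamping the last range to N - 1 is an assumption made for the sketch (the text does not say how a range that runs past the last split is handled).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating split ranges; not part of the Oracle API. */
final class SplitRanges {

    /**
     * Returns the inclusive range {first, last} of split IDs covered by one
     * partitioned call: [start, start - 1 + connections].
     * Assumption: the upper bound is clamped to totalSplits - 1.
     */
    static int[] rangeFor(int startSplit, int connections, int totalSplits) {
        if (connections <= 0 || totalSplits <= 0) {
            throw new IllegalArgumentException("connections and totalSplits must be positive");
        }
        int last = Math.min(startSplit - 1 + connections, totalSplits - 1);
        return new int[] { startSplit, last };
    }

    /** Lists the ranges visited when the start split advances by the number of connections. */
    static List<int[]> allRanges(int totalSplits, int connections) {
        List<int[]> ranges = new ArrayList<>();
        for (int start = 0; start < totalSplits; start += connections) {
            ranges.add(rangeFor(start, connections, totalSplits));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 splits scanned with 4 connections: prints [0, 3], [4, 7], [8, 9]
        for (int[] r : allRanges(10, 4)) {
            System.out.println("[" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With 10 splits and 4 connections, three calls are needed, and the last one covers only the two remaining splits.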

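The following code loads a property graph, opens an array of connections, and executes a parallel query to retrieve all vertices and edges using the opened connections. The number of calls to the getVerticesPartitioned (getEdgesPartitioned) method is controlled by the total number of splits and the number of connections used: each call covers as many splits as there are connections, so the loop advances the starting split ID by that amount. The variable dop (the number of connections) and the helper consumeIterables are not defined in this excerpt.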

OraclePropertyGraph opg = OraclePropertyGraph.getInstance(args, szGraphName);

// Clear existing vertices/edges in the property graph 
opg.clearRepository(); 

String szOPVFile = "../../data/connections.opv";
String szOPEFile = "../../data/connections.ope";

// This object will handle parallel data loading
OraclePropertyGraphDataLoader opgdl = OraclePropertyGraphDataLoader.getInstance();
opgdl.loadData(opg, szOPVFile, szOPEFile, dop); 

// Create connections used in parallel query
Oracle[] oracleConns = new Oracle[dop];
Connection[]   conns = new Connection[dop];
for (int i = 0; i < dop; i++) { 
  oracleConns[i] = opg.getOracle().clone();
  conns[i] = oracleConns[i].getConnection();
}

long lCountV = 0;
// Iterate over all the vertices' partition IDs to count all the vertices
for (int partitionID = 0; partitionID < opg.getVertexPartitionsNumber(); 
     partitionID += dop) { 
  Iterable<Vertex>[] iterables 
        = opg.getVerticesPartitioned(conns /* Connection array */, 
                                     true /* skip store to cache */, 
                                     partitionID /* starting partition */); 
  lCountV += consumeIterables(iterables); /* consume iterables using 
                                             threads */
}

// Report the total number of vertices counted
System.out.println("Vertices found using parallel query: " + lCountV);

long lCountE = 0;
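// Iterate over all the edges' partition IDs to count all the edges
// (getEdgePartitionsNumber() is assumed to be the edge counterpart of
// getVertexPartitionsNumber(); the loop bound must be a partition count)
for (int partitionID = 0; partitionID < opg.getEdgePartitionsNumber(); 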
     partitionID += dop) { 
  Iterable<Edge>[] iterables 
          = opg.getEdgesPartitioned(conns /* Connection array */, 
                                    true /* skip store to cache */, 
                                    partitionID /* starting partitionID */); 
  lCountE += consumeIterables(iterables); /* consume iterables using 
                                             threads */
}

// Report the total number of edges counted
System.out.println("Edges found using parallel query: " + lCountE);

// Close the database connections once the queries have completed
for (int idx = 0; idx < conns.length; idx++) { 
   conns[idx].close();
}
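
The listing calls consumeIterables without showing it. One possible shape for such a helper, using only the standard java.util.concurrent classes, is sketched below. The class name IterableConsumer, the generic signature, and the choice of one thread per iterable are assumptions made for illustration, not the implementation used by the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper; a sketch of how consumeIterables could be written. */
final class IterableConsumer {

    /** Counts the elements of every iterable, using one worker thread per iterable. */
    static <T> long consumeIterables(Iterable<T>[] iterables) {
        if (iterables == null || iterables.length == 0) {
            return 0L;
        }
        ExecutorService pool = Executors.newFixedThreadPool(iterables.length);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (final Iterable<T> iterable : iterables) {
                if (iterable == null) {
                    continue; // defensive: skip empty slots
                }
                Callable<Long> task = () -> {
                    long count = 0;
                    for (T ignored : iterable) {
                        count++;
                    }
                    return count;
                };
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get(); // blocks until that worker has finished
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming iterables", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A worker failed while consuming an iterable", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Plain lists stand in for the per-split results of a partitioned query.
        @SuppressWarnings("unchecked")
        Iterable<Integer>[] parts = new Iterable[] {
            Arrays.asList(1, 2, 3), Arrays.asList(4, 5), new ArrayList<Integer>()
        };
        System.out.println(consumeIterables(parts)); // prints 5
    }
}
```

Because each element of the returned array is backed by its own connection, giving every iterable its own thread keeps the workers from sharing a connection. Checked exceptions are wrapped so that the helper can be called the way the listing calls it, without a try/catch block.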