5.4.2.1 JDBC-Based Data Loading

JDBC-based data loading uses Java Database Connectivity (JDBC) APIs to load the graph data into Oracle Database. In this option, the vertices (or edges) in the given input stream will be spread among multiple chunks by the splitter thread. Each chunk will be processed by a different loader thread that inserts all the elements in the chunk into a temporary work table using JDBC batching. The number of splitter and loader threads used is determined by the degree of parallelism (DOP) specified by the user.

After all the graph data is loaded into the temporary work tables, all the data stored in the temporary work tables is loaded into the property graph VT$ and GE$ tables.

The following example loads the graph data from a vertex and edge files in Oracle-defined flat-file format using a JDBC-based parallel data loading with a degree of parallelism of 48.

    String szOPVFile = "../../data/connections.opv"; 
    String szOPEFile = "../../data/connections.ope"; 
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    opgdl = OraclePropertyGraphDataLoader.getInstance(); 
    opgdl.loadData(opg, szOPVFile, szOPEFile, 48 /* DOP */, 1000 /* batch size */, true /* rebuild index flag */, "pddl=t,pdml=t" /* options */); 
);

To optimize the performance of the data loading operations, a set of flags and hints can be specified when calling the JDBC-based data loading. These hints include:

  • DOP: The degree of parallelism to use when loading the data. This parameter determines the number of chunks to generate when splitting the file as well as the number of loader threads to use when loading the data into the property graph VT$ and GE$ tables.

  • Batch Size: An integer specifying the batch size to use for Oracle update statements in batching mode. The default batch size used in the JDBC-based data loading is 1000.

  • Rebuild index: If this flag is set to true, the data loader will disable all the indexes and constraints defined over the property graph where the data will be loaded. After all the data is loaded into the property graph, all the indexes and constraints will be rebuilt.

  • Load options: An option (or multiple options delimited by commas) to optimize the data loading operations. These options include:

    • NO_DUP=T: Assumes the input data does not have invalid duplicates. In a valid property graph, each vertex (edge) can at most have one value for a given property key. In an invalid property graph, a vertex (edge) may have two or more values for a particular key. As an example, a vertex, v, has two key/value pairs: name/"John" and name/"Johnny" and they share the same key.

    • PDML=T: Enables parallel execution for DML operations for the database session used in the data loader. This hint is used to improve the performance of long-running batching jobs.

    • PDDL=T: Enables parallel execution for DDL operations for the database session used in the data loader. This hint is used to improve the performance of long-running batching jobs.

    • KEEP_WORK_TABS=T: Skips cleaning and deleting the working tables after the data loading is complete. This is for debugging use only.

    • KEEP_TMP_FILES=T: Skips removing the temporary splitter files after the data loading is complete. This is for debug only.

  • Splitter Flag: An integer value defining the type of files or streams used in the splitting phase to generate the data chunks used in the graph loading phase. The temporary files can be created as regular files (0), named pipes (1), or piped streams (2). By default, JDBC-based data loading uses

    Piped streams to handle intermediate data chunksPiped streams are for JDBC-based loader only. They are purely in-memory and efficient, and do not require any files created on the operating system.

    Regular files consume space on the local operating system, while named pipes appear as empty files on the local operating system. Note that not every operating system has support for named pipes.

  • Split File Prefix: The prefix used for the temporary files or pipes created when the splitting phase is generating the data chunks for the graph loading. By default, the prefix “OPG_Chunk” is used for regular files and “OPG_Pipe” is used for named pipes.

  • Tablespace: The name of the tablespace where all the temporary work tables will be created.

Subtopics:

  • JDBC-Based Data Loading with Multiple Files

  • JDBC-Based Data Loading with Partitions

  • JDBC-based Parallel Data Loading Using Fine-Tuning

JDBC-Based Data Loading with Multiple Files

JDBC-based data loading also supports loading vertices and edges from multiple files or input streams into the database. The following code fragment loads multiple vertex and edge files using the parallel data loading APIs. In the example, two string arrays szOPVFiles and szOPEFiles are used to hold the input files.

    String[] szOPVFiles = new String[] {"../../data/connections-p1.opv", 
                                        "../../data/connections-p2.opv"}; 
    String[] szOPEFiles = new String[] {"../../data/connections-p1.ope",                          
                                        "../../data/connections-p2.ope"}; 
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    opgdl = OraclePropertyGraphDataLoader.getInstance(); 
    opgdl.loadData(opg, szOPVFiles, szOPEFiles, 48 /* DOP */,
                   1000 /* batch size */, 
                   true /* rebuild index flag */, 
                   "pddl=t,pdml=t" /* options */); 

JDBC-Based Data Loading with Partitions

When dealing with graph data from thousands to hundreds of thousands elements, the JDBC-based data loading API allows loading the graph data in Oracle Flat file format into Oracle Database using logical partitioning.

Each partition represents a subset of vertices (or edges) in the graph data file of size is approximately the number of distinct element IDs in the file divided by the number of partitions. Each partition is identified by an integer ID in the range of [0, Number of partitions – 1].

To use parallel data loading with partitions, you must specify the total number of logical partitions to use and the partition offset (start ID) in addition to the base parameters used in the loadData API. To fully load a graph data file or input stream into the database, you must execute the data loading operation as many times as the defined number of partitions. For example, to load the graph data from a file using two partitions, there should be two data loading API calls using an offset of 0 and 1. Each call to the data loader can be processed using multiple threads or a separate Java client on a single system or multiple systems.

Note that this approach is intended to be used with a single vertex file (or input stream) and a single edge file (or input stream). Additionally, this option requires disabling the indices and constraints on vertices and edges. These indices and constraints must be rebuilt after all partitions have been loaded.

The following example loads the graph data using two partitions. Each partition is loaded by one Java process DataLoaderWorker. To coordinate multiple workers, a coordinator process named DataLoaderCoordinator is used. This example does the following

  1. Disables all indexes and constraints,

  2. Creates a temporary working table, loaderProgress, that records the data loading progress (that is, how many workers have finished their work. All DataLoaderWorker processes start loading data after the working table is created.

  3. Increments the progress by 1.

  4. Keeps polling (using the DataLoaderCoordinator process) the progress until all DataLoaderWorker processes are done.

  5. Rebuilds all indexes and constraints.

Note: In DataLoaderWorker, the flag SKIP_INDEX should be set to true and the flag rebuildIndx should be set to false.

// start DataLoaderCoordinator, set dop = 8 and number of partitions = 2
java DataLoaderCoordinator  jdbcUrl  user password pg 8 2
// start the first DataLoaderWorker, set dop = 8, number of partitions = 2, partition offset = 0
java DataLoaderWorker jdbcUrl user password pg  8 2 0
// start the first DataLoaderWorker, set dop = 8, number of partitions = 2, partition offset = 1
java DataLoaderWorker jdbcUrl user password pg  8 2 1

The DataLoaderCoordinator first disables all indexes and constraints. It then creates a table named loaderProgress and inserts one row with column progress = 0.

public class DataLoaderCoordinator {
        public static void main(String[] szArgs) {
          String jdbcUrl = szArgs[0];
          String user = szArgs[1];
          String password = szArgs[2];
          String graphName = szArgs[3];
          int dop = Integer.parseInt(szArgs[4]);
          int numLoaders = Integer.parseInt(szArgs[5]);

          Oracle oracle = null;
          OraclePropertyGraph opg = null;
          try {
            oracle = new Oracle(jdbcUrl, user, password);
            OraclePropertyGraphUtils.dropPropertyGraph(oracle, graphName);
            opg = OraclePropertyGraph.getInstance(oracle, graphName);

            List<String> vIndices = opg.disableVertexTableIndices();
            List<String> vConstraints = opg.disableVertexTableConstraints();
            List<String> eIndices = opg.disableEdgeTableIndices();
            List<String> eConstraints = opg.disableEdgeTableConstraints();

            String szStmt = null;
            try {
              szStmt = "drop table loaderProgress";
              opg.getOracle().executeUpdate(szStmt);
            }
            catch (SQLException ex) {
              if (ex.getErrorCode() == 942) {
                // table does not exist. ignore
              }
              else {
                throw new OraclePropertyGraphException(ex);
              }
            }

            szStmt = "create table loaderProgress (progress integer)";
            opg.getOracle().executeUpdate(szStmt);
            szStmt = "insert into loaderProgress (progress) values (0)";
            opg.getOracle().executeUpdate(szStmt);
            opg.getOracle().getConnection().commit();
            while (true) {
              if (checkLoaderProgress(oracle) == numLoaders) {
                break;
              } else {
                Thread.sleep(1000);
              }
            }

            opg.rebuildVertexTableIndices(vIndices, dop, null);
            opg.rebuildVertexTableConstraints(vConstraints, dop, null);
            opg.rebuildEdgeTableIndices(eIndices, dop, null);
            opg.rebuildEdgeTableConstraints(eConstraints, dop, null);
          }
          catch (IOException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (SQLException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (InterruptedException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (opg != null) {
                opg.shutdown();
              }
              if (oracle != null) {
                oracle.dispose();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }

        }

        private static int checkLoaderProgress(Oracle oracle) {
          int result = 0;
          ResultSet rs = null;

          try {
            String szStmt = "select progress from loaderProgress";
            rs = oracle.executeQuery(szStmt);
            if (rs.next()) {
              result =  rs.getInt(1);
            }

          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
	    }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
          return result;
        }
}
     
public class DataLoaderWorker {

        public static void main(String[] szArgs) {
          String jdbcUrl = szArgs[0];
          String user = szArgs[1];
          String password = szArgs[2];
          String graphName = szArgs[3];
          int dop = Integer.parseInt(szArgs[4]);
          int numLoaders = Integer.parseInt(szArgs[5]);
          int offset = Integer.parseInt(szArgs[6]);

          Oracle oracle = null;
          OraclePropertyGraph opg = null;

          try {
            oracle = new Oracle(jdbcUrl, user, password);
            opg = OraclePropertyGraph.getInstance(oracle, graphName, 8, dop, null/*tbs*/, ",SKIP_INDEX=T,");
            OraclePropertyGraphDataLoader opgdal = OraclePropertyGraphDataLoader.getInstance();

            while (true) {
              if (checkLoaderProgress(oracle) == 1) {
                break;
              } else {
                Thread.sleep(1000);
              }
            }

            String opvFile = "../../../data/connections.opv";
            String opeFile = "../../../data/connections.ope";
            opgdal.loadData(opg, opvFile, opeFile, dop, numLoaders, offset, 1000, false, null, "pddl=t,pdml=t");

            updateLoaderProgress(oracle);
          }
          catch (SQLException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          catch (InterruptedException ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (opg != null) {
                opg.shutdown();
              }
              if (oracle != null) {
                oracle.dispose();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
        }

        private static int checkLoaderProgress(Oracle oracle) {
          int result = 0;
          ResultSet rs = null;

          try {
            String szStmt = "select count(*) from loaderProgress";
            rs = oracle.executeQuery(szStmt);
            if (rs.next()) {
              result = rs.getInt(1);
            }
          }
          catch (SQLException ex) {
            if (ex.getErrorCode() == 942) {
              // table does not exist. ignore
            } else {
              throw new OraclePropertyGraphException(ex);
            }
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
          return result;
        }

        private static void updateLoaderProgress(Oracle oracle) {
          ResultSet rs = null;

          try {
            String szStmt = "update loaderProgress set progress = progress + 1";
            oracle.executeUpdate(szStmt);
            oracle.getConnection().commit();
          }
          catch (Exception ex) {
            throw new OraclePropertyGraphException(ex);
          }
          finally {
            try {
              if (rs != null) {
                rs.close();
              }
            }
            catch (Throwable t) {
              System.out.println(t);
            }
          }
        }
}

JDBC-based Parallel Data Loading Using Fine-Tuning

JDBC-based data loading supports fine-tuning the subset of data from a line to be loaded, as well as the ID offset to use when loading the elements into the property graph instance. You can specify the subset of data to load from a file by specifying the maximum number of lines to read from the file and the offset line number (start position) for both vertices and edges. This way, data will be loaded from the offset line number until the maximum number of lines has been read. IIf the maximum line number is -1, the loading process will scan the data until reaching the end of file.

Because multiple graph data files may have some ID collisions or overlap, the JDBC-based data loading allows you to define a vertex and edge ID offset. This way, the ID of each loaded vertex will be the sum of the original vertex ID and the given vertex ID offset. Similarly, the ID of each loaded edge will be generated from the sum of the original edge ID and the given edge ID offset. Note that the vertices and edge files must be correlated, because the in/out vertex ID for the loaded edges will be modified with respect to the specified vertex ID offset. This operation is supported only in data loading using a single logical partition.

The following code fragment loads the first 100 vertices and edges lines from the given graph data file. In this example, an ID offset 0 is used, which indicates no ID adjustment is performed.

    String szOPVFile = "../../data/connections.opv"; 
    String szOPEFile = "../../data/connections.ope"; 
    // Run the data loading using fine tuning 
    long lVertexOffsetlines = 0; 
    long lEdgeOffsetlines = 0; 
    long lVertexMaxlines = 100; 
    long lEdgeMaxlines = 100;
    long lVIDOffset = 0;
    long lEIDOffset = 0;
    OraclePropertyGraph opg = OraclePropertyGraph.getInstance( args, szGraphName); 
    OraclePropertyGraphDataLoader opgdl = OraclePropertyGraphDataLoader.getInstance();
    
    opgdl.loadData(opg, szOPVFile, szOPEFile, 
                   lVertexOffsetlines /* offset of lines to start loading from 
              partition, default 0 */, 
                   lEdgeOffsetlines /* offset of lines to start loading from 
                   partition, default 0 */, 
      lVertexMaxlines /* maximum number of lines to start loading from 
                        partition, default -1 (all lines in partition) */, 
      lEdgeMaxlines /* maximum number of lines to start loading from 
                       partition, default -1 (all lines in partition) */, 
      lVIDOffset /* vertex ID offset: the vertex ID will be original 
                    vertex ID + offset, default 0 */, 
      lEIDOffset /* edge ID offset: the edge ID will be original edge ID 
                    + offset, default 0 */, 
      4 /* DOP */, 
      1 /* Total number of partitions, default 1 */, 
      0 /* Partition to load: from 0 to totalPartitions - 1, default 0 */, 
      OraclePropertyGraphDataLoader.PIPEDSTREAM /* splitter flag */, 
      "chunkPrefix" /* prefix: the prefix used to generate split chunks 
                       for regular files or named pipes */, 
      1000 /* batch size: batch size of Oracle update in batching mode. 
              Default value is 1000 */, 
      true /* rebuild index */, 
      null /* table space name*/,
      "pddl=t,pdml=t" /* options: enable parallel DDL and DML */);