4.6.1 Synchronizing a PG View Graph

You can synchronize a graph loaded into the graph server (PGX) from a property graph view (PG View) with the changes made to the graph in the database.

The following example shows the steps for synchronizing a PG View graph using the FlashbackSynchronizer API:
  1. Load the PG View graph into the graph server (PGX) using the readGraphByName() API as shown:
    opg4j> var graph = session.readGraphByName("BANK_GRAPH_VIEW",GraphSource.PG_VIEW,
                     ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_GRAPH_VIEW,N=999,E=4993,created=1660275936010]
    PgxGraph graph = session.readGraphByName("BANK_GRAPH_VIEW",GraphSource.PG_VIEW,
                          ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_GRAPH_VIEW','pg_view')
  2. Open a new JDBC connection to the database and change the data in the underlying database tables for the PG View graph. For example, the following code updates the database value for one of the edge properties:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.setAutoCommit(false)
    opg4j> conn.commit()
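    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    conn.setAutoCommit(false); // disable auto-commit before the update so that the explicit commit() below takes effect
    Statement stmt = conn.createStatement();
    stmt.executeUpdate("UPDATE bank_txns SET amount=4000 WHERE txn_id=3"); // executeUpdate() is the JDBC call for DML statements
    conn.commit();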
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=4000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
    
    Committing the changes to the database causes the in-memory graph to go out of sync with the database source tables.
  3. Synchronize the in-memory graph with the database by creating a new synchronizer object as shown in the following code:
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .build();

    Internally, the graph server keeps track of the Oracle system change number (SCN) to which the current graph snapshot belongs. The synchronizer is a client-side component that connects to the database, detects changes by comparing the state of the original input tables using the current SCN through the flashback mechanism, and then sends those changes to the graph server using the changeset API. To do so, the synchronizer needs to know how to connect to the database (the conn parameter) and which graph to keep in sync (the graph parameter).

    Alternatively, you can use this equivalent shortcut as shown:

    opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', connection=conn)
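    The change-detection idea described in this step can be pictured with a small, self-contained model. This is only a sketch and does not use the PGX API: the class SnapshotDiffSketch, the ChangeType enum, and the diff() method are hypothetical names, each map stands for the rows of one source table (key to property value) as seen at one SCN, and the starting amounts are made-up values. It shows how comparing two table states yields the set of changes that would be sent to the graph server.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Objects;

    // Toy model only: NOT the PGX API. All names here are hypothetical.
    final class SnapshotDiffSketch {

        // Hypothetical change kinds, named for illustration.
        enum ChangeType { ADDED, REMOVED, MODIFIED }

        // Compares the table state at the old SCN with the state at the new SCN
        // and returns one entry per row that was added, removed, or modified.
        static Map<Long, ChangeType> diff(Map<Long, Double> atOldScn, Map<Long, Double> atNewScn) {
            Map<Long, ChangeType> delta = new LinkedHashMap<>();
            for (Map.Entry<Long, Double> row : atNewScn.entrySet()) {
                if (!atOldScn.containsKey(row.getKey())) {
                    delta.put(row.getKey(), ChangeType.ADDED);
                } else if (!Objects.equals(atOldScn.get(row.getKey()), row.getValue())) {
                    delta.put(row.getKey(), ChangeType.MODIFIED);
                }
            }
            for (Long key : atOldScn.keySet()) {
                if (!atNewScn.containsKey(key)) {
                    delta.put(key, ChangeType.REMOVED);
                }
            }
            return delta;
        }

        public static void main(String[] args) {
            // Made-up amounts keyed by txn_id, as seen at the older SCN.
            Map<Long, Double> before = new LinkedHashMap<>();
            before.put(3L, 1000.0);
            before.put(4L, 250.0);

            // State at the newer SCN: mirrors the UPDATE on txn_id=3 from step 2.
            Map<Long, Double> after = new LinkedHashMap<>(before);
            after.put(3L, 4000.0);

            System.out.println(diff(before, after)); // prints {3=MODIFIED}
        }
    }
    ```

    In this model, an unchanged row produces no delta entry, which is why only the updated transaction appears in the result.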
  4. Fetch and apply the database changes by calling the sync() function, which creates a new in-memory graph snapshot:
    opg4j> graph = synchronizer.sync()
    graph ==> PgxGraph[name=BANK_GRAPH_VIEW,N=999,E=4993,created=1660308128037]
    graph = synchronizer.sync();
    >>> graph = synchronizer.sync()

    Note that the Synchronizer object needs to be created only once per session. Once created, you can perform the synchronizer.sync() operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.

    Splitting the Fetching and Applying of Changes

    The synchronizer.sync() invocation in the preceding code fetches the changes and applies them in one call. However, you can encode more complex update logic by splitting this process into separate fetch() and apply() invocations. For example:

    synchronizer.fetch(); // fetch the changes from the database
    // create a new snapshot only if more than 100 changes were fetched
    if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) {
      synchronizer.apply();
    }
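
    The threshold logic above can be tried out without a database or a graph server. The following is a minimal sketch under stated assumptions, not the PGX Synchronizer class: the ChangeSource interface, the FakeSource class, and the syncIfWorthwhile() method are hypothetical names, and fetch() is simplified to return the number of pending changes directly. It isolates the decision of whether to apply the fetched changes.

    ```java
    // Illustrative stand-in only: NOT the PGX Synchronizer class.
    final class ThresholdSyncSketch {

        // Hypothetical minimal view of a synchronizer: fetch() returns the number
        // of pending changes, apply() turns them into a new snapshot.
        interface ChangeSource {
            int fetch();
            void apply();
        }

        // Applies the fetched changes only when strictly more than `threshold`
        // changes are pending. Returns true if a new snapshot was created.
        static boolean syncIfWorthwhile(ChangeSource source, int threshold) {
            int pending = source.fetch();
            if (pending > threshold) {
                source.apply();
                return true;
            }
            return false;
        }

        // Fake in-memory source that reports a fixed number of pending changes
        // and counts how many snapshots were created.
        static final class FakeSource implements ChangeSource {
            private final int pendingChanges;
            int snapshotsCreated = 0;

            FakeSource(int pendingChanges) {
                this.pendingChanges = pendingChanges;
            }

            @Override
            public int fetch() {
                return pendingChanges;
            }

            @Override
            public void apply() {
                snapshotsCreated++;
            }
        }

        public static void main(String[] args) {
            FakeSource small = new FakeSource(100);
            FakeSource large = new FakeSource(101);
            System.out.println(syncIfWorthwhile(small, 100)); // false: exactly 100 is not "more than 100"
            System.out.println(syncIfWorthwhile(large, 100)); // true: 101 changes exceed the threshold
        }
    }
    ```

    Keeping the decision in one small method makes the boundary explicit: exactly 100 pending changes do not trigger a new snapshot, while 101 do.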
  5. Query the graph to verify the updates to the edge property.
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 4000.0 |
    +--------+