17.4 Keeping the Graph in Oracle Database Synchronized with the Graph Server

You can use the FlashbackSynchronizer API to automatically apply changes made to the graph in the database to the corresponding PgxGraph object in memory, thus keeping both synchronized.

This API uses Oracle Flashback Technology to fetch the changes made in the database since the last fetch and then pushes those changes into the graph server using the ChangeSet API. After the changes are applied, the usual snapshot semantics of the graph server apply: each application of a fetched delta creates a new in-memory snapshot. Queries or algorithms that run concurrently with snapshot creation are unaffected by the changes until the corresponding session refreshes its PgxGraph object to the latest state by calling session.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT).
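As a conceptual illustration of these snapshot semantics, the following plain Java model (Java 10 or later) shows a reader that keeps seeing its pinned snapshot after a delta is applied, until it explicitly refreshes. SnapshotStore and ReaderSession are hypothetical classes invented for this sketch; they are not part of the PGX API and say nothing about how the graph server stores snapshots internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of graph-server snapshot semantics (NOT the PGX API).
// Each snapshot maps an edge key (for example, txn_id) to a property value.
final class SnapshotStore {
    private final List<Map<Integer, Double>> snapshots = new ArrayList<>();

    SnapshotStore(Map<Integer, Double> initial) {
        snapshots.add(Map.copyOf(initial));
    }

    // Applying a delta never mutates an existing snapshot; it appends a new one
    // and returns the new snapshot's version number.
    synchronized int applyDelta(Map<Integer, Double> updates) {
        Map<Integer, Double> next = new HashMap<>(snapshots.get(snapshots.size() - 1));
        next.putAll(updates);
        snapshots.add(Map.copyOf(next));
        return snapshots.size() - 1;
    }

    synchronized int latestVersion() {
        return snapshots.size() - 1;
    }

    synchronized Map<Integer, Double> get(int version) {
        return snapshots.get(version);
    }
}

// Hypothetical reader: it stays pinned to one snapshot until it refreshes,
// loosely analogous to session.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT).
final class ReaderSession {
    private final SnapshotStore store;
    private int pinnedVersion;

    ReaderSession(SnapshotStore store) {
        this.store = store;
        this.pinnedVersion = store.latestVersion();
    }

    Double read(int key) {
        return store.get(pinnedVersion).get(key);
    }

    void refreshToLatest() {
        pinnedVersion = store.latestVersion();
    }
}
```

For example, after store.applyDelta(Map.of(2, 2000.0)), a ReaderSession created earlier still returns the old value for key 2 until it calls refreshToLatest().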

If the changes made since the previous fetch operation are no longer available, the synchronizer throws an exception. This happens when the time elapsed since the previous fetch exceeds the UNDO_RETENTION parameter setting in the database. To avoid this exception, fetch changes at intervals shorter than the UNDO_RETENTION value. The default setting for the UNDO_RETENTION parameter is 900 seconds. See Oracle Database Reference for more information.
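One way to follow this advice is to run synchronization on a schedule whose interval is comfortably shorter than UNDO_RETENTION. The following Java sketch assumes that you supply a Runnable that calls synchronizer.sync() (handling any checked exceptions that call declares). The SyncScheduler class, its safeInterval helper, and the 0.5 fraction used in the usage note are illustrative choices, not part of the graph server API.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the PGX API) for periodic synchronization.
final class SyncScheduler {
    private SyncScheduler() {}

    // Returns the given fraction of UNDO_RETENTION. A fraction well below 1
    // leaves headroom for the time that each synchronization itself takes.
    static Duration safeInterval(Duration undoRetention, double fraction) {
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0 and 1 (exclusive)");
        }
        return Duration.ofMillis((long) (undoRetention.toMillis() * fraction));
    }

    // Runs syncTask with a fixed delay between the end of one run and the start
    // of the next, so the gap between fetches is the delay plus the run time.
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor,
                                       Runnable syncTask, Duration interval) {
        Runnable guarded = () -> {
            try {
                syncTask.run();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs, so report it here;
                // how to recover is application-specific.
                System.err.println("Synchronization failed: " + e);
            }
        };
        long millis = interval.toMillis();
        return executor.scheduleWithFixedDelay(guarded, millis, millis, TimeUnit.MILLISECONDS);
    }
}
```

With the default UNDO_RETENTION of 900 seconds, SyncScheduler.safeInterval(Duration.ofSeconds(900), 0.5) yields a 450-second delay between runs.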

Prerequisites for Synchronizing

The Oracle database must have Flashback enabled and the database user that you use to perform synchronization must have:

  • Read access to all tables which need to be kept synchronized.
  • Permission to use flashback APIs. For example:
    GRANT EXECUTE ON DBMS_FLASHBACK TO <user>

The database must also be configured to retain undo data (see UNDO_RETENTION above) for as long as your use case requires between fetches.
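For orientation, Oracle Flashback Query lets a SELECT statement read a table as it was at an earlier system change number (SCN) through the AS OF SCN clause, which only succeeds while the corresponding undo data is still retained. The Java sketch below merely builds such a query string, optionally with a standard /*+ PARALLEL(n) */ hint. The FlashbackSql class is hypothetical, and the output is not a description of the exact SQL that FlashbackSynchronizer issues.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper that builds an Oracle Flashback Query string.
// Illustration only; not the SQL that FlashbackSynchronizer actually runs.
final class FlashbackSql {
    // Identifiers cannot be bound as JDBC parameters, so accept only simple unquoted names.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z][A-Za-z0-9_$#]*");

    private FlashbackSql() {}

    // parallelDegree <= 1 omits the hint.
    static String asOfScn(String table, List<String> columns, int parallelDegree) {
        requireIdentifier(table);
        columns.forEach(FlashbackSql::requireIdentifier);
        String hint = parallelDegree > 1 ? "/*+ PARALLEL(" + parallelDegree + ") */ " : "";
        // The SCN stays a bind variable (?) to be set with PreparedStatement.setLong.
        return "SELECT " + hint + String.join(", ", columns)
                + " FROM " + table + " AS OF SCN ?";
    }

    private static void requireIdentifier(String name) {
        if (!IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a simple identifier: " + name);
        }
    }
}
```

For example, FlashbackSql.asOfScn("bank_transfers", List.of("txn_id", "amount"), 4) returns SELECT /*+ PARALLEL(4) */ txn_id, amount FROM bank_transfers AS OF SCN ?.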

Types of graphs that can be synchronized

Not all PgxGraph objects in PGX can be synchronized. The following limitations apply:

  • Only the original creator of the graph can synchronize it. That is, the current user must have the MANAGE permission on the graph.
  • Only graphs loaded from database tables (PGQL property graphs and SQL property graphs) can be synchronized. Graphs loaded from other formats, graphs created through the graph builder API, and PGQL property graphs created from database views cannot be synchronized.
  • Only the latest snapshot of a graph can be synchronized.

Types of changes that can be synchronized

The synchronizer supports keeping the in-memory graph snapshot in sync with the following database-side modifications:

  • insertion of new vertices and edges
  • removal of existing vertices and edges
  • update of property values of any vertex or edge

The synchronizer does not support schema-level changes to the input graph, such as:

  • alteration of the list of input vertex or edge tables
  • alteration of any columns of any input tables (vertex or edge tables)

Furthermore, the synchronizer does not support updates to vertex and edge keys.
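One way to picture these supported changes: the synchronizer compares the rows of each input table at the SCN of the current snapshot with their current state, so changes are naturally identified by row key. The Java sketch below (Java 16 or later, for records) classifies the differences between two in-memory maps from key to property value. TableDiff and its nested types are hypothetical and are not the synchronizer's implementation. In this model, a row whose key changes appears as a removal plus an insertion rather than as an update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical model of key-based change detection (not the FlashbackSynchronizer implementation).
final class TableDiff {
    enum Kind { INSERT, REMOVE, UPDATE }

    record Change(Kind kind, long key, Double oldValue, Double newValue) {}

    private TableDiff() {}

    // before: rows at the older SCN; after: current rows. Both map row key -> property value.
    static List<Change> diff(Map<Long, Double> before, Map<Long, Double> after) {
        List<Change> changes = new ArrayList<>();
        // Visit the union of keys in sorted order so the output is deterministic.
        TreeSet<Long> keys = new TreeSet<>(before.keySet());
        keys.addAll(after.keySet());
        for (long key : keys) {
            boolean existed = before.containsKey(key);
            boolean exists = after.containsKey(key);
            if (!existed) {
                changes.add(new Change(Kind.INSERT, key, null, after.get(key)));
            } else if (!exists) {
                changes.add(new Change(Kind.REMOVE, key, before.get(key), null));
            } else if (!Objects.equals(before.get(key), after.get(key))) {
                changes.add(new Change(Kind.UPDATE, key, before.get(key), after.get(key)));
            }
        }
        return changes;
    }
}
```

For instance, diffing {5=1.0} against {6=1.0} yields a REMOVE for key 5 and an INSERT for key 6, even though the property value did not change.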

For detailed examples, see 17.4.1 Synchronizing a SQL Property Graph and 17.4.2 Synchronizing a PGQL Property Graph.

17.4.1 Synchronizing a SQL Property Graph

You can synchronize a SQL property graph that is loaded into the graph server (PGX) with the changes made to the graph data in the database.

The following example shows the steps for synchronizing a SQL property graph using the FlashbackSynchronizer API:
  1. Load the SQL property graph into the graph server (PGX) using the readGraphByName() API as shown:
    JShell:
    opg4j> var graph = session.readGraphByName("BANK_SQL_PG", GraphSource.PG_SQL,
    ...>                   ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
    ...>                   ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_SQL_PG_2,N=1000,E=5001,created=1697259571499]

    Java:
    PgxGraph graph = session.readGraphByName("BANK_SQL_PG", GraphSource.PG_SQL,
                       ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
                       ReadGraphOption.synchronizable());

    Python:
    >>> graph = session.read_graph_by_name('BANK_SQL_PG', 'pg_sql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. Open a new JDBC connection to the database and change the data in the underlying database tables for the SQL property graph. For example, the following code updates the database value for one of the edge properties:
    JShell:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@738e79ec
    opg4j> conn.setAutoCommit(false)
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@71f056a
    opg4j> stmt.executeQuery("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2")
    $8 ==> oracle.jdbc.driver.ForwardOnlyResultSet@19b0a9f2
    opg4j> conn.commit()

    Java:
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    conn.setAutoCommit(false); // disable auto-commit so that the explicit commit() below ends the transaction
    Statement stmt = conn.createStatement();
    stmt.executeUpdate("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2");
    conn.commit();

    Python:
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2").execute()
    False
    >>> conn.commit()
    
    Committing the changes to the database causes the in-memory graph to go out of sync with the database source tables.
  3. Synchronize the in-memory graph with the database by creating a new synchronizer object as shown in the following code:
    JShell:
    opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@5f65e0c0

    Java:
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);

    Python:
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
    ...              jdbc_url=<jdbcUrl>, username=<username>, password=<password>)
  4. Call the sync() method to fetch and apply the database changes and create a new in-memory graph snapshot:
    JShell:
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_SQL_PG,N=1000,E=5001,created=1696332603804]

    Java:
    graph = synchronizer.sync();

    Python:
    >>> graph = synchronizer.sync()

    Note that the Synchronizer object needs to be created only once per session. Once created, you can perform the synchronizer.sync() operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.

  5. Query the graph to verify the updates to the edge property.
    JShell:
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print()

    Java:
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print();

    Python:
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.id = 237 AND v2.id=777").print()

    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 2000.0 |
    +--------+

17.4.2 Synchronizing a PGQL Property Graph

You can synchronize a PGQL property graph loaded into the graph server (PGX) with the changes made to the graph data in the database.

The following example shows the steps for synchronizing a PGQL property graph using the FlashbackSynchronizer API:
  1. Load the PGQL property graph into the graph server (PGX) using the readGraphByName() API as shown:
    JShell:
    opg4j> var graph = session.readGraphByName("BANK_GRAPH", GraphSource.PG_PGQL,
    ...>                 ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES), ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660275936010]

    Java:
    PgxGraph graph = session.readGraphByName("BANK_GRAPH", GraphSource.PG_PGQL,
                       ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES), ReadGraphOption.synchronizable());

    Python:
    >>> graph = session.read_graph_by_name('BANK_GRAPH', 'pg_pgql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. Open a new JDBC connection to the database and change the data in the underlying database tables for the PGQL property graph. For example, the following code updates the database value for one of the edge properties:
    JShell:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> conn.setAutoCommit(false)
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeQuery("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.commit()

    Java:
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    conn.setAutoCommit(false); // disable auto-commit before the update so that commit() ends the transaction
    Statement stmt = conn.createStatement();
    stmt.executeUpdate("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3");
    conn.commit();

    Python:
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbc_url>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
    
    Committing the changes to the database causes the in-memory graph to go out of sync with the database source tables.
  3. Synchronize the in-memory graph with the database by creating a new synchronizer object as shown in the following code:
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setParallelHintDegree(4)
        .build();

    Internally, the graph server keeps track of the Oracle system change number (SCN) to which the current graph snapshot belongs. The synchronizer is a client-side component that connects to the database, detects changes by using the flashback mechanism to compare the state of the original input tables at that SCN with their current state, and then sends any changes to the graph server using the ChangeSet API. To do so, the synchronizer needs to know how to connect to the database (the conn parameter) and which graph to keep in sync (the graph parameter).

    You can specify the degree of parallelism in the Flashback Synchronizer builder by using the setParallelHintDegree API. The Flashback Synchronizer passes the specified degree as a parallel hint when it executes its SQL queries.

    Alternatively, you can use the following equivalent shortcut:

    JShell:
    opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6

    Java:
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);

    Python:
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', jdbc_url=<jdbc_url>, username=<username>, password=<password>)
  4. Call the sync() method to fetch and apply the database changes and create a new in-memory graph snapshot:
    JShell:
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660308128037]

    Java:
    graph = synchronizer.sync();

    Python:
    >>> graph = synchronizer.sync()

    Note that the Synchronizer object needs to be created only once per session. Once created, you can perform the synchronizer.sync() operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.

    Splitting the Fetching and Applying of Changes

    The synchronizer.sync() invocation in the preceding code fetches the changes and applies them in one call. However, you can encode more complex update logic by splitting this process into separate fetch() and apply() invocations. For example:

    synchronizer.fetch(); // fetches changes from the database
    if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) {  // only create snapshot if there have been more than 100 changes
      synchronizer.apply();
    }
  5. Query the graph to verify the updates to the edge property.
    JShell:
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print()

    Java:
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print();

    Python:
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print()

    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 4000.0 |
    +--------+
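The threshold check used with separate fetch() and apply() calls in step 4 can be generalized into a reusable policy. The sketch below is a hypothetical ApplyPolicy class, not part of the PGX API: it decides whether to call apply() from the number of fetched changes (for example, the value of getGraphDelta().getTotalNumberOfChanges()) and the time since the last snapshot was created, so that a graph with little write traffic still refreshes eventually. The threshold and maximum staleness are illustrative parameters.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical apply policy (not part of the PGX API): apply when more than
// `threshold` changes are pending, or when pending changes have waited at least maxStaleness.
final class ApplyPolicy {
    private final long threshold;
    private final Duration maxStaleness;

    ApplyPolicy(long threshold, Duration maxStaleness) {
        if (threshold < 0) throw new IllegalArgumentException("threshold must not be negative");
        this.threshold = threshold;
        this.maxStaleness = maxStaleness;
    }

    boolean shouldApply(long pendingChanges, Instant lastApply, Instant now) {
        if (pendingChanges == 0) {
            return false; // nothing to apply
        }
        boolean enoughChanges = pendingChanges > threshold; // same comparison as the "> 100" example
        boolean tooStale = Duration.between(lastApply, now).compareTo(maxStaleness) >= 0;
        return enoughChanges || tooStale;
    }
}
```

In a loop you might call synchronizer.fetch(), pass the delta's change count to shouldApply(...), and call synchronizer.apply() only when it returns true. Whether changes that were fetched but not applied carry over to the next fetch() is not stated in this section, so check the API reference before relying on that behavior.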