16.4 Keeping the Graph in Oracle Database Synchronized with the Graph Server

You can use the FlashbackSynchronizer API to automatically apply changes made to a graph in the database to the corresponding PgxGraph object in memory, thus keeping both synchronized.

This API uses Oracle's Flashback Technology to fetch the changes made in the database since the last fetch and then pushes those changes into the graph server using the ChangeSet API. After the changes are applied, the usual snapshot semantics of the graph server apply: each application of a fetched delta creates a new in-memory snapshot. Any queries or algorithms that execute concurrently with snapshot creation are unaffected by the changes until the corresponding session refreshes its PgxGraph object to the latest state by calling session.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT).
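
The snapshot behavior described above can be pictured with a small toy model. The sketch below is plain Python and does not use the PGX API; the names SnapshotStore and SessionView, as well as the starting amount of 1000.0, are made up for illustration. It only shows that applying a delta adds a new snapshot, while a session keeps reading its pinned snapshot until it explicitly refreshes.

```python
# Toy model of snapshot semantics (illustration only, not the PGX API).

class SnapshotStore:
    """Holds an append-only list of graph snapshots."""

    def __init__(self, initial_amounts):
        self._snapshots = [dict(initial_amounts)]

    def apply_delta(self, changed_amounts):
        # Applying a delta never mutates an existing snapshot;
        # it creates a new one on top of the latest.
        latest = dict(self._snapshots[-1])
        latest.update(changed_amounts)
        self._snapshots.append(latest)

    def latest_index(self):
        return len(self._snapshots) - 1

    def read(self, index):
        return self._snapshots[index]


class SessionView:
    """A session pinned to one snapshot until it refreshes."""

    def __init__(self, store):
        self._store = store
        self._pinned = store.latest_index()

    def amount(self, txn_id):
        return self._store.read(self._pinned)[txn_id]

    def refresh_to_latest(self):
        # Plays the role of session.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT).
        self._pinned = self._store.latest_index()


store = SnapshotStore({2: 1000.0})   # hypothetical starting value
session = SessionView(store)
store.apply_delta({2: 2000.0})       # a sync creates a new snapshot
before = session.amount(2)           # still reads the old snapshot
session.refresh_to_latest()
after = session.amount(2)            # now reads the new snapshot
print(before, after)
```

In this model, the value read before the refresh is the old one and the value read after the refresh is the new one, which mirrors why a session must move to the latest snapshot to see synchronized changes.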

Also, if the changes since the previous fetch operation are no longer available, the synchronizer throws an exception. This occurs if the time elapsed since the previous fetch is longer than the UNDO_RETENTION parameter setting in the database. To avoid this exception, fetch the changes at intervals shorter than the UNDO_RETENTION parameter value. The default setting for the UNDO_RETENTION parameter is 900 seconds. See Oracle Database Reference for more information.
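
As a rough planning aid, the relationship between the fetch interval and UNDO_RETENTION can be written down as a small check. The sketch below is plain Python with hypothetical helper names; the 120-second safety margin is an arbitrary assumption, not an Oracle recommendation.

```python
# Sketch: choose a fetch interval that stays inside the undo retention window.

DEFAULT_UNDO_RETENTION_S = 900  # default value stated in the text above


def max_fetch_interval(undo_retention_s=DEFAULT_UNDO_RETENTION_S, margin_s=120):
    """Return the longest interval (in seconds) to allow between two fetches,
    leaving `margin_s` seconds of headroom below the retention window."""
    if undo_retention_s <= 0:
        raise ValueError("undo_retention_s must be positive")
    if margin_s < 0 or margin_s >= undo_retention_s:
        raise ValueError("margin_s must be in [0, undo_retention_s)")
    return undo_retention_s - margin_s


def is_fetch_at_risk(seconds_since_last_fetch,
                     undo_retention_s=DEFAULT_UNDO_RETENTION_S):
    """True when the elapsed time exceeds the retention window, so the
    changes needed by the next fetch may no longer be available."""
    return seconds_since_last_fetch > undo_retention_s


print(max_fetch_interval())       # 780
print(is_fetch_at_risk(600))      # False
print(is_fetch_at_risk(1200))     # True
```

If your use case needs longer gaps between fetches, the retention setting in the database has to be raised accordingly rather than the margin reduced.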

Prerequisites for Synchronizing

The Oracle database must have Flashback enabled and the database user that you use to perform synchronization must have:

  • Read access to all tables which need to be kept synchronized.
  • Permission to use flashback APIs. For example:
    GRANT EXECUTE ON DBMS_FLASHBACK TO <user>

The database must also be configured to retain changes for the amount of time needed by your use case.

Types of graphs that can be synchronized

Not all PgxGraph objects in PGX can be synchronized. The following limitations apply:

  • Only the original creator of the graph can synchronize it. That is, the current user must have the MANAGE permission on the graph.

  • Only graphs loaded from database tables (PGQL property graphs and SQL property graphs) can be synchronized. Graphs created from other formats, graphs created via the graph builder API, and PGQL property graphs created from database views cannot be synchronized.
  • Only the latest snapshot of a graph can be synchronized.

Types of changes that can be synchronized

The synchronizer supports keeping the in-memory graph snapshot in sync with the following database-side modifications:

  • insertion of new vertices and edges
  • removal of existing vertices and edges
  • update of property values of any vertex or edge

The synchronizer does not support schema-level changes to the input graph, such as:

  • alteration of the list of input vertex or edge tables
  • alteration of any columns of any input tables (vertex or edge tables)

Furthermore, the synchronizer does not support updates to vertex and edge keys.
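
The three supported change categories can be illustrated with a toy diff of two table states. This is a conceptual sketch in plain Python with made-up data and a hypothetical function name; it is not how the synchronizer is implemented and it does not use Flashback.

```python
# Toy diff of two table states keyed by row key (illustration only).

def classify_changes(old_rows, new_rows):
    """Split the difference between two {key: properties} dicts into
    inserted keys, removed keys, and keys whose properties changed."""
    inserted = sorted(k for k in new_rows if k not in old_rows)
    removed = sorted(k for k in old_rows if k not in new_rows)
    updated = sorted(
        k for k in new_rows
        if k in old_rows and new_rows[k] != old_rows[k]
    )
    return {"inserted": inserted, "removed": removed, "updated": updated}


# Made-up edge rows keyed by txn_id.
old_txns = {1: {"amount": 500.0}, 2: {"amount": 1000.0}, 3: {"amount": 750.0}}
new_txns = {2: {"amount": 2000.0}, 3: {"amount": 750.0}, 4: {"amount": 10.0}}

delta = classify_changes(old_txns, new_txns)
print(delta)  # {'inserted': [4], 'removed': [1], 'updated': [2]}
```

In this toy model every row is identified by its key, so changing a key would show up as a removal plus an insertion rather than as a property update.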

For detailed examples, see the following topics:

16.4.1 Synchronizing a SQL Property Graph

You can synchronize a SQL property graph that is loaded into the graph server (PGX) with the changes made to the graph data in the database.

The following example shows the steps for synchronizing a SQL property graph using the FlashbackSynchronizer API:
  1. Load the SQL property graph into the graph server (PGX) using the readGraphByName() API as shown:
    opg4j> var graph = session.readGraphByName("BANK_SQL_PG", GraphSource.PG_SQL,
    ...>                   ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
    ...>                   ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_SQL_PG_2,N=1000,E=5001,created=1697259571499]
    PgxGraph graph = session.readGraphByName("BANK_SQL_PG",GraphSource.PG_SQL,
                       ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
                       ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_SQL_PG', 'pg_sql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. Open a new JDBC connection to the database and change the data in the underlying database tables for the SQL property graph. For example, the following code updates the database value for one of the edge properties:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@738e79ec
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@71f056a
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=2000 WHERE txn_id=2")
    $8 ==> oracle.jdbc.driver.ForwardOnlyResultSet@19b0a9f2
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_txns SET amount=2000 WHERE txn_id=2");
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=2000 WHERE txn_id=2").execute()
    False
    >>> conn.commit()
    
    Committing the changes to the database causes the in-memory graph to go out of sync with the database source tables.
  3. Synchronize the in-memory graph with the database by creating a new synchronizer object as shown in the following code:
    opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@5f65e0c0
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
    ...              jdbc_url=<jdbcUrl>, username=<username>, password=<password>)
  4. Fetch and apply the database changes by calling the sync() function, which creates a new in-memory graph snapshot:
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_SQL_PG,N=1000,E=5001,created=1696332603804]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()

    Note that the Synchronizer object needs to be created only once per session. Once created, you can perform the synchronizer.sync() operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.

  5. Query the graph to verify the updates to the edge property.
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Account)-[e:Transfer]->(v2:Account) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Account)-[e:Transfer]->(v2:Account) WHERE v1.id = 237 AND v2.id=777").print()
    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 2000.0 |
    +--------+
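
Because the synchronizer is created once and sync() can then be called repeatedly, a client typically ends up with some form of polling loop. The following is a minimal sketch of such a loop in plain Python. The function name run_periodic_sync and the FakeSynchronizer class are hypothetical; the loop only assumes that the object passed in exposes a sync() method that returns the new graph snapshot, as in the Python example in step 4.

```python
import time


def run_periodic_sync(synchronizer, interval_s, rounds, sleep=time.sleep):
    """Call synchronizer.sync() `rounds` times, pausing `interval_s` seconds
    between calls, and return the graph from the last sync."""
    graph = None
    for i in range(rounds):
        graph = synchronizer.sync()   # fetch + apply, returns the new snapshot
        if i < rounds - 1:
            sleep(interval_s)         # no pause after the final round
    return graph


class FakeSynchronizer:
    """Stand-in used only to exercise the loop without a graph server."""

    def __init__(self):
        self.calls = 0

    def sync(self):
        self.calls += 1
        return f"snapshot-{self.calls}"


fake = FakeSynchronizer()
last = run_periodic_sync(fake, interval_s=0, rounds=3)
print(fake.calls, last)  # 3 snapshot-3
```

With a real synchronizer, interval_s should be chosen so that successive fetches stay within the database's UNDO_RETENTION window.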

16.4.2 Synchronizing a PGQL Property Graph

You can synchronize a PGQL property graph loaded into the graph server (PGX) with the changes made to the graph data in the database.

The following example shows the steps for synchronizing a PGQL property graph using the FlashbackSynchronizer API:
  1. Load the PGQL property graph into the graph server (PGX) using the readGraphByName() API as shown:
    opg4j> var graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
    ...>             ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660275936010]
    PgxGraph graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
                          ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_GRAPH','pg_pgql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. Open a new JDBC connection to the database and change the data in the underlying database tables for the PGQL property graph. For example, the following code updates the database value for one of the edge properties:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> conn.setAutoCommit(false)
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    conn.setAutoCommit(false);
    stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3");
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbc_url>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=4000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
    
    Committing the changes to the database causes the in-memory graph to go out of sync with the database source tables.
  3. Synchronize the in-memory graph with the database by creating a new synchronizer object as shown in the following code:
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setParallelHintDegree(4)
        .build();

    Internally, the graph server keeps track of the Oracle system change number (SCN) to which the current graph snapshot belongs. The synchronizer is a client-side component that connects to the database, detects changes by comparing the state of the original input tables against the current SCN via the flashback mechanism, and then sends any changes to the graph server using the ChangeSet API. To do so, the synchronizer needs to know how to connect to the database (conn parameter) as well as which graph to keep in sync (graph parameter).

    You can specify the degree of parallelism in the Flashback Synchronizer builder using the setParallelHintDegree API. The specified parallel hint degree will be taken into account by the Flashback Synchronizer when executing the SQL queries.

    Alternatively, you can use this equivalent shortcut as shown:

    opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', jdbc_url=<jdbc_url>, username=<username>, password=<password>)
  4. Fetch and apply the database changes by calling the sync() function, which creates a new in-memory graph snapshot:
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660308128037]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()

    Note that the Synchronizer object needs to be created only once per session. Once created, you can perform the synchronizer.sync() operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.

    Splitting the Fetching and Applying of Changes

    The synchronizer.sync() invocation in the preceding code fetches the changes and applies them in one call. However, you can implement more complex update logic by splitting this process into separate fetch() and apply() invocations. For example:

    synchronizer.fetch(); // fetches changes from the database
    if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) {  // only create snapshot if there have been more than 100 changes
      synchronizer.apply();
    }
  5. Query the graph to verify the updates to the edge property.
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 4000.0 |
    +--------+
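
The split between fetching and applying changes described in step 4 can also be expressed as a small, reusable policy function. The sketch below is plain Python and deliberately takes three callables instead of a synchronizer object, so it makes no assumption about the Python client's method names; apply_if_enough_changes and FakeSynchronizer are hypothetical names used only for illustration.

```python
def apply_if_enough_changes(fetch, count_changes, apply, threshold=100):
    """Fetch pending changes, then apply them only if their number exceeds
    `threshold`. Returns whatever `apply` returns, or None if it was skipped."""
    fetch()
    if count_changes() > threshold:
        return apply()
    return None


class FakeSynchronizer:
    """Stand-in that mimics fetch/apply bookkeeping for demonstration."""

    def __init__(self, pending):
        self.pending = pending      # changes waiting in the "database"
        self.fetched = 0            # changes seen by the last fetch
        self.snapshots = 0          # number of snapshots created

    def fetch(self):
        self.fetched = self.pending

    def total_changes(self):
        return self.fetched

    def apply(self):
        self.snapshots += 1
        self.pending = 0
        return f"snapshot-{self.snapshots}"


small = FakeSynchronizer(pending=40)
large = FakeSynchronizer(pending=250)
skipped = apply_if_enough_changes(small.fetch, small.total_changes, small.apply)
applied = apply_if_enough_changes(large.fetch, large.total_changes, large.apply)
print(skipped, applied)  # None snapshot-1
```

Passing callables keeps the threshold policy separate from the client API, so the same function can be wired to the fetch, change-count, and apply operations of whichever client you use.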

16.4.3 Synchronizing a Published Graph

You can synchronize a published graph by configuring the Flashback Synchronizer with a PartitionedGraphConfig object containing the graph schema along with the database connection details.

The PartitionedGraphConfig object can be created either through the PartitionedGraphConfigBuilder API or by reading the graph configuration from a JSON file.

Although synchronization of graphs created via graph configuration objects is supported in general, the following limitations apply:

  • Only partitioned graph configurations with all providers being database tables are supported.
  • Each vertex provider and each edge provider must specify the owner of the table by setting the username field. For example, if user SCOTT owns the table, then set the username to SCOTT for that provider.
  • Snapshot source must be set to CHANGE_SET.
  • It is highly recommended to optimize the graph for update operations in order to avoid memory exhaustion when creating many snapshots.

The following example shows the sample configuration for creating the PartitionedGraphConfig object:

{
  ...
  "optimized_for": "updates",
  "vertex_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "edge_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "loading": {
    "snapshots_source": "change_set"
  }
}
GraphConfig cfg = GraphConfigBuilder.forPartitioned()
                      ...
                      .setUsername("<username>")
                      .setSnapshotsSource(SnapshotsSource.CHANGE_SET)
                      .setOptimizedFor(GraphOptimizedFor.UPDATES)
                      ...
                      .build();
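
Before creating the synchronizer, it can be useful to check a JSON configuration against the limitations listed above. The following is a minimal sketch in plain Python; the function name check_sync_config is hypothetical, the sample values (including the GRAPHUSER owner) are made up, and the check assumes that database-table providers are marked with "format": "rdbms", as in the configuration output shown later in this section.

```python
import json


def check_sync_config(cfg):
    """Return a list of problems found; an empty list means none of the
    checks below failed."""
    problems = []
    providers = cfg.get("vertex_providers", []) + cfg.get("edge_providers", [])
    for p in providers:
        name = p.get("name", "<unnamed>")
        if p.get("format") != "rdbms":
            problems.append(f"provider {name}: format is not rdbms")
        if not p.get("username"):
            problems.append(f"provider {name}: username is not set")
    source = str(cfg.get("loading", {}).get("snapshots_source", "")).lower()
    if source != "change_set":
        problems.append("loading.snapshots_source is not change_set")
    if str(cfg.get("optimized_for", "")).lower() != "updates":
        problems.append("optimized_for is not updates (recommended)")
    return problems


# Made-up sample: the edge provider is missing its username.
sample = json.loads("""
{
  "optimized_for": "updates",
  "vertex_providers": [
    {"name": "Accounts", "format": "rdbms", "username": "GRAPHUSER"}
  ],
  "edge_providers": [
    {"name": "Transfers", "format": "rdbms"}
  ],
  "loading": {"snapshots_source": "change_set"}
}
""")
problems = check_sync_config(sample)
print(problems)  # ['provider Transfers: username is not set']
```

The check only covers the fields named in the limitations; it does not validate the rest of the graph configuration.
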
As a prerequisite, you must have a graph that was published in an earlier session. For example:
opg4j> var graph = session.readGraphWithProperties("<path_to_json_config_file>")
graph ==> PgxGraph[name=bank_graph_analytics_fb,N=999,E=4993,created=1664310157103]
opg4j> graph.publishWithSnapshots()
PgxGraph graph = session.readGraphWithProperties("<path_to_json_config_file>");
graph.publishWithSnapshots();
>>> graph = session.read_graph_with_properties("<path_to_json_config_file>")
>>> graph.publish_with_snapshots()

You can now perform the following steps to synchronize the published graph using a graph configuration object which is built from a JSON file.

  1. Get the published graph as shown:
    opg4j> var graph = session.getGraph("bank_graph")
    graph ==> PgxGraph[name=bank_graph_analytics_fb,N=999,E=4993,created=1664310157103]
    PgxGraph graph = session.getGraph("bank_graph");
    >>> graph = session.get_graph("bank_graph")
  2. Build the graph configuration object using a JSON file path as shown:
    opg4j> var cfg = GraphConfigFactory.forPartitioned().fromFilePath("<path_to_json_config_file>")
    cfg ==> {"edge_providers":[{"destination_vertex_provider":"Accounts","database_table_name":"BANK_TXNS","name":"Transfers","key_type":"long",
    "props":[{"type":"float","name":"AMOUNT"},{"type":"string","name":"DESCRIPTION"}],"format":"rdbms","source_vertex_provider":"Accounts",
    "source_column":"FROM_ACCT_ID","key_column":"TXN_ID","destination_column":"TO_ACCT_ID","loading":{"create_key_mapping":true}}],
    "loading":{"snapshots_source":"CHANGE_SET"},"name":"bank_graph","vertex_providers":[{"database_table_name":"BANK_ACCOUNTS",
    "key_column":"ID","name":"Accounts","key_type":"integer","props":[{"type":"integer","name":"ID"},{"type":"string","name":"NAME"}],
    "loading":{"create_key_mapping":true},"format":"rdbms"}]}
    PartitionedGraphConfig cfg = GraphConfigFactory.forPartitioned().fromFilePath("<path_to_json_config_file>");
    >>> from pypgx.api import GraphConfigFactory
    >>> cfg = GraphConfigFactory.for_partitioned().from_file_path("<path_to_json_config_file>")
    Alternatively, you can also build the graph configuration object using the GraphConfigBuilder API as shown in Loading a Graph by Defining a Graph Configuration Object.
  3. Change the data in the database table using the JDBC connection:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> conn.setAutoCommit(false)
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=9000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    conn.setAutoCommit(false);
    stmt.executeQuery("UPDATE bank_txns SET amount=9000 WHERE txn_id=3");
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbc_url>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=9000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
  4. Configure the Flashback synchronizer using the graph configuration object and the connection details:
    opg4j> var synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>().
    ...>      setType(FlashbackSynchronizer.class).
    ...>      setGraph(graph).
    ...>      setConnection(conn).
    ...>      setGraphConfiguration(cfg).
    ...>      build()
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@1f122cbb
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setGraphConfiguration(cfg)
        .build();
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
    ...              jdbc_url=<jdbc_url>, username=<username>, password=<password>, graph_config=cfg)
  5. Synchronize the published graph as shown:
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=bank_graph,N=999,E=4993,created=1664454171605]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()
  6. Query the graph to verify the updates to the edge property.
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print()
    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 9000.0 |
    +--------+