- Graph Developer's Guide for Property Graph
- Using the Graph Server (PGX)
- Developing Applications with Graph Analytics
- Keeping the Graph in Oracle Database Synchronized with the Graph Server
- Synchronizing a PGQL Property Graph
16.3.1 Synchronizing a PGQL Property Graph
You can synchronize a PGQL property graph loaded into the graph server (PGX) with the changes made to the graph data in the database.
- Load the PGQL property graph into the graph
server (PGX) using the
readGraphByName()
API as shown:opg4j> var graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_VIEW, ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable()) graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660275936010]
PgxGraph graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_VIEW, ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable());
>>> graph = session.read_graph_by_name('BANK_GRAPH','pg_view')
- Open a new JDBC connection to the database and change the data in the
underlying database tables for the PGQL property graph. For example, the following code updates the database value for
one of the edge properties:
opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>) conn ==> oracle.jdbc.driver.T4CConnection@60f7261f opg4j> var stmt = conn.createStatement() stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00 opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3") $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99 opg4j> conn.setAutoCommit(false) opg4j> conn.commit()
Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>); Statement stmt = conn.createStatement(); stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3"); conn.setAutoCommit(false); conn.commit();
>>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection() >>> conn.prepareStatement("UPDATE bank_txns SET amount=4000 WHERE txn_id=3").execute() False >>> conn.commit()
Committing the changes to the database causes the graph in the memory to go out of sync with the database source tables. - Synchronize the in-memory graph with the database by creating a new
synchronizer object as shown in the following code:
Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>() .setType(FlashbackSynchronizer.class) .setGraph(graph) .setConnection(conn) .build();
Internally, the graph server keeps track of the Oracle system change number (SCN) the current graph snapshot belongs to. The synchronizer is a client-side component which connects to the database, detects changes by comparing state of the original input tables using the current SCN via the flashback mechanism and then sends any changes to the graph server using the changeset API. In order to do so, the synchronizer needs to know how to connect to the database (
conn
parameter) as well as which graph to keep in sync (graph
parameter).Alternatively, you can use this equivalent shortcut as shown:
opg4j> var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn) synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6
Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
>>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', connection=conn)
- Fetch and apply the database changes by calling the sync() function and create
a new in-memory graph snapshot:
opg4j> graph=synchronizer.sync() g ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660308128037]
graph=synchronizer.sync();
>>> graph = synchronizer.sync()
Note that the
Synchronizer
object needs to be created only once per session. Once created, you can perform thesynchronizer.sync()
operation multiple times to generate the latest graph snapshot that is consistent with the changes in the database.Splitting the Fetching and Applying of Changes
The
synchronizer.sync()
invocation in the preceding code, fetches the changes and applies them in one call. However, you can encode a more complex update logic by splitting this process into separatefetch()
andapply()
invocations. For example:synchronizer.fetch(); // fetches changes from the database if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) { // only create snapshot if there have been more than 100 changes synchronizer.apply(); }
- Query the graph to verify the updates to the edge property.
opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print();
>>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
On execution, the preceding example produces the following output:+--------+ | amount | +--------+ | 4000.0 | +--------+