15.3.2 Synchronizing a Published Graph

You can synchronize a published graph by configuring the Flashback Synchronizer with a PartitionedGraphConfig object containing the graph schema along with the database connection details.

The PartitionedGraphConfig object can be created either through the PartitionedGraphConfigBuilder API or by reading the graph configuration from a JSON file.

Synchronization of graphs created from graph configuration objects is generally supported, subject to the following limitations:

  • Only partitioned graph configurations with all providers being database tables are supported.
  • Each vertex provider and each edge provider must specify the owner of its table by setting the username field. For example, if user SCOTT owns the table, set username to SCOTT for that provider.
  • Snapshot source must be set to CHANGE_SET.
  • Optimizing the graph for update operations is highly recommended, to avoid memory exhaustion when creating many snapshots.
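
The limitations above can be checked before the configuration is handed to the synchronizer. The following is a minimal sketch in Python, assuming the JSON configuration has been loaded into a plain dictionary. The function name check_sync_config is hypothetical and not part of any Oracle API, and treating "format": "rdbms" as the marker of a database table provider is an assumption based on the sample configuration shown later in this section.

```python
def check_sync_config(config):
    """Return a list of problems found in a partitioned graph configuration.

    Hypothetical helper: it only inspects a dictionary loaded from the JSON
    configuration file and does not call any PGX API.
    """
    problems = []
    providers = config.get("vertex_providers", []) + config.get("edge_providers", [])
    if not providers:
        problems.append("no vertex or edge providers are defined")
    for provider in providers:
        name = provider.get("name", "<unnamed>")
        # Assumption: database table providers are the ones with format "rdbms".
        if provider.get("format") != "rdbms":
            problems.append(f"provider {name}: not a database table provider")
        # The table owner must be given in the username field.
        if not provider.get("username"):
            problems.append(f"provider {name}: username (table owner) is not set")
    # The snapshot source must be CHANGE_SET (compared case-insensitively).
    source = str(config.get("loading", {}).get("snapshots_source", ""))
    if source.lower() != "change_set":
        problems.append("loading.snapshots_source must be change_set")
    # Recommended rather than required.
    if str(config.get("optimized_for", "")).lower() != "updates":
        problems.append("recommended: set optimized_for to updates")
    return problems


# Example: the edge provider below is missing its username.
example_config = {
    "optimized_for": "updates",
    "vertex_providers": [{"name": "Accounts", "format": "rdbms", "username": "SCOTT"}],
    "edge_providers": [{"name": "Transfers", "format": "rdbms"}],
    "loading": {"snapshots_source": "CHANGE_SET"},
}
for problem in check_sync_config(example_config):
    print(problem)
```

An empty list means that none of the listed limitations were violated. The check does not guarantee that synchronization will succeed.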

The following examples show a sample configuration for creating the PartitionedGraphConfig object, first as a JSON file and then using the Java builder API:

{
  ...
  "optimized_for": "updates",
  "vertex_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "edge_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "loading": {
    "snapshots_source": "change_set"
  }
}

GraphConfig cfg = GraphConfigBuilder.forPartitioned()
                      ...
                      .setUsername("<username>")
                      .setSnapshotsSource(SnapshotsSource.CHANGE_SET)
                      .setOptimizedFor(GraphOptimizedFor.UPDATES)
                      ...
                      .build();
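
For illustration, the elided JSON fragment can be compared with a fuller configuration assembled from the provider details that are printed in step 2 of the procedure later in this section. This is only a sketch in Python: the table, column, and property names are copied from that printed output, "<username>" remains a placeholder for the table owner, and the database connection details are left out because they are not shown in this section.

```python
import json

TABLE_OWNER = "<username>"  # placeholder: the owner of the database tables

config = {
    "name": "bank_graph",
    "optimized_for": "updates",  # recommended when many snapshots are created
    "vertex_providers": [
        {
            "name": "Accounts",
            "format": "rdbms",
            "username": TABLE_OWNER,
            "database_table_name": "BANK_ACCOUNTS",
            "key_column": "ID",
            "key_type": "integer",
            "props": [
                {"name": "ID", "type": "integer"},
                {"name": "NAME", "type": "string"},
            ],
            "loading": {"create_key_mapping": True},
        }
    ],
    "edge_providers": [
        {
            "name": "Transfers",
            "format": "rdbms",
            "username": TABLE_OWNER,
            "database_table_name": "BANK_TXNS",
            "key_column": "TXN_ID",
            "key_type": "long",
            "source_column": "FROM_ACCT_ID",
            "destination_column": "TO_ACCT_ID",
            "source_vertex_provider": "Accounts",
            "destination_vertex_provider": "Accounts",
            "props": [
                {"name": "AMOUNT", "type": "float"},
                {"name": "DESCRIPTION", "type": "string"},
            ],
            "loading": {"create_key_mapping": True},
        }
    ],
    "loading": {"snapshots_source": "change_set"},  # required for synchronization
}

# Serialize the dictionary so that it can be saved as the JSON configuration file.
config_json = json.dumps(config, indent=2)
print(config_json)
```
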

As a prerequisite, you must have a graph that was published in an earlier session. For example:

JShell:
opg4j> var graph = session.readGraphWithProperties("<path_to_json_config_file>")
graph ==> PgxGraph[name=bank_graph,N=999,E=4993,created=1664310157103]
opg4j> graph.publishWithSnapshots()

Java:
PgxGraph graph = session.readGraphWithProperties("<path_to_json_config_file>");
graph.publishWithSnapshots();

Python:
>>> graph = session.read_graph_with_properties("<path_to_json_config_file>")
>>> graph.publish_with_snapshots()

You can now perform the following steps to synchronize the published graph using a graph configuration object that is built from a JSON file.

  1. Get the published graph as shown:
    JShell:
    opg4j> var graph = session.getGraph("bank_graph")
    graph ==> PgxGraph[name=bank_graph,N=999,E=4993,created=1664310157103]

    Java:
    PgxGraph graph = session.getGraph("bank_graph");

    Python:
    >>> graph = session.get_graph("bank_graph")
  2. Build the graph configuration object using a JSON file path as shown:
    JShell:
    opg4j> var cfg = GraphConfigFactory.forPartitioned().fromFilePath("<path_to_json_config_file>")
    cfg ==> {"edge_providers":[{"destination_vertex_provider":"Accounts","database_table_name":"BANK_TXNS","name":"Transfers","key_type":"long",
    "props":[{"type":"float","name":"AMOUNT"},{"type":"string","name":"DESCRIPTION"}],"format":"rdbms","source_vertex_provider":"Accounts",
    "source_column":"FROM_ACCT_ID","key_column":"TXN_ID","destination_column":"TO_ACCT_ID","loading":{"create_key_mapping":true}}],
    "loading":{"snapshots_source":"CHANGE_SET"},"name":"bank_graph","vertex_providers":[{"database_table_name":"BANK_ACCOUNTS",
    "key_column":"ID","name":"Accounts","key_type":"integer","props":[{"type":"integer","name":"ID"},{"type":"string","name":"NAME"}],
    "loading":{"create_key_mapping":true},"format":"rdbms"}]}

    Java:
    PartitionedGraphConfig cfg = GraphConfigFactory.forPartitioned().fromFilePath("<path_to_json_config_file>");

    Python:
    >>> from pypgx.api import GraphConfigFactory
    >>> cfg = GraphConfigFactory.for_partitioned().from_file_path("<path_to_json_config_file>")

    Alternatively, you can build the graph configuration object using the GraphConfigBuilder API as shown in Loading a Graph by Defining a Graph Configuration Object.
  3. Change the data in the database table using the JDBC connection:
    JShell:
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> conn.setAutoCommit(false)
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeUpdate("UPDATE bank_txns SET amount=9000 WHERE txn_id=3")
    $5 ==> 1
    opg4j> conn.commit()
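
    Java:
    // Requires java.sql.Connection, java.sql.DriverManager, and java.sql.Statement.
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    conn.setAutoCommit(false);  // turn off auto-commit before running the update
    Statement stmt = conn.createStatement();
    // An UPDATE returns a row count, so use executeUpdate rather than executeQuery.
    stmt.executeUpdate("UPDATE bank_txns SET amount=9000 WHERE txn_id=3");
    conn.commit();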

    Python:
    >>> conn = opg4py.pgql.get_connection("graphuser","graphuser", "jdbc:oracle:thin:@localhost:1521/orclpdb").get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=9000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
  4. Configure the Flashback Synchronizer using the graph configuration object and the connection details:
    JShell:
    opg4j> var synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>().
    ...>      setType(FlashbackSynchronizer.class).
    ...>      setGraph(graph).
    ...>      setConnection(conn).
    ...>      setGraphConfiguration(cfg).
    ...>      build()
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@1f122cbb

    Java:
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setGraphConfiguration(cfg)
        .build();

    Python:
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
                              connection=conn, graph_config=cfg)
  5. Synchronize the published graph as shown:
    JShell:
    opg4j> graph = synchronizer.sync()
    graph ==> PgxGraph[name=bank_graph,N=999,E=4993,created=1664454171605]

    Java:
    graph = synchronizer.sync();

    Python:
    >>> graph = synchronizer.sync()
  6. Query the graph to verify the update to the edge property:
    JShell:
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print()

    Java:
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print();

    Python:
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print()

    On execution, the preceding example produces the following output:
    +--------+
    | amount |
    +--------+
    | 9000.0 |
    +--------+
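
Steps 1, 4, and 5 above can be collected into a single helper when the same synchronization is run repeatedly. The following is a minimal sketch in Python, assuming that the session, the JDBC connection, and the graph configuration object were created as in the preceding steps. The function name sync_published_graph is hypothetical and not part of pypgx; the sketch uses only the calls already shown in this section.

```python
FLASHBACK_SYNCHRONIZER = "oracle.pgx.api.FlashbackSynchronizer"


def sync_published_graph(session, conn, cfg, graph_name):
    """Fetch a published graph by name and synchronize it with the database.

    Hypothetical convenience wrapper around steps 1, 4, and 5:
      session    -- an existing PGX session
      conn       -- the JDBC connection used to reach the database tables
      cfg        -- the partitioned graph configuration object
      graph_name -- the name under which the graph was published
    """
    # Step 1: get the published graph.
    graph = session.get_graph(graph_name)
    # Step 4: configure the Flashback Synchronizer.
    synchronizer = graph.create_synchronizer(
        synchronizer_class=FLASHBACK_SYNCHRONIZER,
        connection=conn,
        graph_config=cfg,
    )
    # Step 5: apply the committed database changes and return the updated graph.
    return synchronizer.sync()
```

A call such as graph = sync_published_graph(session, conn, cfg, "bank_graph") then returns the synchronized graph, which can be queried as in step 6. Changes made in the database must be committed before the helper is called, as in step 3.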