16.4 Oracle Databaseのグラフとグラフ・サーバーの同期の維持

FlashbackSynchronizer APIを使用すると、データベースでのグラフに対する変更をメモリー内の対応するPgxGraphオブジェクトに自動的に適用できるため、両方の同期が保たれます。

このAPIでは、Oracleのフラッシュバック・テクノロジを使用して最後のフェッチ以降のデータベースでの変更をフェッチし、ChangeSet APIを使用してそれらの変更をグラフ・サーバーにプッシュします。変更が適用された後、グラフ・サーバーの通常のスナップショット・セマンティクスが適用されます。デルタ・フェッチが適用されるたびに新しいインメモリー・スナップショットが作成されます。スナップショットの作成に対して同時に実行されている問合せまたはアルゴリズムは、対応するセッションがsession.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT)プロシージャをコールしてPgxGraphオブジェクトを最新の状態にリフレッシュするまで、変更の影響を受けません。

また、前のフェッチ操作からの変更が存在しなくなった場合、シンクロナイザは例外をスローします。これは、前のフェッチ期間がデータベースのUNDO_RETENTIONパラメータ設定より長い場合に発生します。この例外を回避するには、UNDO_RETENTIONパラメータ値より短い間隔で変更をフェッチします。UNDO_RETENTIONパラメータのデフォルト設定は900秒です。詳細は、『Oracle Databaseリファレンス』を参照してください。

同期の前提条件

Oracleデータベースでフラッシュバックが有効になっている必要があり、同期の実行に使用するデータベース・ユーザーには次のものが必要です。

  • 同期を維持する必要があるすべての表に対する読取りアクセス権。
  • フラッシュバックAPIを使用するための権限。たとえば:
    GRANT EXECUTE ON DBMS_FLASHBACK TO <user>

また、ユースケースに必要な時間の間変更を保持するようにデータベースを構成する必要もあります。

同期可能なグラフのタイプ

PGXのすべてのPgxGraphオブジェクトを同期できるわけではありません。次の制限が適用されます。

  • グラフの元の作成者のみがそれを同期できます。つまり、現在のユーザーはグラフのMANAGE権限を持っている必要があります。

  • データベース表からロードされたグラフ(PGQLプロパティ・グラフおよびSQLプロパティ・グラフ)のみを同期できます。他の形式から作成されたグラフ、グラフ・ビルダーAPIを使用して作成されたグラフまたはデータベース・ビューから作成されたPGQLプロパティ・グラフは同期できません。
  • グラフの最新のスナップショットのみを同期できます。

同期可能な変更のタイプ

シンクロナイザでは、インメモリー・グラフ・スナップショットと次のデータベース側の変更との同期の維持がサポートされています。

  • 新しい頂点とエッジの挿入
  • 既存の頂点とエッジの削除
  • 任意の頂点またはエッジのプロパティ値の更新

シンクロナイザでは、入力グラフに対する次のようなスキーマ・レベルの変更はサポートされていません。

  • 入力頂点表またはエッジ表のリストの変更
  • 任意の入力表(頂点表またはエッジ表)の任意の列の変更

さらに、シンクロナイザでは、頂点キーおよびエッジ・キーに対する更新はサポートされていません。

詳細な例は、次のトピックを参照してください。

16.4.1 SQLプロパティ・グラフの同期

グラフ・サーバー(PGX)にロードされたSQLプロパティ・グラフを、データベースのグラフ・データに加えられた変更と同期できます。

次の例に、FlashbackSynchronizer APIを使用してSQLプロパティ・グラフを同期するステップを示します:
  1. 次のように、readGraphByName() APIを使用してSQLプロパティ・グラフをグラフ・サーバー(PGX)にロードします:
    opg4j> var graph = session.readGraphByName("BANK_SQL_PG", GraphSource.PG_SQL,
    ...>                   ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
    ...>                   ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_SQL_PG_2,N=1000,E=5001,created=1697259571499
    PgxGraph graph = session.readGraphByName("BANK_SQL_PG",GraphSource.PG_SQL,
                       ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
                       ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_SQL_PG', 'pg_sql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. データベースへの新しいJDBC接続を開き、SQLプロパティ・グラフの基礎となるデータベース表のデータを変更します。たとえば、次のコードでは、エッジ・プロパティのいずれかについてデータベース値を更新します。
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@738e79ec
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@71f056a
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=2000 WHERE txn_id=2")
    $8 ==> oracle.jdbc.driver.ForwardOnlyResultSet@19b0a9f2
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_txns SET amount=2000 WHERE txn_id=2");
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=2000 WHERE txn_id=2").execute()
    False
    >>> conn.commit()
    
    データベースの変更をコミットすると、メモリー内のグラフがデータベースのソース表と同期しなくなります。
  3. 次のコードに示すように、新しいシンクロナイザ・オブジェクトを作成することで、インメモリー・グラフとデータベースを同期します。
    opg4j>  var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@5f65e0c0
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
    ...              jdbc_url=<jdbcUrl>, username=<username>, password=<password>)
  4. sync()関数をコールしてデータベースの変更をフェッチして適用し、新しいインメモリー・グラフ・スナップショットを作成します。
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_SQL_PG,N=1000,E=5001,created=1696332603804]
    graph=synchronizer.sync();
    graph=synchronizer.sync()

    Synchronizerオブジェクトは、セッションごとに1回のみ作成する必要があります。作成後は、synchronizer.sync()操作を複数回実行して、データベースの変更に対応する最新のグラフ・スナップショットを生成できます。

  5. グラフを問い合せて、エッジ・プロパティの更新を確認します。
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Account)-[e:Transfer]->(v2:Account) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Account)-[e:Transfer]->(v2:Account) WHERE v1.id = 237 AND v2.id=777").print()
    実行すると、前述の例では次の出力が生成されます。
    +--------+
    | amount |
    +--------+
    | 2000.0 |
    +--------+

16.4.2 PGQLプロパティ・グラフの同期

グラフ・サーバー(PGX)にロードされたPGQLプロパティ・グラフを、データベースのグラフ・データに加えられた変更と同期できます。

次の例に、FlashbackSynchronizer APIを使用してPGQLプロパティ・グラフを同期するステップを示します。
  1. 次のように、readGraphByName() APIを使用してPGQLプロパティ・グラフをグラフ・サーバー(PGX)にロードします。
    opg4j> var graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
                     ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660275936010]
    PgxGraph graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
                          ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_GRAPH','pg_pgql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. データベースへの新しいJDBC接続を開き、PGQLプロパティ・グラフの基礎となるデータベース表のデータを変更します。たとえば、次のコードでは、エッジ・プロパティのいずれかについてデータベース値を更新します。
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.setAutoCommit(false)
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_txns SET amount=4000 WHERE txn_id=3");
    conn.setAutoCommit(false);
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbc_url>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=4000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
    
    データベースの変更をコミットすると、メモリー内のグラフがデータベースのソース表と同期しなくなります。
  3. 次のコードに示すように、新しいシンクロナイザ・オブジェクトを作成することで、インメモリー・グラフとデータベースを同期します。
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setParallelHintDegree(4)
        .build();

    内部的に、グラフ・サーバーは、現在のグラフ・スナップショットが属するOracleシステム変更番号(SCN)をトラッキングします。シンクロナイザはデータベースに接続するクライアント側コンポーネントであり、フラッシュバック・メカニズムを介して現在のSCNを使用して元の入力表の状態を比較することで変更を検出し、その後、変更セットAPIを使用して、グラフ・サーバーにすべての変更を送信します。これを行うには、シンクロナイザによって、データベースへの接続方法(connパラメータ)および同期を維持するグラフ(graphパラメータ)が認識される必要があります。

    setParallelHintDegree APIを使用して、フラッシュバック・シンクロナイザ・ビルダーで並列度を指定できます。指定したパラレル・ヒントの程度は、SQL問合せの実行時にフラッシュバック・シンクロナイザによって考慮されます。

    あるいは、次のように同等のショートカットを使用することもできます。

    opg4j>  var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', jdbc_url=<jdbc_url>, username=<username>, password=<password>)
  4. sync()関数をコールしてデータベースの変更をフェッチして適用し、新しいインメモリー・グラフ・スナップショットを作成します。
    opg4j> graph=synchronizer.sync()
    g ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660308128037]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()

    Synchronizerオブジェクトは、セッションごとに1回のみ作成する必要があります。作成後は、synchronizer.sync()操作を複数回実行して、データベースの変更に対応する最新のグラフ・スナップショットを生成できます。

    変更のフェッチおよび適用の分割

    前述のコードでsynchronizer.sync()を呼び出すと、1回のコールで変更のフェッチと適用が行われます。しかし、このプロセスを別個の fetch()呼出しとapply()呼出しに分割することで、より複雑な更新ロジックをエンコードできます。たとえば:

    synchronizer.fetch(); // fetches changes from the database
    if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) {  // only create snapshot if there have been more than 100 changes
      synchronizer.apply();
    }
  5. グラフを問い合せて、エッジ・プロパティの更新を確認します。
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 179 AND e.to_acct_id=688").print()
    実行すると、前述の例では次の出力が生成されます。
    +--------+
    | amount |
    +--------+
    | 4000.0 |
    +--------+

16.4.3 公開済グラフの同期

公開済グラフを同期するには、グラフ・スキーマが含まれるPartitionedGraphConfigオブジェクトとデータベース接続の詳細で、フラッシュバック・シンクロナイザを構成します。

PartitionedGraphConfigオブジェクトは、PartitionedGraphConfigBuilder APIを使用するか、JSONファイルからグラフ構成を読み取ることで作成できます。

グラフ構成オブジェクトを使用して作成されたグラフの同期は一般的にサポートされていますが、次のいくつかの制限が適用されます。

  • すべてのプロバイダがデータベース表であるパーティション化されたグラフ構成のみがサポートされます。
  • 各エッジまたは頂点プロバイダ、あるいはその両方で、ユーザー名フィールドを設定して表の所有者を指定する必要があります。たとえば、ユーザーSCOTTが表を所有している場合は、プロバイダのユーザー名を適宜設定します。
  • スナップショット・ソースはCHANGE_SETに設定する必要があります。
  • 多数のスナップショットの作成時にメモリー不足を回避するために、更新操作用にグラフを最適化することをお薦めします。

次の例は、PartitionedGraphConfigオブジェクトを作成するためのサンプル構成を示しています。

{
  ...
  "optimized_for": "updates",
  "vertex_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "edge_providers": [
      ...
      "username":"<username>",
      ...
  ],
  "loading": {
    "snapshots_source": "change_set"
  }
}
GraphConfig cfg = GraphConfigBuilder.forPartitioned()
                      …
                      .setUsername("<username>")
                      .setSnapshotsSource(SnapshotsSource.CHANGE_SET)
                      .setOptimizedFor(GraphOptimizedFor.UPDATES)
                      ...
                      .build();
前提条件として、前のセッションで公開されているグラフが必要です。たとえば:
opg4j> var graph = session.readGraphWithProperties("<path_to_json_config_file>")
graph ==> PgxGraph[name=bank_graph_analytics_fb,N=999,E=4993,created=1664310157103]
opg4j> graph.publishWithSnapshots()
PgxGraph graph = session.readGraphWithProperties("<path_to_json_config_file>");
graph.publishWithSnapshots();
>>> graph = session.read_graph_with_properties("<path_to_json_config_file>")
>>> graph.publish_with_snapshots()

これで、次のステップを実行し、JSONファイルから作成されたグラフ構成オブジェクトを使用して公開済グラフを同期できるようになります。

  1. 次のように、公開済グラフを取得します。
    opg4j> var graph = session.getGraph("bank_graph")
    graph ==> PgxGraph[name=bank_graph_analytics_fb,N=999,E=4993,created=1664310157103]
    PgxGraph graph = session.getGraph("bank_graph");
    >>> graph = session.get_graph("bank_graph")
  2. 次のように、JSONファイル・パスを使用してグラフ構成オブジェクトを作成します。
    opg4j> var cfg = GraphConfigFactory.forPartitioned().fromFilePath("path_to_json_config_file")
    cfg ==> {"edge_providers":[{"destination_vertex_provider":"Accounts","database_table_name":"BANK_TXNS","name":"Transfers","key_type":"long",
    "props":[{"type":"float","name":"AMOUNT"},{"type":"string","name":"DESCRIPTION"}],"format":"rdbms","source_vertex_provider":"Accounts",
    "source_column":"FROM_ACCT_ID","key_column":"TXN_ID","destination_column":"TO_ACCT_ID","loading":{"create_key_mapping":true}}],
    "loading":{"snapshots_source":"CHANGE_SET"},"name":"bank_graph","vertex_providers":[{"database_table_name":"BANK_ACCOUNTS",
    "key_column":"ID","name":"Accounts","key_type":"integer","props":[{"type":"integer","name":"ID"},{"type":"string","name":"NAME"}],
    "loading":{"create_key_mapping":true},"format":"rdbms"}]}
    PartitionedGraphConfig cfg = GraphConfigFactory.forPartitioned().fromFilePath("path_to_json_config_file");
    >>> from pypgx.api import GraphConfigFactory
    >>> cfg = GraphConfigFactory.for_partitioned().from_file_path("path_to_json_config_file")
    あるいは、「グラフ構成オブジェクトの定義によるグラフのロード」に示すように、GraphConfigBuilder APIを使用してグラフ構成オブジェクトを作成することもできます。
  3. JDBC接続を使用してデータベース表のデータを変更します。
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeQuery("UPDATE bank_txns SET amount=9000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.setAutoCommit(false)
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_txns SET amount=9000 WHERE txn_id=3");
    conn.setAutoCommit(false);
    conn.commit();
    >>> conn = opg4py.pgql.get_connection("graphuser","graphuser", "jdbc:oracle:thin:@localhost:1521/orclpdb").get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_txns SET amount=9000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
  4. グラフ構成オブジェクトと接続の詳細を使用して、フラッシュバック・シンクロナイザを構成します。
    opg4j> var synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>().
    ...>      setType(FlashbackSynchronizer.class).
    ...>      setGraph(graph).
    ...>      setConnection(conn).
    ...>      setGraphConfiguration(cfg).
    ...>      build()
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@1f122cbb
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setGraphConfiguration(cfg)
        .build();
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
                              jdbc_url=<jdbc_url>, username=<username>, password=<password>, graph_config=cfg)
  5. 次のように、公開済グラフを同期します。
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=bank_graph,N=999,E=4993,created=1664454171605]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()
  6. グラフを問い合せて、エッジ・プロパティの更新を確認します。
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print();
    graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.ID=179 and v2.ID=688").print();
    実行すると、前述の例では次の出力が生成されます。
    +--------+
    | amount |
    +--------+
    | 9000.0 |
    +--------+