18.4 Oracle Databaseのグラフとグラフ・サーバーの同期の維持

FlashbackSynchronizer APIを使用すると、データベースでのグラフに対する変更をメモリー内の対応するPgxGraphオブジェクトに自動的に適用できるため、両方の同期が保たれます。

このAPIでは、Oracleのフラッシュバック・テクノロジを使用して最後のフェッチ以降のデータベースでの変更をフェッチし、ChangeSet APIを使用してそれらの変更をグラフ・サーバーにプッシュします。変更が適用された後、グラフ・サーバーの通常のスナップショット・セマンティクスが適用されます。デルタ・フェッチが適用されるたびに新しいインメモリー・スナップショットが作成されます。スナップショットの作成に対して同時に実行されている問合せまたはアルゴリズムは、対応するセッションがsession.setSnapshot(graph, PgxSession.LATEST_SNAPSHOT)プロシージャをコールしてPgxGraphオブジェクトを最新の状態にリフレッシュするまで、変更の影響を受けません。

また、前のフェッチ操作からの変更が存在しなくなった場合、シンクロナイザは例外をスローします。これは、前のフェッチ期間がデータベースのUNDO_RETENTIONパラメータ設定より長い場合に発生します。この例外を回避するには、UNDO_RETENTIONパラメータ値より短い間隔で変更をフェッチします。UNDO_RETENTIONパラメータのデフォルト設定は900秒です。詳細は、『Oracle Databaseリファレンス』を参照してください。

同期の前提条件

Oracleデータベースでフラッシュバックが有効になっている必要があり、同期の実行に使用するデータベース・ユーザーには次のものが必要です。

  • 同期を維持する必要があるすべての表に対する読取りアクセス権。
  • フラッシュバックAPIを使用するための権限。たとえば:
    GRANT EXECUTE ON DBMS_FLASHBACK TO <user>

また、ユースケースに必要な時間の間変更を保持するようにデータベースを構成する必要もあります。

同期可能なグラフのタイプ

PGXのすべてのPgxGraphオブジェクトを同期できるわけではありません。次の制限が適用されます。

  • グラフの元の作成者のみがそれを同期できます。つまり、現在のユーザーはグラフのMANAGE権限を持っている必要があります。

  • データベース表からロードされたグラフ(PGQLプロパティ・グラフおよびSQLプロパティ・グラフ)のみを同期できます。他の形式から作成されたグラフ、グラフ・ビルダーAPIを使用して作成されたグラフまたはデータベース・ビューから作成されたPGQLプロパティ・グラフは同期できません。
  • グラフの最新のスナップショットのみを同期できます。

同期可能な変更のタイプ

シンクロナイザでは、インメモリー・グラフ・スナップショットと次のデータベース側の変更との同期の維持がサポートされています。

  • 新しい頂点とエッジの挿入
  • 既存の頂点とエッジの削除
  • 任意の頂点またはエッジのプロパティ値の更新

シンクロナイザでは、入力グラフに対する次のようなスキーマ・レベルの変更はサポートされていません。

  • 入力頂点表またはエッジ表のリストの変更
  • 任意の入力表(頂点表またはエッジ表)の任意の列の変更

さらに、シンクロナイザでは、頂点キーおよびエッジ・キーに対する更新はサポートされていません。

詳細な例は、次のトピックを参照してください。

18.4.1 SQLプロパティ・グラフの同期

グラフ・サーバー(PGX)にロードされたSQLプロパティ・グラフを、データベースのグラフ・データに加えられた変更と同期できます。

次の例に、FlashbackSynchronizer APIを使用してSQLプロパティ・グラフを同期するステップを示します:
  1. 次のように、readGraphByName() APIを使用してSQLプロパティ・グラフをグラフ・サーバー(PGX)にロードします:
    opg4j> var graph = session.readGraphByName("BANK_SQL_PG", GraphSource.PG_SQL,
    ...>                   ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
    ...>                   ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_SQL_PG_2,N=1000,E=5001,created=1697259571499
    PgxGraph graph = session.readGraphByName("BANK_SQL_PG",GraphSource.PG_SQL,
                       ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),
                       ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_SQL_PG', 'pg_sql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. データベースへの新しいJDBC接続を開き、SQLプロパティ・グラフの基礎となるデータベース表のデータを変更します。たとえば、次のコードでは、エッジ・プロパティのいずれかについてデータベース値を更新します。
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@738e79ec
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@71f056a
    opg4j> stmt.executeQuery("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2")
    $8 ==> oracle.jdbc.driver.ForwardOnlyResultSet@19b0a9f2
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2");
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbcUrl>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_transfers SET amount=2000 WHERE txn_id=2").execute()
    False
    >>> conn.commit()
    
    データベースの変更をコミットすると、メモリー内のグラフがデータベースのソース表と同期しなくなります。
  3. 次のコードに示すように、新しいシンクロナイザ・オブジェクトを作成することで、インメモリー・グラフとデータベースを同期します。
    opg4j>  var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@5f65e0c0
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer',
    ...              jdbc_url=<jdbcUrl>, username=<username>, password=<password>)
  4. sync()関数をコールしてデータベースの変更をフェッチして適用し、新しいインメモリー・グラフ・スナップショットを作成します。
    opg4j> graph=synchronizer.sync()
    graph ==> PgxGraph[name=BANK_SQL_PG,N=1000,E=5001,created=1696332603804]
    graph=synchronizer.sync();
    graph=synchronizer.sync()

    Synchronizerオブジェクトは、セッションごとに1回のみ作成する必要があります。作成後は、synchronizer.sync()操作を複数回実行して、データベースの変更に対応する最新のグラフ・スナップショットを生成できます。

  5. グラフを問い合せて、エッジ・プロパティの更新を確認します。
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.from_acct_id = 237 AND e.to_acct_id=777").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE v1.id = 237 AND v2.id=777").print()
    実行すると、前述の例では次の出力が生成されます。
    +--------+
    | amount |
    +--------+
    | 2000.0 |
    +--------+

18.4.2 PGQLプロパティ・グラフの同期

グラフ・サーバー(PGX)にロードされたPGQLプロパティ・グラフを、データベースのグラフ・データに加えられた変更と同期できます。

次の例に、FlashbackSynchronizer APIを使用してPGQLプロパティ・グラフを同期するステップを示します。
  1. 次のように、readGraphByName() APIを使用してPGQLプロパティ・グラフをグラフ・サーバー(PGX)にロードします。
    opg4j> var graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
                     ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable())
    graph ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660275936010]
    PgxGraph graph = session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL,
                          ReadGraphOption.optimizeFor(GraphOptimizedFor.UPDATES),ReadGraphOption.synchronizable());
    >>> graph = session.read_graph_by_name('BANK_GRAPH','pg_pgql',
    ...              options=['optimized_for_updates', 'synchronizable'])
  2. データベースへの新しいJDBC接続を開き、PGQLプロパティ・グラフの基礎となるデータベース表のデータを変更します。たとえば、次のコードでは、エッジ・プロパティのいずれかについてデータベース値を更新します。
    opg4j> var conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>)
    conn ==> oracle.jdbc.driver.T4CConnection@60f7261f
    opg4j> var stmt = conn.createStatement()
    stmt ==> oracle.jdbc.driver.OracleStatementWrapper@1a914a00
    opg4j> stmt.executeQuery("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3")
    $5 ==> oracle.jdbc.driver.ForwardOnlyResultSet@627d5f99
    opg4j> conn.setAutoCommit(false)
    opg4j> conn.commit()
    Connection conn = DriverManager.getConnection(<jdbcUrl>,<username>,<password>);
    Statement stmt = conn.createStatement();
    stmt.executeQuery("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3");
    conn.setAutoCommit(false);
    conn.commit();
    >>> conn = opg4py.pgql.get_connection(<username>,<password>, <jdbc_url>).get_jdbc_connection()
    >>> conn.prepareStatement("UPDATE bank_transfers SET amount=4000 WHERE txn_id=3").execute()
    False
    >>> conn.commit()
    
    データベースの変更をコミットすると、メモリー内のグラフがデータベースのソース表と同期しなくなります。
  3. 次のコードに示すように、新しいシンクロナイザ・オブジェクトを作成することで、インメモリー・グラフとデータベースを同期します。
    Synchronizer synchronizer = new Synchronizer.Builder<FlashbackSynchronizer>()
        .setType(FlashbackSynchronizer.class)
        .setGraph(graph)
        .setConnection(conn)
        .setParallelHintDegree(4)
        .build();

    内部的に、グラフ・サーバーは、現在のグラフ・スナップショットが属するOracleシステム変更番号(SCN)をトラッキングします。シンクロナイザはデータベースに接続するクライアント側コンポーネントであり、フラッシュバック・メカニズムを介して現在のSCNを使用して元の入力表の状態を比較することで変更を検出し、その後、変更セットAPIを使用して、グラフ・サーバーにすべての変更を送信します。これを行うには、シンクロナイザによって、データベースへの接続方法(connパラメータ)および同期を維持するグラフ(graphパラメータ)が認識される必要があります。

    setParallelHintDegree APIを使用して、フラッシュバック・シンクロナイザ・ビルダーで並列度を指定できます。指定したパラレル・ヒントの程度は、SQL問合せの実行時にフラッシュバック・シンクロナイザによって考慮されます。

    あるいは、次のように同等のショートカットを使用することもできます。

    opg4j>  var synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn)
    synchronizer ==> oracle.pgx.api.FlashbackSynchronizer@4ac2b4c6
    Synchronizer synchronizer = graph.createSynchronizer(FlashbackSynchronizer.class, conn);
    >>> synchronizer = graph.create_synchronizer(synchronizer_class='oracle.pgx.api.FlashbackSynchronizer', jdbc_url=<jdbc_url>, username=<username>, password=<password>)
  4. sync()関数をコールしてデータベースの変更をフェッチして適用し、新しいインメモリー・グラフ・スナップショットを作成します。
    opg4j> graph=synchronizer.sync()
    g ==> PgxGraph[name=BANK_GRAPH,N=999,E=4993,created=1660308128037]
    graph=synchronizer.sync();
    >>> graph = synchronizer.sync()

    Synchronizerオブジェクトは、セッションごとに1回のみ作成する必要があります。作成後は、synchronizer.sync()操作を複数回実行して、データベースの変更に対応する最新のグラフ・スナップショットを生成できます。

    変更のフェッチおよび適用の分割

    前述のコードでsynchronizer.sync()を呼び出すと、1回のコールで変更のフェッチと適用が行われます。しかし、このプロセスを別個の fetch()呼出しとapply()呼出しに分割することで、より複雑な更新ロジックをエンコードできます。たとえば:

    synchronizer.fetch(); // fetches changes from the database
    if (synchronizer.getGraphDelta().getTotalNumberOfChanges() > 100) {  // only create snapshot if there have been more than 100 changes
      synchronizer.apply();
    }
  5. グラフを問い合せて、エッジ・プロパティの更新を確認します。
    opg4j> graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print()
    graph.queryPgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print();
    >>> graph.query_pgql("SELECT e.amount FROM MATCH (v1:Accounts)-[e:Transfers]->(v2:Accounts) WHERE e.src_acct_id = 179 AND e.dst_acct_id=688").print()
    実行すると、前述の例では次の出力が生成されます。
    +--------+
    | amount |
    +--------+
    | 4000.0 |
    +--------+