16.7 カスタムPGXグラフ・アルゴリズムの使用

カスタムPGXグラフ・アルゴリズムを使用すると、Java構文でグラフ・アルゴリズムを記述し、それを自動的にコンパイルして効率的なパラレル実装にできます。

16.7.1 カスタムPGXアルゴリズムの作成

PGXアルゴリズムは、@GraphAlgorithmを使用して注釈が付けられる単一のクラス定義を含む通常の.javaファイルです。たとえば:

import oracle.pgx.algorithm.annotations.GraphAlgorithm;

@GraphAlgorithm
public class MyAlgorithm {
    ...
}

PGXアルゴリズム・クラスには、エントリ・ポイントとして使用されるパブリック・メソッドが1つだけ含まれている必要があります。クラスには任意の数のprivateメソッドを含めることができます。

たとえば:

import oracle.pgx.algorithm.PgxGraph;
import oracle.pgx.algorithm.VertexProperty;
import oracle.pgx.algorithm.annotations.GraphAlgorithm;
import oracle.pgx.algorithm.annotations.Out;

@GraphAlgorithm
public class MyAlgorithm {
    public int myAlgorithm(PgxGraph g, @Out VertexProperty<Integer> distance) {
        System.out.println("My first PGX Algorithm program!");

        return 42;
    }
}

通常のJavaメソッドと同様に、PGXアルゴリズム・メソッドは、戻り値としてプリミティブ・データ型のみをサポートします(この例では整数)。さらに興味深いのは、頂点プロパティdistanceを出力パラメータとしてマークする@Out注釈です。コール元は、出力パラメータを参照渡しします。このようにして、呼出し側は、アルゴリズムの終了後に変更されたプロパティへの参照を持ちます。

16.7.1.1 コレクション

コレクションを作成するには、.create()関数をコールします。たとえば、VertexProperty<Integer>は次のように作成されます。

VertexProperty<Integer> distance = VertexProperty.create();

特定の頂点vでプロパティの値を取得するには:

distance.get(v);

同様に、特定の頂点vのプロパティを値eに設定するには、次のようにします。

distance.set(v, e);

コレクションのプロパティを作成することもできます。

VertexProperty<VertexSequence> path = VertexProperty.create();

ただし、PGXアルゴリズムの割当ては常に(参照ではなく)で行われます。これを明示的にするには、コレクションの割当て時に.clone()をコールする必要があります

VertexSequence sequence = path.get(v).clone();

値で渡される値のもう1つの結果として、Javaメソッド.equals()ではなく、==演算子を使用して等価性をチェックできるようになりました。たとえば:

PgxVertex v1 = G.getRandomVertex();
PgxVertex v2 = G.getRandomVertex();
System.out.println(v1 == v2);

16.7.1.2 反復

PGXアルゴリズムの最も一般的な操作は、反復(すべての頂点のループ、頂点の近隣のループなど)およびグラフ・トラバーサル(幅優先/深さ優先など)です。すべてのコレクションが、forEachメソッドとforSequentialメソッドを公開します。これらのメソッドにより、コレクションはそれぞれ並列、および順に反復処理できます。

たとえば:

  • グラフの頂点を並列して反復処理するには:
    G.getVertices().forEach(v -> {
        ...
    });
    
  • グラフの頂点を順に反復処理するには:
    G.getVertices().forSequential(v -> {
        ...
    });
    
  • グラフの頂点をrから幅優先順にトラバースするには:
    import oracle.pgx.algorithm.Traversal;
    
    Traversal.inBFS(G, r).forward(n -> {
        ...
    });
    

    forward (またはbackward)ラムダ内部では、currentLevel()をコールすることによってBFS (またはDFS)トラバーサルの現在のレベルにアクセスできます。

16.7.1.3 削減

これらの並列ブロック内では一般に、ラムダ外部に定義された変数をアトミックに更新するか、この変数まで削減します。これらのアトミック型削減は、Scalar<T>: reduceAdd、reduceMul、reduceAndなどでメソッドとして使用できます。たとえば、グラフ内の頂点の数をカウントするには、次のようにします。

public int countVertices() {
    Scalar<Integer> count = Scalar.create(0);

    G.getVertices().forEach(n -> {
        count.reduceAdd(1);
    });

    return count.get();
}

複数の値を原子的に更新する必要がある場合があります。たとえば、最小のプロパティ値と、この最小値にプロパティ値が到達した頂点を検出できます。並列実行による2つの別々の削減文のために、状態の一貫性が失われる場合があります。

この問題を解決するために、ReductionsクラスにはargMin関数とargMax関数が用意されています。argMinの最初の引数は現在の値で、2番目の引数は潜在的な新しい最小値です。また、ArgMinMaxオブジェクトに対してandUpdateコールを連鎖的に実行することにより、他の変数およびこれらの(アトミックな)更新先の値を示すことができます。たとえば:

VertexProperty<Integer> rank = VertexProperty.create();
int minRank = Integer.MAX_VALUE;
PgxVertex minVertex = PgxVertex.NONE;

G.getVertices().forEach(n ->
    argMin(minRank, rank.get(n)).andUpdate(minVertex, n)
);

16.7.2 カスタムPGXアルゴリズムのコンパイルおよび実行

カスタムPGXアルゴリズムをコンパイルおよび実行できるようにするには、次の処理を実行する必要があります。
  1. conf/pgx.confファイルに次の2つの構成パラメータを設定します。
    • graph_algorithm_languageオプションをJAVAに設定します。
    • java_home_dirオプションを、Javaホームへのパスに設定します(<system-java-home-dir>を使用して、PGXにシステム・プロパティからJavaホームを推測させます)。
    {
      "graph_algorithm_language": "JAVA",
      "java_home_dir": "<system-java-home-dir>"
    }
  2. セッションを作成します。
    cd /opt/oracle/graph
    ./bin/opg4j 
    import oracle.pgx.algorithm.*;
    PgxSession session = Pgx.createSession("my-session");
    session = instance.create_session("my-session")
  3. PGXアルゴリズムをコンパイルします。たとえば:
    opg4j> var myAlgorithm = session.compileProgram("/path/to/MyAlgorithm.java")
    myAlgorithm ==> CompiledProgram[name=MyAlgorithm]
    import oracle.pgx.algorithm.CompiledProgram;
    CompiledProgram myAlgorithm = session.compileProgram("/path/to/MyAlgorithm.java");
    my_algorithm = session.compile_program("/path/to/MyAlgorithm.java")
  4. アルゴリズムを実行します。たとえば:
    opg4j> var graph =  session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL)
    g ==> PgxGraph[name=BANK_GRAPH_2,N=999,E=4993,created=1689325558251]
    opg4j> var property = graph.createVertexProperty(PropertyType.INTEGER)
    property ==> VertexProperty[name=vertex_prop_integer_9,type=integer,graph=bank_graph_analytics]
    opg4j> myAlgorithm.run(graph, property)
    $6 ==> {
      "success" : true,
      "canceled" : false,
      "exception" : null,
      "returnValue" : 42,
      "executionTimeMs" : 0
    }
    import oracle.pgx.algorithm.VertexProperty;
    PgxGraph graph =  session.readGraphByName("BANK_GRAPH",GraphSource.PG_PGQL);
    VertexProperty property = graph.createVertexProperty(PropertyType.INTEGER);
    myAlgorithm.run(graph, property);
    graph = session.read_graph_by_name('BANK_GRAPH', 'pg_pgql')
    property = graph.create_vertex_property("integer")
    my_algorithm.run(graph, property)
    {'success': True, 'canceled': False, 'exception': None, 'return_value': 42, 'execution_time(ms)': 1}

16.7.3 カスタムPGXアルゴリズムの例: PageRank

PGXアルゴリズムとしてのpagerankの実装を次に示します。

import oracle.pgx.algorithm.PgxGraph;
import oracle.pgx.algorithm.Scalar;
import oracle.pgx.algorithm.VertexProperty;
import oracle.pgx.algorithm.annotations.GraphAlgorithm;
import oracle.pgx.algorithm.annotations.Out;

@GraphAlgorithm
public class Pagerank {
  public void pagerank(PgxGraph G, double tol, double damp, int max_iter, boolean norm, @Out VertexProperty<Double> rank) {
    Scalar<Double> diff = Scalar.create();
    int cnt = 0;
    double N = G.getNumVertices();

    rank.setAll(1 / N);
    do {
      diff.set(0.0);
      Scalar<Double> dangling_factor = Scalar.create(0d);

      if (norm) {
        dangling_factor.set(damp / N * G.getVertices().filter(v -> v.getOutDegree() == 0).sum(rank::get));
      }

      G.getVertices().forEach(t -> {
        double in_sum = t.getInNeighbors().sum(w -> rank.get(w) / w.getOutDegree());
        double val = (1 - damp) / N + damp * in_sum + dangling_factor.get();
        diff.reduceAdd(Math.abs(val - rank.get(t)));
        rank.setDeferred(t, val);
      });
      cnt++;
    } while (diff.get() > tol && cnt < max_iter);
  }
}

16.7.4 実行中のカスタムPGXグラフ・アルゴリズムの進行状況の追跡

AlgorithmProgress Java APIを使用して、実行中のカスタム・グラフ・アルゴリズムの進行状況を追跡できます。

numberOfStepsCompletedおよびnumberOfStepsEstimatedForCompletion属性で構成されるAlgorithmProgressオブジェクトは、アルゴリズムの進行状況をパーセンテージとして計算する際に使用されます。

カスタム・アルゴリズムの場合、numberOfStepsEstimatedForCompletionの値は自動的に指定されません。したがって、アルゴリズムの実装中にControlFlow.setNumberOfStepsEstimatedForCompletionをコールして値を指定する必要があります。値が指定されていない場合、または指定された値が負の場合、number_of_steps_estimated_for_completionはデフォルトのnull値を使用します。

次の例では、カスタム・グラフ・アルゴリズムでnumberOfStepsEstimatedForCompletion値を設定した後、AlgorithmProgress Java APIを使用して実行中のカスタム・グラフ・アルゴリズムの進行状況をパーセンテージとして追跡および見積もるステップを示しています。

  1. カスタム・グラフ・アルゴリズムでnumberOfStepsEstimatedForCompletionの値を設定します。

    numberOfStepsEstimatedForCompletionの値が提供されないアルゴリズムの場合、進行状況をパーセントとして見積もることはできません。ただし、カウンタの値(numberOfStepsCompleted)にはアクセスできます。

    numberOfStepsEstimatedForCompletionの値は、アルゴリズムが実行する実行ステップの合計数と同等であることが理想です。実行ステップは単純なループ反復です。正確な値を指定できない場合は、その値の上限見積値を指定する必要があります。

    次のoutDegreeCentralityアルゴリズムについて考えてみます:

    import oracle.pgx.algorithm.PgxGraph;
    import oracle.pgx.algorithm.VertexProperty;
    import oracle.pgx.algorithm.annotations.GraphAlgorithm;
    import oracle.pgx.algorithm.annotations.Out;
    import oracle.pgx.algorithm.ControlFlow;
    
    @GraphAlgorithm
    public class OutdegreeCentrality {
      public void outdegreeCentrality(PgxGraph g, @Out VertexProperty<Integer> outdegreeCentrality) {
        g.getVertices().forEach(n ->
            outdegreeCentrality.set(n, (int) n.getOutDegree())
        );
      }
    }

    このアルゴリズムはグラフのすべての頂点を反復してプロパティを更新します。したがって、この場合の実行ステップの合計数は、グラフの頂点の数と等しくなります:

    @GraphAlgorithm
    public class OutdegreeCentrality {
      public void outdegreeCentrality(PgxGraph g, @Out VertexProperty<Integer> outdegreeCentrality) {
    
        long totNbOfSteps = g.getNumVertices();
        ControlFlow.setNumberOfStepsEstimatedForCompletion(totNbOfSteps);
    
        g.getVertices().forEach(n ->
            outdegreeCentrality.set(n, (int) n.getOutDegree())
        );
      }
    }
  2. 次に示すように、カスタムの出次数中心性アルゴリズムを実行し、進行状況を追跡します:
    opg4j> var myAlgorithm = session.compileProgram("/path/to/OutdegreeCentrality.java")
    myAlgorithm ==> CompiledProgram[name=outdegreeCentrality]
    opg4j> var graph = session.readGraphByName("BANK_TXN_GRAPH", GraphSource.PG_PGQL)
    graph ==> PgxGraph[name=BANK_TXN_GRAPH,N=1000,E=4993,created=1712307339271]
    opg4j> var future = analyst.outDegreeCentralityAsync(graph)
    future ==> oracle.pgx.api.PgxFuture@55fe9c2f[Not completed]
    opg4j> var futureProgress = future.getProgress()
    futureProgress ==> oracle.pgx.api.DefaultFutureProgress@637506d8
    opg4j> var algorithmProgress = futureProgress.asAlgorithmExecutionProgress()
    import oracle.pgx.algorithm.CompiledProgram;
    
    CompiledProgram myAlgorithm = session.compileProgram("/path/to/OutdegreeCentrality.java");
    PgxGraph graph = session.readGraphByName("BANK_TXN_GRAPH", GraphSource.PG_PGQL);
    PgxFuture<?> future = analyst.pagerankAsync(graph);
    FutureProgress futureProgress = future.getProgress();
    Optional<AlgorithmProgress> algorithmProgress = futureProgress.asAlgorithmExecutionProgress();
  3. 実行中のアルゴリズムの進行状況をパーセンテージで見積もります。
    • if (algorithmProgress.isPresent()) {
        AlgorithmProgress progress = algorithmProgress.get();
        long completedSteps = progress.getNumberOfStepsCompleted();
        Long numberOfStepsEstimatedForCompletion = progress.getNumberOfStepsEstimatedForCompletion();
        long progressPercentage = completedSteps * 100 / numberOfStepsEstimatedForCompletion;
        System.out.println(completedSteps); // 153
        System.out.println(numberOfStepsEstimatedForCompletion); // 2343
        System.out.println(progressPercentage); // 6.53
      }
    • if (algorithmProgress.isPresent()) {
        AlgorithmProgress progress = algorithmProgress.get();
        long completedSteps = progress.getNumberOfStepsCompleted();
        Long numberOfStepsEstimatedForCompletion = progress.getNumberOfStepsEstimatedForCompletion();
        long progressPercentage = completedSteps * 100 / numberOfStepsEstimatedForCompletion;
        System.out.println(completedSteps); // 153
        System.out.println(numberOfStepsEstimatedForCompletion); // 2343
        System.out.println(progressPercentage); // 6.53
      };

    前述のコードは、現時点での進行状況6.53 %を示しています。しばらくしてから実行中のアルゴリズムの進行状況(たとえば、1min)を取得すると、値が大きくなっていることを確認できます。