16.7 Using Custom PGX Graph Algorithms

A custom PGX graph algorithm allows you to write a graph algorithm in Java syntax and have it automatically compiled to an efficient parallel implementation.

16.7.1 Writing a Custom PGX Algorithm

A PGX algorithm is a regular .java file with a single class definition that is annotated with @GraphAlgorithm. For example:

import oracle.pgx.algorithm.annotations.GraphAlgorithm;

@GraphAlgorithm
public class MyAlgorithm {
    ...
}

A PGX algorithm class must contain exactly one public method, which is used as the entry point. The class may contain any number of private methods.

For example:

import oracle.pgx.algorithm.PgxGraph;
import oracle.pgx.algorithm.VertexProperty;
import oracle.pgx.algorithm.annotations.GraphAlgorithm;
import oracle.pgx.algorithm.annotations.Out;

@GraphAlgorithm
public class MyAlgorithm {
    public int myAlgorithm(PgxGraph g, @Out VertexProperty<Integer> distance) {
        System.out.println("My first PGX Algorithm program!");

        return 42;
    }
}

Like a normal Java method, a PGX algorithm method can return a value (an integer in this example). However, only primitive data types are supported as return values. More interesting is the @Out annotation, which marks the vertex property distance as an output parameter. The caller passes output parameters by reference, so after the algorithm terminates, the caller holds a reference to the modified property.

16.7.1.1 Collections

To create a collection you call the .create() function. For example, a VertexProperty<Integer> is created as follows:

VertexProperty<Integer> distance = VertexProperty.create();

To get the value of a property at a certain vertex v:

distance.get(v);

Similarly, to set the property of a certain vertex v to a value e:

distance.set(v, e);

You can even create properties of collections:

VertexProperty<VertexSequence> path = VertexProperty.create();

However, PGX Algorithm assignments are always by value (as opposed to by reference). To make this explicit, you must call .clone() when assigning a collection:

VertexSequence sequence = path.get(v).clone();
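The difference between reference and value assignment can be seen in plain Java, where assigning a collection only copies the reference. The following sketch is ordinary Java, not PGX algorithm code, and the class and method names are hypothetical. It contrasts an alias with an explicit copy, which is the kind of copy that calling .clone() makes explicit in a PGX algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not PGX code); names are hypothetical.
public class ValueVsReference {
    // Returns {aliasSize, copySize} after appending one element to the original list.
    static int[] sizesAfterAppend() {
        List<Integer> path = new ArrayList<>(List.of(1, 2, 3));
        List<Integer> alias = path;                 // reference assignment: same list object
        List<Integer> copy = new ArrayList<>(path); // explicit copy, analogous to .clone()
        path.add(4);                                // modify the original
        return new int[] {alias.size(), copy.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterAppend();
        System.out.println(sizes[0] + " " + sizes[1]); // 4 3
    }
}
```

The alias observes the append while the copy does not. Requiring .clone() in PGX algorithms makes it obvious in the source code where such a copy is created.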

Another consequence of value semantics is that you can check for equality using the == operator instead of the Java .equals() method. For example:

PgxVertex v1 = G.getRandomVertex();
PgxVertex v2 = G.getRandomVertex();
System.out.println(v1 == v2);

16.7.1.2 Iteration

The most common operations in PGX algorithms are iterations (such as looping over all vertices or over a vertex's neighbors) and graph traversals (such as breadth-first or depth-first traversal). All collections expose forEach and forSequential methods, which iterate over the collection in parallel and in sequence, respectively.

For example:

  • To iterate over a graph's vertices in parallel:
    G.getVertices().forEach(v -> {
        ...
    });
    
  • To iterate over a graph's vertices in sequence:
    G.getVertices().forSequential(v -> {
        ...
    });
    
  • To traverse a graph's vertices from r in breadth-first order:
    import oracle.pgx.algorithm.Traversal;
    
    Traversal.inBFS(G, r).forward(n -> {
        ...
    });
    

    Inside the forward (or backward) lambda you can access the current level of the BFS (or DFS) traversal by calling currentLevel().
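To make the notion of a BFS level concrete, the following plain-Java sketch (not the PGX Traversal API) performs a breadth-first traversal over an adjacency array and records the level at which each vertex is first reached. The class name, the int[][] graph representation, and the convention that the root is at level 0 are assumptions of this sketch. Inside a PGX forward lambda, currentLevel() plays a similar role for the vertex being visited.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Plain-Java sketch (not PGX code): BFS that records each vertex's level.
public class BfsLevels {
    // adj[v] lists the out-neighbors of vertex v; unreachable vertices get level -1.
    static int[] levels(int[][] adj, int root) {
        int[] level = new int[adj.length];
        Arrays.fill(level, -1);
        Deque<Integer> queue = new ArrayDeque<>();
        level[root] = 0;                        // the root is at level 0 in this sketch
        queue.add(root);
        while (!queue.isEmpty()) {
            int v = queue.poll();
            for (int w : adj[v]) {
                if (level[w] == -1) {           // first visit: one level deeper than v
                    level[w] = level[v] + 1;
                    queue.add(w);
                }
            }
        }
        return level;
    }

    public static void main(String[] args) {
        // Edges: 0->1, 0->2, 1->3, 2->3; vertex 4 is unreachable from 0.
        int[][] adj = { {1, 2}, {3}, {3}, {}, {} };
        System.out.println(Arrays.toString(levels(adj, 0))); // [0, 1, 1, 2, -1]
    }
}
```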

16.7.1.3 Reductions

Within these parallel blocks it is common to atomically update, or reduce to, a variable defined outside the lambda. These atomic reductions are available as methods on Scalar<T>: reduceAdd, reduceMul, reduceAnd, and so on. For example, to count the number of vertices in a graph:

public int countVertices(PgxGraph G) {
    Scalar<Integer> count = Scalar.create(0);

    G.getVertices().forEach(n -> {
        count.reduceAdd(1);
    });

    return count.get();
}
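The same pattern, a shared accumulator updated atomically from a parallel loop, can be sketched in plain Java with java.util.concurrent. This is an analogue for illustration only, not PGX code. LongAdder and a parallel IntStream stand in for Scalar<Integer> and forEach, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Plain-Java analogue (not PGX code) of Scalar.reduceAdd inside a parallel forEach.
public class ParallelCount {
    static long countVertices(int numVertices) {
        LongAdder count = new LongAdder();           // thread-safe accumulator
        IntStream.range(0, numVertices)
                 .parallel()                         // visit the "vertices" in parallel
                 .forEach(v -> count.increment());   // analogous to count.reduceAdd(1)
        return count.sum();
    }

    public static void main(String[] args) {
        System.out.println(countVertices(1_000_000)); // 1000000
    }
}
```

Using a plain long field with ++ instead of LongAdder would lose updates under parallel execution. Atomic reductions exist in PGX for the same reason.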

Sometimes you want to update multiple values atomically. For example, you might want to find the smallest property value as well as the vertex at which that value is attained. Because of parallel execution, two separate reduction statements could leave these values in an inconsistent state.

To solve this problem, the Reductions class provides the argMin and argMax functions. The first argument to argMin is the current value and the second argument is the potential new minimum. Additionally, you can chain andUpdate calls on the ArgMinMax object to specify other variables and the values to which they should be updated, atomically. For example:

VertexProperty<Integer> rank = VertexProperty.create();
int minRank = Integer.MAX_VALUE;
PgxVertex minVertex = PgxVertex.NONE;

G.getVertices().forEach(n ->
    argMin(minRank, rank.get(n)).andUpdate(minVertex, n)
);
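The guarantee that argMin(...).andUpdate(...) provides, namely that the minimum and the vertex attaining it change together, can be sketched in plain Java by swapping an immutable pair through an AtomicReference. This is an analogue, not PGX code. The record MinEntry, the int[] rank array, and the use of -1 in place of PgxVertex.NONE are assumptions of this sketch.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

// Plain-Java sketch (not PGX code): atomic argmin over a parallel loop.
public class AtomicArgMin {
    // Immutable (value, vertex) pair that is replaced as a single unit.
    record MinEntry(int rank, int vertex) {}

    static MinEntry argMin(int[] rank) {
        // -1 stands in for PgxVertex.NONE in this sketch.
        AtomicReference<MinEntry> best =
            new AtomicReference<>(new MinEntry(Integer.MAX_VALUE, -1));
        IntStream.range(0, rank.length).parallel().forEach(n -> {
            MinEntry candidate = new MinEntry(rank[n], n);
            // accumulateAndGet retries its compare-and-set internally, so the
            // value and the vertex are always replaced together.
            best.accumulateAndGet(candidate,
                (cur, cand) -> cand.rank() < cur.rank() ? cand : cur);
        });
        return best.get();
    }

    public static void main(String[] args) {
        int[] rank = {7, 3, 9, 5};
        System.out.println(argMin(rank)); // MinEntry[rank=3, vertex=1]
    }
}
```

Keeping two separate atomic variables for the value and the vertex would allow one thread's value to end up paired with another thread's vertex. Bundling them in one object closes that gap.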

16.7.2 Compiling and Running a Custom PGX Algorithm

To compile and run a custom PGX algorithm, perform the following steps:
  1. Set the following two configuration parameters in the conf/pgx.conf file:
    • Set the graph_algorithm_language option to JAVA.
    • Set the java_home_dir option to the path to your Java home (use <system-java-home-dir> to have PGX infer Java home from the system properties).
    {
      "graph_algorithm_language": "JAVA",
      "java_home_dir": "<system-java-home-dir>"
    }
  2. Create a session.
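    Using JShell (the opg4j shell starts with a predefined session variable, which the following steps use):
    cd /opt/oracle/graph
    ./bin/opg4j
    Using Java:
    import oracle.pgx.api.*;
    PgxSession session = Pgx.createSession("my-session");
    Using Python:
    session = instance.create_session("my-session")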
  3. Compile a PGX Algorithm. For example:
    Using JShell:
    opg4j> var myAlgorithm = session.compileProgram("/path/to/MyAlgorithm.java")
    myAlgorithm ==> CompiledProgram[name=MyAlgorithm]
    Using Java:
    import oracle.pgx.api.CompiledProgram;
    CompiledProgram myAlgorithm = session.compileProgram("/path/to/MyAlgorithm.java");
    Using Python:
    my_algorithm = session.compile_program("/path/to/MyAlgorithm.java")
  4. Run the algorithm. For example:
    Using JShell:
    opg4j> var graph = session.readGraphByName("BANK_GRAPH", GraphSource.PG_PGQL)
    graph ==> PgxGraph[name=BANK_GRAPH_2,N=999,E=4993,created=1689325558251]
    opg4j> var property = graph.createVertexProperty(PropertyType.INTEGER)
    property ==> VertexProperty[name=vertex_prop_integer_9,type=integer,graph=bank_graph_analytics]
    opg4j> myAlgorithm.run(graph, property)
    $6 ==> {
      "success" : true,
      "canceled" : false,
      "exception" : null,
      "returnValue" : 42,
      "executionTimeMs" : 0
    }
    Using Java:
    import oracle.pgx.api.*;
    import oracle.pgx.common.types.PropertyType;
    PgxGraph graph = session.readGraphByName("BANK_GRAPH", GraphSource.PG_PGQL);
    VertexProperty property = graph.createVertexProperty(PropertyType.INTEGER);
    myAlgorithm.run(graph, property);
    Using Python:
    graph = session.read_graph_by_name('BANK_GRAPH', 'pg_pgql')
    property = graph.create_vertex_property("integer")
    my_algorithm.run(graph, property)
    {'success': True, 'canceled': False, 'exception': None, 'return_value': 42, 'execution_time(ms)': 1}

16.7.3 Example Custom PGX Algorithm: PageRank

The following is an implementation of PageRank as a PGX algorithm:

import oracle.pgx.algorithm.PgxGraph;
import oracle.pgx.algorithm.Scalar;
import oracle.pgx.algorithm.VertexProperty;
import oracle.pgx.algorithm.annotations.GraphAlgorithm;
import oracle.pgx.algorithm.annotations.Out;

@GraphAlgorithm
public class Pagerank {
  public void pagerank(PgxGraph G, double tol, double damp, int max_iter, boolean norm, @Out VertexProperty<Double> rank) {
    Scalar<Double> diff = Scalar.create();
    int cnt = 0;
    double N = G.getNumVertices();

    rank.setAll(1 / N);
    do {
      diff.set(0.0);
      Scalar<Double> dangling_factor = Scalar.create(0d);

      if (norm) {
        dangling_factor.set(damp / N * G.getVertices().filter(v -> v.getOutDegree() == 0).sum(rank::get));
      }

      G.getVertices().forEach(t -> {
        double in_sum = t.getInNeighbors().sum(w -> rank.get(w) / w.getOutDegree());
        double val = (1 - damp) / N + damp * in_sum + dangling_factor.get();
        diff.reduceAdd(Math.abs(val - rank.get(t)));
        rank.setDeferred(t, val);
      });
      cnt++;
    } while (diff.get() > tol && cnt < max_iter);
  }
}
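To show what one iteration of the Pagerank class computes, the following is a single-threaded plain-Java sketch (not PGX code) of the same update rule: the teleport term (1 - damp) / N, the damped sum over in-neighbors, and the optional dangling-vertex factor. The int[][] in-neighbor array and the int[] out-degree array are assumptions of this sketch. Writing into a separate next array mirrors setDeferred, so every vertex in an iteration reads the ranks from the previous iteration.

```java
import java.util.Arrays;

// Plain-Java sketch (not PGX code) of the PageRank iteration shown above.
public class PagerankSketch {
    // inNbrs[t] lists the vertices w with an edge w -> t; outDeg[w] is w's out-degree.
    static double[] pagerank(int[][] inNbrs, int[] outDeg, double tol, double damp,
                             int maxIter, boolean norm) {
        int n = outDeg.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);                  // rank.setAll(1 / N)
        double diff;
        int cnt = 0;
        do {
            diff = 0.0;
            double dangling = 0.0;
            if (norm) {                              // redistribute the rank held by sinks
                for (int v = 0; v < n; v++) {
                    if (outDeg[v] == 0) dangling += rank[v];
                }
                dangling *= damp / n;
            }
            double[] next = new double[n];           // deferred writes, like setDeferred
            for (int t = 0; t < n; t++) {
                double inSum = 0.0;
                for (int w : inNbrs[t]) inSum += rank[w] / outDeg[w];
                next[t] = (1 - damp) / n + damp * inSum + dangling;
                diff += Math.abs(next[t] - rank[t]);
            }
            rank = next;                             // deferred values become visible
            cnt++;
        } while (diff > tol && cnt < maxIter);
        return rank;
    }

    public static void main(String[] args) {
        // Edges: 0->1, 1->2, 2->0, 2->1.
        int[][] inNbrs = { {2}, {0, 2}, {1} };
        int[] outDeg = { 1, 1, 2 };
        double[] rank = pagerank(inNbrs, outDeg, 1e-9, 0.85, 100, true);
        System.out.println(Arrays.toString(rank)); // vertex 1 receives the highest rank
    }
}
```

In the PGX version, the loop over vertices runs in parallel and diff is accumulated with reduceAdd. The sketch keeps the arithmetic identical but runs sequentially.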

16.7.4 Tracking the Progress of a Running Custom PGX Graph Algorithm

You can track the progress of a running custom graph algorithm using the AlgorithmProgress Java API.

The AlgorithmProgress object exposes two attributes, numberOfStepsCompleted and numberOfStepsEstimatedForCompletion, from which you can calculate the progress of the algorithm as a percentage.

For custom algorithms, the value of numberOfStepsEstimatedForCompletion is not provided automatically. You must therefore provide it yourself by calling ControlFlow.setNumberOfStepsEstimatedForCompletion in your algorithm implementation. If no value is provided, or the provided value is negative, numberOfStepsEstimatedForCompletion defaults to null.

The following example shows how to set the numberOfStepsEstimatedForCompletion value in a custom graph algorithm, and then how to track the progress of the running algorithm as a percentage using the AlgorithmProgress Java API.

  1. Set the value for numberOfStepsEstimatedForCompletion in your custom graph algorithm.

    Note that you cannot estimate the progress as a percentage for algorithms that do not provide a value for numberOfStepsEstimatedForCompletion. However, you can still access the value of the counter (numberOfStepsCompleted).

    The value of numberOfStepsEstimatedForCompletion should ideally be equal to the total number of execution steps that an algorithm will perform. An execution step is simply a loop iteration. If the exact value cannot be specified, you should provide an upper bound estimate of that value.

    Consider the following outDegreeCentrality algorithm:

    import oracle.pgx.algorithm.PgxGraph;
    import oracle.pgx.algorithm.VertexProperty;
    import oracle.pgx.algorithm.annotations.GraphAlgorithm;
    import oracle.pgx.algorithm.annotations.Out;
    import oracle.pgx.algorithm.ControlFlow;
    
    @GraphAlgorithm
    public class OutdegreeCentrality {
      public void outdegreeCentrality(PgxGraph g, @Out VertexProperty<Integer> outdegreeCentrality) {
        g.getVertices().forEach(n ->
            outdegreeCentrality.set(n, (int) n.getOutDegree())
        );
      }
    }

    The algorithm just iterates over all vertices of the graph and updates a property. Therefore, the total number of execution steps in this case is equal to the number of vertices of the graph:

    @GraphAlgorithm
    public class OutdegreeCentrality {
      public void outdegreeCentrality(PgxGraph g, @Out VertexProperty<Integer> outdegreeCentrality) {
    
        long totNbOfSteps = g.getNumVertices();
        ControlFlow.setNumberOfStepsEstimatedForCompletion(totNbOfSteps);
    
        g.getVertices().forEach(n ->
            outdegreeCentrality.set(n, (int) n.getOutDegree())
        );
      }
    }
  2. Run and track the progress of the custom Out-Degree Centrality algorithm as shown:
    Using JShell:
    opg4j> var myAlgorithm = session.compileProgram("/path/to/OutdegreeCentrality.java")
    myAlgorithm ==> CompiledProgram[name=outdegreeCentrality]
    opg4j> var graph = session.readGraphByName("BANK_TXN_GRAPH", GraphSource.PG_PGQL)
    graph ==> PgxGraph[name=BANK_TXN_GRAPH,N=1000,E=4993,created=1712307339271]
    opg4j> var future = analyst.outDegreeCentralityAsync(graph)
    future ==> oracle.pgx.api.PgxFuture@55fe9c2f[Not completed]
    opg4j> var futureProgress = future.getProgress()
    futureProgress ==> oracle.pgx.api.DefaultFutureProgress@637506d8
    opg4j> var algorithmProgress = futureProgress.asAlgorithmExecutionProgress()
    Using Java:
    import java.util.Optional;
    import oracle.pgx.api.*;

    CompiledProgram myAlgorithm = session.compileProgram("/path/to/OutdegreeCentrality.java");
    PgxGraph graph = session.readGraphByName("BANK_TXN_GRAPH", GraphSource.PG_PGQL);
    Analyst analyst = session.createAnalyst();
    PgxFuture<?> future = analyst.outDegreeCentralityAsync(graph);
    FutureProgress futureProgress = future.getProgress();
    Optional<AlgorithmProgress> algorithmProgress = futureProgress.asAlgorithmExecutionProgress();
  3. Estimate the progress of the running algorithm as a percentage.
    if (algorithmProgress.isPresent()) {
        AlgorithmProgress progress = algorithmProgress.get();
        long completedSteps = progress.getNumberOfStepsCompleted();
        Long numberOfStepsEstimatedForCompletion = progress.getNumberOfStepsEstimatedForCompletion();
        // The estimate is null if the algorithm did not provide one
        if (numberOfStepsEstimatedForCompletion != null && numberOfStepsEstimatedForCompletion > 0) {
            // Floating-point division keeps the fractional part of the percentage
            double progressPercentage = completedSteps * 100.0 / numberOfStepsEstimatedForCompletion;
            System.out.println(completedSteps); // for example, 153
            System.out.println(numberOfStepsEstimatedForCompletion); // for example, 2343
            System.out.println(progressPercentage); // for example, about 6.53
        }
    }

    In this example, the progress is about 6.53% at the moment of the call. If you query the progress of the running algorithm again later (for example, after one minute), you should get a larger value.
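The percentage calculation itself can be factored into a small helper that handles a missing estimate. The following is a plain-Java sketch; the class ProgressMath and its percentage method are hypothetical and not part of the PGX API. It takes the two counter values as arguments, returns an empty result when the estimate is null or not positive, and uses floating-point division so that 153 of 2343 steps yields about 6.53 rather than 6.

```java
import java.util.Locale;
import java.util.OptionalDouble;

// Hypothetical plain-Java helper (not part of the PGX API).
public class ProgressMath {
    // estimatedSteps is a Long because the PGX estimate may be null.
    static OptionalDouble percentage(long completedSteps, Long estimatedSteps) {
        if (estimatedSteps == null || estimatedSteps <= 0) {
            return OptionalDouble.empty();               // no usable estimate
        }
        // 100.0 forces floating-point division, keeping the fractional part.
        return OptionalDouble.of(completedSteps * 100.0 / estimatedSteps);
    }

    public static void main(String[] args) {
        double p = percentage(153, 2343L).getAsDouble();
        System.out.println(String.format(Locale.ROOT, "%.2f%%", p)); // 6.53%
        System.out.println(percentage(153, null).isPresent());       // false
    }
}
```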