PGX 20.2.2
Documentation

Block, Chain and Cancel Requests

The PGX API is designed to be asynchronous: none of its core methods ending in Async block the caller thread until the request completes. Instead, each of them returns a PgxFuture object immediately. You can perform three actions on the returned PgxFuture:

1. Block

The easiest and probably most obvious way to get the result is to call get() on the PgxFuture, which blocks the caller thread until the result is available:

PgxFuture<PgxSession> sessionPromise = instance.createSessionAsync("my-session");
try {
    // block caller thread
    PgxSession session = sessionPromise.get();
    // do something with session
    ...
} catch (InterruptedException e) {
    // caller thread was interrupted while waiting for result
} catch (ExecutionException e) {
    // an exception was thrown during asynchronous computation
    Throwable cause = e.getCause(); // the actual exception is nested
}

Note: PGX provides a blocking convenience method for every Async method, which calls get() for you. Typically, these methods have the same name as the asynchronous method they wrap, but without the Async suffix. For example, the code snippet above is equivalent to:

try {
    // block caller thread
    PgxSession session = instance.createSession("my-session");
    // do something with session
    ...
} catch (InterruptedException e) {
    // caller thread was interrupted while waiting for result
} catch (ExecutionException e) {
    // an exception was thrown during asynchronous computation
    Throwable cause = e.getCause(); // the actual exception is nested
}
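The nesting of the actual exception inside ExecutionException can be tried out without a PGX server. The following is only a sketch: it uses the standard java.util.concurrent.CompletableFuture as a stand-in for PgxFuture, and failingRequestAsync() and describeOutcome() are hypothetical helpers, not PGX APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingSketch {
    // Hypothetical stand-in for an Async request that fails on a worker thread.
    static CompletableFuture<String> failingRequestAsync() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("graph not found");
        });
    }

    // Blocks on the future and reports the nested cause rather than the wrapper.
    static String describeOutcome(CompletableFuture<String> future) {
        try {
            return "ok: " + future.get(); // blocks the caller thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the actual exception is nested
            return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints "ok: session-1"
        System.out.println(describeOutcome(CompletableFuture.completedFuture("session-1")));
        // prints "failed: IllegalStateException: graph not found"
        System.out.println(describeOutcome(failingRequestAsync()));
    }
}
```

Restoring the interrupt flag in the InterruptedException branch is a common convention so that code further up the call stack can still see that the thread was interrupted.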

2. Chain

PGX ships a version of Java 8's CompletableFuture named PgxFuture, a monadic enhancement of the Future interface. CompletableFuture allows asynchronous computations to be chained without polling and without deeply nested callbacks (also known as callback hell). All PgxFuture instances returned by PGX APIs are instances of CompletableFuture and can be chained without requiring Java 8:

import jsr166e.CompletableFuture;
import jsr166e.CompletableFuture.Action;
import jsr166e.CompletableFuture.Fun;

...

final GraphConfig graphConfig = ...
instance.createSessionAsync("my-session")
  .thenCompose(new Fun<PgxSession, CompletableFuture<PgxGraph>>() {
  @Override
  public CompletableFuture<PgxGraph> apply(PgxSession session) {
    return session.readGraphWithPropertiesAsync(graphConfig);
  }
}).thenAccept(new Action<PgxGraph>() {
  @Override
  public void accept(PgxGraph graph) {
    // do something with loaded graph
  }
});

In the example above, we first make the asynchronous request createSessionAsync(), asking PGX to create a session. On the returned PgxFuture object, we call thenCompose() to describe what should happen next. The function we pass to thenCompose() is called once createSessionAsync() completes, with the result of the PgxFuture as its argument: the newly created PgxSession object. Inside the function, we make another asynchronous request, readGraphWithPropertiesAsync(), and return its PgxFuture object. The outer PgxFuture object returned by thenCompose() completes when the readGraphWithPropertiesAsync() request completes, so we can chain another function to it. This time, we use thenAccept() instead of thenCompose(). The difference is that the function you pass to thenAccept() does not return anything. Hence, the future returned by thenAccept() is of type PgxFuture<Void>.
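With Java 8 lambdas, the same thenCompose()/thenAccept() pattern becomes much shorter. The sketch below is not PGX code: it uses the standard java.util.concurrent.CompletableFuture as a stand-in for PgxFuture, and createSessionAsync(), readGraphAsync() and loadGraph() are hypothetical helpers that return strings instead of PgxSession and PgxGraph objects.

```java
import java.util.concurrent.CompletableFuture;

public class ChainingSketch {
    // Hypothetical stand-in for instance.createSessionAsync().
    static CompletableFuture<String> createSessionAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "session:" + name);
    }

    // Hypothetical stand-in for session.readGraphWithPropertiesAsync().
    static CompletableFuture<String> readGraphAsync(String session, String graphName) {
        return CompletableFuture.supplyAsync(() -> graphName + "@" + session);
    }

    // thenCompose() flattens the nested future: the result is a
    // CompletableFuture<String>, not a CompletableFuture<CompletableFuture<String>>.
    static CompletableFuture<String> loadGraph(String sessionName, String graphName) {
        return createSessionAsync(sessionName)
            .thenCompose(session -> readGraphAsync(session, graphName));
    }

    public static void main(String[] args) {
        // thenAccept() consumes the value, so the resulting future carries Void.
        CompletableFuture<Void> done = loadGraph("my-session", "my-graph")
            .thenAccept(graph -> System.out.println("loaded " + graph));
        // Wait here only so that the JVM does not exit before the chain finishes.
        done.join(); // prints "loaded my-graph@session:my-session"
    }
}
```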

For the full list of operations available on a future, see the API reference of the CompletableFuture class.


Blocking Versus Chaining

For most use cases, blocking the caller thread is fine. However, blocking can quickly lead to poor performance or deadlocks once things get more complex. As a rule, use blocking to quickly analyze selected graphs in a sequential manner, for example, in shell scripts or during interactive analysis using PGX Shell. Use chaining for applications built on top of PGX.


3. Cancel

To request cancellation of a pending request, simply invoke the cancel method of the returned PgxFuture instance.

For example:

PgxFuture<Object> promise = ...

// do something else

promise.cancel(); // will cancel computation

Any subsequent promise.get() invocation will result in a CancellationException being thrown.

Note:

Because of Java's cooperative threading model, it might take some time before PGX actually stops the computation.
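The effect of cancellation on a later get() call can be observed with a plain future. This is a sketch using the standard java.util.concurrent.CompletableFuture as a stand-in for PgxFuture; getThrowsAfterCancel() is a hypothetical helper. The standard class only offers cancel(boolean), and its flag has no effect in that implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CancelSketch {
    // Returns true if get() on a cancelled future throws CancellationException.
    static boolean getThrowsAfterCancel() {
        // A future that nobody completes: it stays pending until cancelled.
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.cancel(true);
        try {
            pending.get();
            return false; // not reached: the future was cancelled
        } catch (CancellationException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(getThrowsAfterCancel()); // prints "true"
    }
}
```

CancellationException is unchecked, so callers that may race with a cancellation have to catch it explicitly; the compiler does not enforce it.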

In addition to the three actions above, PGX offers PgxSession#runConcurrently, which takes a list of suppliers of asynchronous requests and runs them concurrently in the PGX server. Here is an example:

import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

import oracle.pgx.api.*;

...

// wrap each asynchronous request in a supplier so that it is not started yet
Supplier<PgxFuture<?>> asyncRequest1 = () -> session.readGraphWithPropertiesAsync(...);
Supplier<PgxFuture<?>> asyncRequest2 = () -> session.getAvailableSnapshotsAsync(...);

List<Supplier<PgxFuture<?>>> supplierList = Arrays.asList(asyncRequest1, asyncRequest2);

// execute the async requests with the optimization feature enabled
List<?> results = session.runConcurrently(supplierList);

// the results are collected in the same order as the supplied requests
PgxGraph graph = (PgxGraph) results.get(0);
Deque<GraphMetaData> metaData = (Deque<GraphMetaData>) results.get(1);
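The ordering guarantee (result i belongs to supplier i) can be illustrated without a PGX server. The following sketch does not show how PGX implements runConcurrently: it uses the standard java.util.concurrent.CompletableFuture as a stand-in for PgxFuture, and runAll() and demo() are hypothetical helpers that only mimic the "start everything, then collect in list order" behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class RunConcurrentlySketch {
    // Starts every request first, then collects the results in supplier order.
    static List<Object> runAll(List<Supplier<CompletableFuture<?>>> suppliers) {
        List<CompletableFuture<?>> started = new ArrayList<>();
        for (Supplier<CompletableFuture<?>> supplier : suppliers) {
            started.add(supplier.get()); // the request is now in flight
        }
        List<Object> results = new ArrayList<>();
        for (CompletableFuture<?> future : started) {
            results.add(future.join()); // blocks until this request finishes
        }
        return results;
    }

    // Two stand-in requests with different result types.
    static List<Object> demo() {
        Supplier<CompletableFuture<?>> first = () -> CompletableFuture.supplyAsync(() -> "graph");
        Supplier<CompletableFuture<?>> second = () -> CompletableFuture.supplyAsync(() -> 42);
        return runAll(Arrays.asList(first, second));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[graph, 42]"
    }
}
```

Because the results come back as a list of untyped objects, each element has to be cast to the type returned by the request at the same position, as in the PGX example above.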