PGX 21.1.1

Advanced Task Scheduling Using Execution Environments

Not available on all platforms

The enterprise scheduler features are currently available only for Linux (x86_64), macOS (x86_64), and Solaris (x86_64, sparc).

This tutorial shows how you can use the advanced scheduling features of the enterprise scheduler.

The following topics are covered:

  1. How to enable the enterprise scheduler features by configuring the PGX server
  2. How to retrieve and inspect the execution environment
  3. How to modify the execution environment and run tasks with it

Enabling Enterprise Scheduler Features

To enable the enterprise scheduler features, the flag allow_override_scheduling_information of the PGX config needs to be set to true:

  "allow_override_scheduling_information": true

Retrieving and Inspecting the Execution Environment

Execution environments are bound to a session. To retrieve the execution environment for a session, call getExecutionEnvironment() on a PgxSession:

pgx> var execEnv = session.getExecutionEnvironment()
execEnv ==> ExecutionEnvironment[session=fd27b689-0018-441a-8247-e710f94fa51e]

import oracle.pgx.api.*;

PgxSession session;

ExecutionEnvironment execEnv = session.getExecutionEnvironment();

An execution environment is split into three sub-environments, one for each task type:

  • The IO environment: for IO tasks
  • The Analysis environment: for CPU bound analytics tasks
  • The Fast Analysis environment: for lightweight, but CPU bound analytics tasks

To query the current state of the execution environment, call the getValues() method:

pgx> execEnv.getValues()
$3 ==> [analysis-pool.max_num_threads=4, analysis-pool.weight=4, analysis-pool.priority=MEDIUM, io-pool.num_threads_per_task=4, fast-track-analysis-pool.max_num_threads=4, fast-track-analysis-pool.weight=1, fast-track-analysis-pool.priority=HIGH]

import oracle.pgx.api.*;
import java.util.List;
import java.util.Map.Entry;

List<Entry<String, Object>> currentValues = execEnv.getValues();
for (Entry<String, Object> value : currentValues) {
  System.out.println(value.getKey() + " = " + value.getValue());
}

The values of an unmodified execution environment match the values configured in the enterprise scheduler configuration.
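
The flat keys returned by getValues() follow a pool.setting pattern, as the shell output above shows. The following is a minimal sketch of regrouping such a list per pool. It uses only the Java standard library and makes no PGX calls: the class GroupValuesSketch and the helpers groupByPool, entry and sampleValues are hypothetical names, and the sample values are copied from the shell output above, with the priorities represented as plain strings for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class GroupValuesSketch {
  // Hypothetical helper: splits each "pool.setting" key at its first dot
  // and collects the settings per pool, preserving the original order.
  static Map<String, Map<String, Object>> groupByPool(List<Entry<String, Object>> values) {
    Map<String, Map<String, Object>> byPool = new LinkedHashMap<>();
    for (Entry<String, Object> value : values) {
      String key = value.getKey();
      int dot = key.indexOf('.');
      // Keys without a dot are filed under an empty pool name.
      String pool = dot < 0 ? "" : key.substring(0, dot);
      String setting = key.substring(dot + 1);
      byPool.computeIfAbsent(pool, p -> new LinkedHashMap<>()).put(setting, value.getValue());
    }
    return byPool;
  }

  // Small convenience for building typed entries.
  static Entry<String, Object> entry(String key, Object value) {
    return new SimpleEntry<>(key, value);
  }

  // Sample data copied from the shell output above. A real program would
  // pass the result of execEnv.getValues() instead.
  static List<Entry<String, Object>> sampleValues() {
    return List.of(
        entry("analysis-pool.max_num_threads", 4),
        entry("analysis-pool.weight", 4),
        entry("analysis-pool.priority", "MEDIUM"),
        entry("io-pool.num_threads_per_task", 4),
        entry("fast-track-analysis-pool.max_num_threads", 4),
        entry("fast-track-analysis-pool.weight", 1),
        entry("fast-track-analysis-pool.priority", "HIGH"));
  }

  public static void main(String[] args) {
    // Print one line per pool with its settings.
    for (Entry<String, Map<String, Object>> pool : groupByPool(sampleValues()).entrySet()) {
      System.out.println(pool.getKey() + " -> " + pool.getValue());
    }
  }
}
```

Grouping by the key prefix is only a display convenience. The sub-environment accessors described in this tutorial return the same per-pool settings directly.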

To retrieve the sub-environments, use the getIoEnvironment(), getAnalysisEnvironment() and getFastAnalysisEnvironment() methods. Each sub-environment has its own getValues() method for retrieving its configuration:

pgx> var ioEnv = execEnv.getIoEnvironment()
ioEnv ==> IoEnvironment[pool=io-pool]
pgx> ioEnv.getValues()
$5 ==> {num_threads_per_task=4}

pgx> var analysisEnv = execEnv.getAnalysisEnvironment()
analysisEnv ==> CpuEnvironment[pool=analysis-pool]
pgx> analysisEnv.getValues()
$7 ==> {max_num_threads=4, weight=4, priority=MEDIUM}

pgx> var fastAnalysisEnv = execEnv.getFastAnalysisEnvironment()
fastAnalysisEnv ==> CpuEnvironment[pool=fast-track-analysis-pool]
pgx> fastAnalysisEnv.getValues()
$9 ==> {max_num_threads=4, weight=1, priority=HIGH}

import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;
import java.util.Map;
import java.util.Map.Entry;

IoEnvironment ioEnv = execEnv.getIoEnvironment();
CpuEnvironment analysisEnv = execEnv.getAnalysisEnvironment();
CpuEnvironment fastAnalysisEnv = execEnv.getFastAnalysisEnvironment();

// Print the settings of each sub-environment
for (Entry<String, Object> value : ioEnv.getValues().entrySet()) {
  System.out.println(value.getKey() + " = " + value.getValue());
}

for (Entry<String, Object> value : analysisEnv.getValues().entrySet()) {
  System.out.println(value.getKey() + " = " + value.getValue());
}

for (Entry<String, Object> value : fastAnalysisEnv.getValues().entrySet()) {
  System.out.println(value.getKey() + " = " + value.getValue());
}

Modifying the Execution Environment and Submitting Tasks Under the Updated Environment

The number of threads of an IO environment can be changed with the setNumThreadsPerTask() method of IoEnvironment. The value is updated immediately, and all tasks submitted after the update are executed with the new value.

pgx> ioEnv.setNumThreadsPerTask(8)
pgx> var g = session.readGraphWithProperties(...)
g ==> PgxGraph[name=graph,N=3,E=6,created=0]

import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

ioEnv.setNumThreadsPerTask(8);
PgxGraph g = session.readGraphWithProperties(...);

To reset an environment to its initial values, call the reset() method:

pgx> ioEnv.reset()

CPU environments can be modified in their weight, priority and maximum number of threads using the setWeight(), setPriority() and setMaxNumThreads() methods:

pgx> analysisEnv.setWeight(50)
pgx> fastAnalysisEnv.setMaxNumThreads(1)
pgx> var rank = analyst.pagerank(g)
rank ==> VertexProperty[name=pagerank,type=double,graph=my-graph]

import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

analysisEnv.setWeight(50);
fastAnalysisEnv.setMaxNumThreads(1);
Analyst analyst = session.createAnalyst();
VertexProperty rank = analyst.pagerank(g);

You can reset all environments at once by calling reset() on the ExecutionEnvironment:

pgx> execEnv.reset()

Lambda Syntax

Typically the environment is used in the following way:

  1. Set up execution environment
  2. Execute task
  3. Reset execution environment

To simplify this, each set method has a counterpart with the with prefix that combines all three steps: it takes the new value and a lambda, runs the lambda with the new value applied, and then restores the previous value. For example, instead of setNumThreadsPerTask() you can call withNumThreadsPerTask() like this:

pgx> var g = ioEnv.withNumThreadsPerTask(8, () -> session.readGraphWithPropertiesAsync(...))
g ==> PgxGraph[name=graph,N=3,E=6,created=0]

import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

PgxGraph g = ioEnv.withNumThreadsPerTask(8, () -> session.readGraphWithPropertiesAsync(...));

This is equivalent to the following sequence of actions:

var oldValue = ioEnv.getNumThreadsPerTask()
ioEnv.setNumThreadsPerTask(8)
var g = session.readGraphWithProperties(...)
ioEnv.setNumThreadsPerTask(oldValue)
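
The set, run, restore pattern behind the with-prefixed methods can be sketched in plain Java. This is an illustration only, not the PGX implementation: DemoIoEnvironment is a hypothetical stand-in class, its initial value of 4 is taken from the shell output earlier in this tutorial, the task here is synchronous, and the try/finally block that restores the value even when the task throws is a design assumption of this sketch.

```java
import java.util.function.Supplier;

public class WithValueSketch {
  // Hypothetical stand-in for an IO environment; this is not the PGX
  // IoEnvironment class.
  static class DemoIoEnvironment {
    private int numThreadsPerTask = 4;

    int getNumThreadsPerTask() {
      return numThreadsPerTask;
    }

    void setNumThreadsPerTask(int numThreads) {
      numThreadsPerTask = numThreads;
    }

    // Applies the new value, runs the task, then restores the previous
    // value, even if the task throws (an assumption of this sketch).
    <T> T withNumThreadsPerTask(int numThreads, Supplier<T> task) {
      int oldValue = getNumThreadsPerTask();
      setNumThreadsPerTask(numThreads);
      try {
        return task.get();
      } finally {
        setNumThreadsPerTask(oldValue);
      }
    }
  }

  public static void main(String[] args) {
    DemoIoEnvironment ioEnv = new DemoIoEnvironment();
    // The task simply reports the value it observes while running.
    int seenByTask = ioEnv.withNumThreadsPerTask(8, ioEnv::getNumThreadsPerTask);
    System.out.println("during task: " + seenByTask);
    System.out.println("after task: " + ioEnv.getNumThreadsPerTask());
  }
}
```

Running the sketch prints "during task: 8" followed by "after task: 4", which shows that the override is visible only while the task runs.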

See Also

To learn more about the configuration values, see the Enterprise scheduler configuration guide.