Not available on all platforms
The enterprise scheduler features are currently only available on Linux (x86_64), macOS (x86_64), and Solaris (x86_64, SPARC).
This tutorial shows how you can use the advanced scheduling features of the enterprise scheduler.
The following topics are covered:
To enable the enterprise scheduler features, set the flag allow_override_scheduling_information of the PGX config to true:
{ "allow_override_scheduling_information": true }
Execution environments are bound to a session.
To retrieve the execution environment for a session, call getExecutionEnvironment() on a PgxSession:
pgx> var execEnv = session.getExecutionEnvironment()
execEnv ==> ExecutionEnvironment[session=fd27b689-0018-441a-8247-e710f94fa51e]
import oracle.pgx.api.*;

PgxSession session; // an existing, already created session
ExecutionEnvironment execEnv = session.getExecutionEnvironment();
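An execution environment is split into three sub-environments, one for each task type: an IO environment (pool io-pool), an analysis environment (pool analysis-pool) and a fast-track analysis environment (pool fast-track-analysis-pool).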
To query the current state of the execution environment, call the getValues() method:
pgx> execEnv.getValues()
$3 ==> [analysis-pool.max_num_threads=4, analysis-pool.weight=4, analysis-pool.priority=MEDIUM, io-pool.num_threads_per_task=4, fast-track-analysis-pool.max_num_threads=4, fast-track-analysis-pool.weight=1, fast-track-analysis-pool.priority=HIGH]
import oracle.pgx.api.*;
import java.util.List;
import java.util.Map.Entry;

List<Entry<String, Object>> currentValues = execEnv.getValues();
for (Entry<String, Object> value : currentValues) {
    System.out.println(value.getKey() + " = " + value.getValue());
}
The values of an unmodified execution environment match the values configured in the enterprise scheduler configuration.
To retrieve the sub-environments, use the getIoEnvironment(), getAnalysisEnvironment() and getFastAnalysisEnvironment() methods. Each sub-environment has its own getValues() method for retrieving its configuration:
pgx> var ioEnv = execEnv.getIoEnvironment()
ioEnv ==> IoEnvironment[pool=io-pool]
pgx> ioEnv.getValues()
$5 ==> {num_threads_per_task=4}
pgx> var analysisEnv = execEnv.getAnalysisEnvironment()
analysisEnv ==> CpuEnvironment[pool=analysis-pool]
pgx> analysisEnv.getValues()
$7 ==> {max_num_threads=4, weight=4, priority=MEDIUM}
pgx> var fastAnalysisEnv = execEnv.getFastAnalysisEnvironment()
fastAnalysisEnv ==> CpuEnvironment[pool=fast-track-analysis-pool]
pgx> fastAnalysisEnv.getValues()
$9 ==> {max_num_threads=4, weight=1, priority=HIGH}
import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;
import java.util.Map.Entry;

IoEnvironment ioEnv = execEnv.getIoEnvironment();
CpuEnvironment analysisEnv = execEnv.getAnalysisEnvironment();
CpuEnvironment fastAnalysisEnv = execEnv.getFastAnalysisEnvironment();

// getValues() of a sub-environment returns a Map, so iterate over its entrySet()
for (Entry<String, Object> value : ioEnv.getValues().entrySet()) {
    System.out.println(value.getKey() + " = " + value.getValue());
}
for (Entry<String, Object> value : analysisEnv.getValues().entrySet()) {
    System.out.println(value.getKey() + " = " + value.getValue());
}
for (Entry<String, Object> value : fastAnalysisEnv.getValues().entrySet()) {
    System.out.println(value.getKey() + " = " + value.getValue());
}
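Comparing the two outputs, the flat keys returned by execEnv.getValues() look like the sub-environment keys prefixed with the pool name. The sketch below regroups such a flat list into one map per pool, which is one way to relate the two views. It is plain Java and does not call PGX; the class FlatValues and the helper e() are hypothetical, and the sample values are copied from the getValues() output above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only, not part of PGX: regroups flat "<pool>.<setting>" entries
// into one map per pool.
class FlatValues {

    // Hypothetical helper to build sample entries concisely.
    static Map.Entry<String, Object> e(String key, Object value) {
        return new SimpleEntry<>(key, value);
    }

    static Map<String, Map<String, Object>> groupByPool(List<Map.Entry<String, Object>> flat) {
        Map<String, Map<String, Object>> byPool = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : flat) {
            String key = entry.getKey();
            // Assumes the pool name contains no '.', which holds for the sample keys.
            int dot = key.indexOf('.');
            if (dot < 0) {
                throw new IllegalArgumentException("expected <pool>.<setting>: " + key);
            }
            String pool = key.substring(0, dot);
            String setting = key.substring(dot + 1);
            byPool.computeIfAbsent(pool, p -> new LinkedHashMap<>()).put(setting, entry.getValue());
        }
        return byPool;
    }

    public static void main(String[] args) {
        // Sample values copied from the execEnv.getValues() output above.
        List<Map.Entry<String, Object>> flat = List.of(
                e("analysis-pool.max_num_threads", 4),
                e("analysis-pool.weight", 4),
                e("analysis-pool.priority", "MEDIUM"),
                e("io-pool.num_threads_per_task", 4),
                e("fast-track-analysis-pool.max_num_threads", 4),
                e("fast-track-analysis-pool.weight", 1),
                e("fast-track-analysis-pool.priority", "HIGH"));
        groupByPool(flat).forEach((pool, values) -> System.out.println(pool + " = " + values));
    }
}
```

Splitting at the first '.' keeps setting names such as max_num_threads intact, since only the pool prefix is removed.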
The number of threads per task of an IO environment can be changed with the setNumThreadsPerTask() method of IoEnvironment.
The value is updated immediately and all tasks that are submitted after updating it are executed with the updated value.
pgx> ioEnv.setNumThreadsPerTask(8)
pgx> var g = session.readGraphWithProperties(...)
g ==> PgxGraph[name=graph,N=3,E=6,created=0]
import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

ioEnv.setNumThreadsPerTask(8);
PgxGraph g = session.readGraphWithProperties(...);
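To make the timing rule concrete, here is a small hypothetical model (IoPoolModel is not a PGX class) that records the num_threads_per_task value that is current when each task is submitted. It assumes each task reads the value once at submission time; the text only guarantees that tasks submitted after the update use the new value, so how earlier tasks behave is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not the PGX API: each submitted task records the
// num_threads_per_task value that was current when it was submitted.
class IoPoolModel {

    private int numThreadsPerTask;
    private final List<Integer> submittedWith = new ArrayList<>();

    IoPoolModel(int configuredThreadsPerTask) {
        this.numThreadsPerTask = configuredThreadsPerTask;
    }

    // The new value only affects tasks submitted from now on.
    void setNumThreadsPerTask(int numThreadsPerTask) {
        this.numThreadsPerTask = numThreadsPerTask;
    }

    // Snapshots the current value for this task and returns it.
    int submit(String taskName) {
        int snapshot = numThreadsPerTask;
        submittedWith.add(snapshot);
        System.out.println(taskName + " submitted with " + snapshot + " threads per task");
        return snapshot;
    }

    List<Integer> history() {
        return List.copyOf(submittedWith);
    }

    public static void main(String[] args) {
        IoPoolModel io = new IoPoolModel(4);
        io.submit("read graph A");        // runs with 4
        io.setNumThreadsPerTask(8);
        io.submit("read graph B");        // runs with 8
        System.out.println(io.history()); // [4, 8]
    }
}
```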
To reset an environment to its initial values, call the reset() method:
pgx> ioEnv.reset()
ioEnv.reset();
The weight, priority and maximum number of threads of a CPU environment can be changed with the setWeight(), setPriority() and setMaxThreads() methods:
pgx> analysisEnv.setWeight(50)
pgx> fastAnalysisEnv.setMaxThreads(1)
pgx> var rank = analyst.pagerank(g)
rank ==> VertexProperty[name=pagerank,type=double,graph=my-graph]
import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

analysisEnv.setWeight(50);
fastAnalysisEnv.setMaxThreads(1);
Analyst analyst = session.createAnalyst();
VertexProperty rank = analyst.pagerank(g);
You can reset all environments at once by calling reset() on the ExecutionEnvironment:
pgx> execEnv.reset()
execEnv.reset();
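A minimal sketch of these reset semantics, assuming each sub-environment simply remembers the values from the enterprise scheduler configuration and copies them back on reset(). ResetModel and SubEnv are hypothetical classes, not PGX types; the defaults are the values from the getValues() output shown earlier.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the reset semantics; not the PGX API.
class ResetModel {

    // One sub-environment: keeps its configured values so reset() can restore them.
    static final class SubEnv {
        private final Map<String, Object> configured;
        private final Map<String, Object> current;

        SubEnv(Map<String, Object> configured) {
            this.configured = new LinkedHashMap<>(configured);
            this.current = new LinkedHashMap<>(configured);
        }

        void set(String key, Object value) {
            if (!current.containsKey(key)) {
                throw new IllegalArgumentException("unknown setting: " + key);
            }
            current.put(key, value);
        }

        Map<String, Object> getValues() {
            return new LinkedHashMap<>(current);
        }

        void reset() {
            current.clear();
            current.putAll(configured);
        }
    }

    // Defaults mirror the getValues() output of an unmodified environment.
    final SubEnv io = new SubEnv(Map.<String, Object>of("num_threads_per_task", 4));
    final SubEnv analysis = new SubEnv(
            Map.<String, Object>of("max_num_threads", 4, "weight", 4, "priority", "MEDIUM"));
    final SubEnv fastAnalysis = new SubEnv(
            Map.<String, Object>of("max_num_threads", 4, "weight", 1, "priority", "HIGH"));

    // Resetting the execution environment resets every sub-environment.
    void reset() {
        io.reset();
        analysis.reset();
        fastAnalysis.reset();
    }

    public static void main(String[] args) {
        ResetModel env = new ResetModel();
        env.io.set("num_threads_per_task", 8);
        env.analysis.set("weight", 50);
        env.reset();
        System.out.println(env.io.getValues() + " weight=" + env.analysis.getValues().get("weight"));
    }
}
```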
Typically, the environment is used in three steps: set a new value, submit a task, and restore the previous value. To make this easier, every set method has a counterpart with the with prefix that takes the new value and a lambda, and executes the lambda using the new value. For example, instead of setNumThreadsPerTask() there is a method called withNumThreadsPerTask(), which can be invoked like this:
pgx> var g = ioEnv.withNumThreadsPerTask(8, () -> session.readGraphWithProperties(...))
g ==> PgxGraph[name=graph,N=3,E=6,created=0]
import oracle.pgx.api.*;
import oracle.pgx.api.executionenvironment.*;

PgxGraph g = ioEnv.withNumThreadsPerTask(8, () -> session.readGraphWithProperties(...));
This is equivalent to the following sequence of actions:
var oldValue = ioEnv.getNumThreadsPerTask();
ioEnv.setNumThreadsPerTask(8);
var g = session.readGraphWithProperties(...);
ioEnv.setNumThreadsPerTask(oldValue);
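Written out by hand, this sequence does not restore the old value if the task throws. One way to guard against that is to wrap the steps in try/finally. The sketch below illustrates the pattern and is not the PGX implementation: TemporarySetting and its with() method are hypothetical, and the text does not state whether the PGX with methods restore the value when the lambda fails.

```java
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Hypothetical helper illustrating the "with" pattern; not the PGX API.
final class TemporarySetting {

    private TemporarySetting() {}

    // Stand-in for an IO environment's num_threads_per_task value (demo only).
    private static int numThreadsPerTask = 4;

    static <T> T with(IntSupplier getter, IntConsumer setter, int newValue, Supplier<T> action) {
        int oldValue = getter.getAsInt(); // remember the current value
        setter.accept(newValue);          // apply the temporary value
        try {
            return action.get();          // run the task with the temporary value
        } finally {
            setter.accept(oldValue);      // restore it, even if the task throws
        }
    }

    public static void main(String[] args) {
        String result = with(() -> numThreadsPerTask, v -> numThreadsPerTask = v, 8,
                () -> "task ran with " + numThreadsPerTask + " threads per task");
        System.out.println(result);            // task ran with 8 threads per task
        System.out.println(numThreadsPerTask); // 4
    }
}
```

Passing the getter and setter as functions keeps the helper independent of any particular setting, so the same method could wrap a weight or a thread count.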
To learn more about the configuration values, see the Enterprise scheduler configuration guide.