Spark configuration

Data Processing uses a Spark configuration file, sparkContext.properties. This topic describes how Data Processing obtains the settings for this file and includes a sample of the file. It also describes options you can adjust in this file to tweak the amount of memory required to successfully complete a Data Processing workflow.

Data Processing workflows are run by Spark workers. When a Spark worker is started for a Data Processing job, it has a set of default configuration settings that can be overridden or added to by the sparkContext.properties file.

The Spark configuration is very granular and must be adapted to the size of the cluster and to the data being processed. In addition, the timeout and failure behavior may have to be altered. Spark offers a rich set of configurable options for these purposes, and the sparkContext.properties file is provided so that you can fine-tune the performance of the Spark workers for the needs of your installation.

The sparkContext.properties file is located in the $CLI_HOME/edp_cli/config directory. As shipped, the file is empty. However, you can add any Spark configuration property to the file. The properties that you specify override any previously set Spark settings. The documentation for the Spark properties is at: https://spark.apache.org/docs/latest/configuration.html
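For example, a minimal non-empty sparkContext.properties might override a single property, as in this hypothetical sketch (the value is illustrative only, not a recommendation):

# Override the per-executor memory (illustrative value)
spark.executor.memory=8g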

Keep in mind that the sparkContext.properties file can be empty. If the file is empty, a Data Processing workflow will still run correctly because the Spark worker will have a sufficient set of configuration properties to do its job.

Note:

Do not delete the sparkContext.properties file. Although the file can be empty, Data Processing checks for its existence, and the workflow will not run if the file is missing.

Spark default configuration

When started, a Spark worker gets its configuration settings in a three-tiered manner, in this order:
  1. From the Hadoop default settings.
  2. From the Data Processing configuration settings, which can override the Hadoop settings or provide additional settings. For example, the sparkExecutorMemory property (in the DP CLI configuration) can override the Hadoop spark.executor.memory property.
  3. From the property settings in the sparkContext.properties file, which can override any previous settings or provide additional settings.

If the sparkContext.properties file is empty, then the final configuration for the Spark worker is obtained from Steps 1 and 2.
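As a hypothetical illustration of this precedence (the values are not recommendations): if the DP CLI configuration sets sparkExecutorMemory=4g, that value overrides the Hadoop default for spark.executor.memory in Step 2; if sparkContext.properties then contains the following line, the Spark worker ultimately runs with 6g per executor (Step 3):

# Final override of the executor memory (illustrative value)
spark.executor.memory=6g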

Sample Spark configuration

The following is a sample sparkContext.properties configuration file:
#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)

Configuring fail fast behavior for transforms

When a transform is committed, the ApplyTransformToDataSetWorkflow will not retry on failure. This workflow cannot safely be re-run after failure because the state of the data set may be out of sync with the state of the HDFS sample files. This non-retry behavior applies to all Hadoop environments.

You can modify the yarn.resourcemanager.am.max-attempts setting on your cluster to prevent retries of any YARN job. If you do not, a retried workflow may appear to succeed but will fail on future transforms because of the inconsistent sample data files. You do not have to set this property unless you want the fail fast behavior.
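For example, one way to prevent YARN from retrying failed ApplicationMasters is to set the maximum number of attempts to 1 in yarn-site.xml, as sketched below (how and where you apply this change depends on your Hadoop distribution and cluster manager):

<property>
  <!-- Allow only one ApplicationMaster attempt, so failed jobs are not retried -->
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>1</value>
</property>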

Enabling Spark event logging

You can enable Spark event logging with this file. At runtime, Spark internally compiles the DP workflow into multiple stages (a stage is usually defined by a set of Spark Transformations and bounded by a Spark Action). The stages can be matched to the DP operations. The Spark event log includes detailed timing information on a stage and all the tasks within the stage.

The following Spark properties are used for Spark event logging:
  • spark.eventLog.enabled (when set to true) enables the logging of Spark events.
  • spark.eventLog.dir specifies the base directory in which Spark events are logged.
  • spark.yarn.historyServer.address specifies the address of the Spark history server (for example, host.com:18080). The address should not contain a scheme (http://).
For example:
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://busj40CDH3-ns/user/spark/applicationHistory
spark.yarn.historyServer.address=busj40bda13.example.com:18088

Note that enabling Spark event logging should be done by Oracle Support personnel when troubleshooting problems. Enabling Spark event logging under normal circumstances is not recommended, as it can have an adverse performance impact on workflows.

Spark worker OutOfMemoryError

If insufficient memory is allocated to a Spark worker, an OutOfMemoryError may occur and the Data Processing workflow may terminate with an error message similar to this example:
java.lang.OutOfMemoryError: Java heap space
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
   at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
   at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
   at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   at org.apache.spark.scheduler.Task.run(Task.scala:51)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
The amount of memory required to successfully complete a Data Processing workflow depends on database considerations such as:
  • The total number of records in each Hive table.
  • The average size of each Hive table record.
It also depends on the DP CLI configuration settings, such as:
  • maxRecordsForNewDataSet
  • runEnrichment
  • sparkExecutorMemory

If OutOfMemoryError instances occur, you can adjust the DP CLI default values and specify additional settings in sparkContext.properties to suit the provisioning needs of your deployment.

For example, Data Processing allows you to specify a sparkExecutorMemory setting, which defines the amount of memory to use per executor process. (This setting corresponds to the spark.executor.memory parameter in the Spark configuration.) The Spark spark.storage.memoryFraction parameter is another important option to adjust if the Spark executors are having memory issues.
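For example, if executors are running out of memory, one hypothetical adjustment is to raise the per-executor memory and lower the storage fraction in sparkContext.properties (the values below are illustrative starting points, not recommendations):

# Give each executor more heap (illustrative value)
spark.executor.memory=8g
# Reserve less of the heap for Spark's storage cache, leaving more for computation
spark.storage.memoryFraction=0.4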

You should also check the "Tuning Spark" topic: http://spark.apache.org/docs/latest/tuning.html

Benign sparkDriver shutdown error

After a Spark job finishes successfully, you may see a sparkDriver shutdown ERROR message in the log, as in this abbreviated example:
...
11:11:42.828 Thread-2 INFO : Shutting down all executors
11:11:42.829 sparkDriver-akka.actor.default-dispatcher-19 INFO : Asking each executor to shut down
11:11:42.892 sparkDriver-akka.actor.default-dispatcher-17 ERROR: AssociationError [akka.tcp://sparkDriver@10.152.110.203:62743] <- [akka.tcp://sparkExecutor@atm.example.com:30203]: Error [Shut down address: akka.tcp://sparkExecutor@atm.example.com:30203] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@atm.example.com:30203
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
11:11:42.893 sparkDriver-akka.actor.default-dispatcher-19 INFO : Driver terminated or disconnected! Shutting down. atm.example.com:30203
11:11:42.897 sparkDriver-akka.actor.default-dispatcher-19 INFO : MapOutputTrackerMasterEndpoint stopped!
...

The actual Spark work is done successfully; however, the sparkDriver shutdown generates the error message. The message is logged by Spark (not by the Data Processing code). It is benign and has no actual impact on functionality.

Note on differentiating job queuing and cluster locking

Sites that have a small, busy cluster may encounter Spark jobs that do not run and that log a message similar to the following example:
[DataProcessing] [WARN] [] [org.apache.spark.Logging$class] [tid:Timer-0] [userID:yarn]
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered 
and have sufficient memory

The cause may be normal YARN job queuing rather than cluster locking. (Cluster locking occurs when a cluster is deadlocked because many applications are submitted at once and all cluster resources are taken up by the ApplicationMasters.) Normal YARN job queuing looks very similar to cluster locking, especially when a large YARN job is taking an excessive amount of time to run. To check on the status of jobs, use the Hadoop cluster manager for your Hadoop distribution.

The following information may help you differentiate between job queuing and suspected cluster locking: jobs are in a normal queuing state unless there are multiple jobs in a RUNNING state and you observe "Initial job has not accepted any resources" in the logs of all of those jobs. As long as at least one job is making progress (where you usually see "Starting task X.X in stage X.X"), the other jobs are in a normal queuing state. Also, when checking Spark RUNNING jobs through the ResourceManager UI, browse beyond the first page or use the Search box in the UI so that no RUNNING applications are overlooked.

If your Hadoop cluster runs a Hadoop version earlier than 2.6.0, it is recommended that you explicitly set the following property to limit the ApplicationMaster share:

<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>

This property limits the fraction of the queue's fair share that can be used to run Application Masters.
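As a sketch, assuming your cluster uses the YARN Fair Scheduler, this element is placed at the top level of the Fair Scheduler allocation file (typically fair-scheduler.xml):

<?xml version="1.0"?>
<allocations>
  <!-- Limit ApplicationMasters to at most half of each queue's fair share -->
  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
</allocations>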