Spark configuration

Data Processing uses a Spark configuration file, sparkContext.properties. This topic describes how Data Processing obtains the settings for this file and includes a sample of the file. It also describes options you can adjust in this file to tweak the amount of memory required to successfully complete a Data Processing workflow.

Data Processing workflows are run by Spark workers. When a Spark worker is started for a Data Processing job, it has a set of default configuration settings that can be overridden or added to by the sparkContext.properties file.

The Spark configuration is very granular and needs to be adapted to the size of the cluster and also the data. In addition, the timeout and failure behavior may have to be altered. Spark offers an excellent set of configurable options for these purposes that you can use to configure Spark for the needs of your installation. For this reason, the sparkContext.properties is provided so that you can fine tune the performance of the Spark workers.

The sparkContext.properties file is located in the $CLI_HOME/edp_cli/config directory. As shipped, the file is empty. However, you can add any Spark configuration property to the file. The properties that you specify will override all previously-set Spark settings. The documentation for the Spark properties is at: https://spark.apache.org/docs/1.1.0/configuration.html

Keep in mind that the sparkContext.properties file can be empty. If the file is empty, a Data Processing workflow will still run correctly because the Spark worker will have a sufficient set of configuration properties to do its job.

Note: Do not delete the sparkContext.properties file. Although it can be empty, a check is made for its existence and the Data Processing workflow will not run if the file is missing.

Spark default configuration

When started, a Spark worker gets its configuration settings in a three-tiered manner, in this order:
  1. From the Cloudera CDH default settings.
  2. From the Data Processing configuration settings, which can either override the Cloudera settings, and/or provide additional settings. For example, the sparkExecutorMemory property (in the DP CLI configuration) can override the CDH spark.executor.memory property.
  3. From the property settings in the sparkContext.properties file, which can either override any previous settings and/or provide additional settings.

If the sparkContext.properties file is empty, then the final configuration for the Spark worker is obtained from Steps 1 and 2.

Sample Spark configuration

The following is a sample sparkContext.properties configuration file:
#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)

Spark worker OutOfMemoryError

If insufficient memory is allocated to a Spark worker, an OutOfMemoryError may occur and the Data Processing workflow may terminate with an error message similar to this example:
java.lang.OutOfMemoryError: Java heap space
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
   at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
   at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
   at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   at org.apache.spark.scheduler.Task.run(Task.scala:51)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
The amount of memory required to successfully complete a Data Processing workflow depends on database considerations such as:
  • The total number of records in each Hive table.
  • The average size of each Hive table record.
It also depends on the DP CLI configuration settings, such as:
  • maxRecordsProcessed (default is 10000)
  • runEnrichment (default is false)
  • sparkExecutorMemory (default is 10GB)

If OutOfMemoryError instances occur, you can adjust the DP CLI default values, as well as specify sparkContext.properties configurations, to suit the provisioning needs of your deployment.

For example, Data Processing allows you to specify a sparkExecutorMemory setting, which is used to define the amount of memory to use per executor process. (This corresponds to the spark.executor.memory parameter in the Spark configuration.) The Spark spark.storage.memoryFraction parameter is another important option to use if the Spark Executors are having memory issues.

You should also check the "Tuning Spark" topic: http://spark.apache.org/docs/latest/tuning.html