Data Processing uses a Spark configuration file, sparkContext.properties. This topic describes how Data Processing obtains the settings for this file and includes a sample of the file. It also describes options you can adjust in this file to tweak the amount of memory required to successfully complete a Data Processing workflow.
Data Processing workflows are run by Spark workers. When a Spark worker is started for a Data Processing job, it has a set of default configuration settings that can be overridden or added to by the sparkContext.properties file.
The Spark configuration is very granular and must be adapted to the size of the cluster and to the data. In addition, the timeout and failure behavior may have to be altered. Spark offers a rich set of configurable options for these purposes, and the sparkContext.properties file is provided so that you can fine-tune the performance of the Spark workers for the needs of your installation.
The sparkContext.properties file is located in the $CLI_HOME/edp_cli/config directory. As shipped, the file is empty, but you can add any Spark configuration property to it. The properties that you specify override all previously set Spark settings. The Spark properties are documented at: https://spark.apache.org/docs/1.1.0/configuration.html
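For example, to override a single Spark default, add one property per line in standard Java properties format. The following entry is illustrative only; it turns on RDD compression, overriding Spark's default of false:

# Illustrative entry; any valid Spark property can be set this way
spark.rdd.compress=true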
Keep in mind that the sparkContext.properties file can be empty. A Data Processing workflow still runs correctly with an empty file, because the Spark worker already has a sufficient set of default configuration properties to do its job.
If the sparkContext.properties file is empty, then the final configuration for the Spark worker is obtained from Steps 1 and 2.
The following is a sample sparkContext.properties configuration file:

#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)
A workflow that exhausts the memory of a Spark worker fails with a java.lang.OutOfMemoryError, which produces a stack trace similar to the following:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
    at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
If OutOfMemoryError instances occur, you can adjust the DP CLI default values and specify settings in the sparkContext.properties file to suit the provisioning needs of your deployment.
For example, Data Processing allows you to specify a sparkExecutorMemory setting, which defines the amount of memory to use per executor process. (This corresponds to the spark.executor.memory parameter in the Spark configuration.) The spark.storage.memoryFraction parameter is another important option if the Spark executors are having memory issues.
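To set both of these through the sparkContext.properties file, you could add entries such as the following. The values shown are examples only and must be sized for your data and cluster; increasing spark.executor.memory gives each executor a larger heap, while lowering spark.storage.memoryFraction leaves more of that heap free for task execution rather than caching:

# Illustrative memory tuning; size these values for your own cluster and data
spark.executor.memory=10g
spark.storage.memoryFraction=0.5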
You should also check the "Tuning Spark" topic: http://spark.apache.org/docs/latest/tuning.html