Data Processing uses a Spark configuration file, sparkContext.properties. This topic describes how Data Processing obtains the settings for this file and includes a sample of the file. It also describes options you can adjust in this file to tweak the amount of memory required to successfully complete a Data Processing workflow.
Data Processing workflows are run by Spark workers. When a Spark worker is started for a Data Processing job, it has a set of default configuration settings that can be overridden or added to by the sparkContext.properties file.
The Spark configuration is very granular and must be adapted to both the size of the cluster and the data. In addition, the timeout and failure behavior may have to be altered. Spark offers an excellent set of configurable options for these purposes, which you can use to tailor Spark to the needs of your installation. For this reason, the sparkContext.properties file is provided so that you can fine-tune the performance of the Spark workers.
The sparkContext.properties file is located in the $CLI_HOME/edp_cli/config directory. As shipped, the file is empty. However, you can add any Spark configuration property to the file. The properties that you specify will override all previously-set Spark settings. The documentation for the Spark properties is at: https://spark.apache.org/docs/latest/configuration.html
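For example, a minimal sketch of an entry you might add to the file (the value shown is illustrative only; it overrides the corresponding default shown in the sample later in this topic):

# Illustrative override: re-enables RDD compression, which the sample
# configuration later in this topic shows set to false.
spark.rdd.compress=true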
Keep in mind that the sparkContext.properties file can be empty. If the file is empty, a Data Processing workflow will still run correctly because the Spark worker will have a sufficient set of configuration properties to do its job.
If the sparkContext.properties file is empty, then the final configuration for the Spark worker is obtained from Steps 1 and 2.
The following is a sample sparkContext.properties file:

#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)
When a transform is committed, the ApplyTransformToDataSetWorkflow will not retry on failure. This workflow cannot safely be re-run after failure because the state of the data set may be out of sync with the state of the HDFS sample files. This non-retry behavior applies to all Hadoop environments.
Users can modify the yarn.resourcemanager.am.max-attempts setting on their cluster to prevent retries of any YARN job. If users do not do this, a retried workflow may appear to have succeeded, but future transforms will fail because of the inconsistent sample data files. Users do not have to set this property unless they want the fail-fast behavior.
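As a sketch, this setting might look as follows in yarn-site.xml, assuming a value of 1 to disable retries (the value and the file location depend on your Hadoop distribution, and the setting affects all YARN applications, not just Data Processing workflows):

<!-- Illustrative yarn-site.xml fragment: a value of 1 disables
     ApplicationMaster retries for all YARN applications. -->
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>1</value>
</property>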
You can enable Spark event logging with this file. At runtime, Spark internally compiles a DP workflow into multiple stages (a stage is usually defined by a set of Spark Transformations and bounded by a Spark Action). The stages can be matched to the DP operations. The Spark event log includes detailed timing information for each stage and all the tasks within it.
For example:

spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://busj40CDH3-ns/user/spark/applicationHistory
spark.yarn.historyServer.address=busj40bda13.example.com:18088
Note that enabling Spark event logging should be done by Oracle Support personnel when troubleshooting problems. Enabling Spark event logging under normal circumstances is not recommended because it can have an adverse performance impact on workflows.
An out-of-memory condition in a Data Processing workflow produces a stack trace similar to the following:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
    at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
If OutOfMemoryError instances occur, you can adjust the DP CLI default values, as well as specify sparkContext.properties configurations, to suit the provisioning needs of your deployment.
For example, Data Processing allows you to specify a sparkExecutorMemory setting, which defines the amount of memory to use per executor process. (This setting corresponds to the spark.executor.memory parameter in the Spark configuration.) The Spark spark.storage.memoryFraction parameter is another important option to adjust if the Spark executors are having memory issues.
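As a minimal sketch, memory-related overrides in sparkContext.properties might look like the following (the values are illustrative only and must be sized for your own cluster and data):

# Illustrative values only; size these for your cluster and data.
spark.executor.memory=4g
spark.storage.memoryFraction=0.5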
You should also check the "Tuning Spark" topic: http://spark.apache.org/docs/latest/tuning.html
For example, log messages similar to the following may be generated when the sparkDriver shuts down:

...
11:11:42.828 Thread-2 INFO : Shutting down all executors
11:11:42.829 sparkDriver-akka.actor.default-dispatcher-19 INFO : Asking each executor to shut down
11:11:42.892 sparkDriver-akka.actor.default-dispatcher-17 ERROR: AssociationError [akka.tcp://sparkDriver@10.152.110.203:62743] <- [akka.tcp://sparkExecutor@atm.example.com:30203]: Error [Shut down address: akka.tcp://sparkExecutor@bus00atm.us.oracle.com:30203] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@atm.example.com:30203
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
] akka.event.Logging$Error$NoCause$
11:11:42.893 sparkDriver-akka.actor.default-dispatcher-19 INFO : Driver terminated or disconnected! Shutting down. atm.example.com:30203
11:11:42.897 sparkDriver-akka.actor.default-dispatcher-19 INFO : MapOutputTrackerMasterEndpoint stopped!
...
The actual Spark work completes successfully; however, the sparkDriver shutdown generates the error message. The message is logged by Spark itself (not by the Data Processing code), is benign, and has no actual impact on functionality.
[DataProcessing] [WARN] [] [org.apache.spark.Logging$class] [tid:Timer-0] [userID:yarn] Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
The cause may be normal YARN job queuing rather than cluster locking. (Cluster locking occurs when a cluster is deadlocked because many applications are submitted at once and all cluster resources are taken up by their ApplicationMasters.) Normal YARN job queuing looks very similar to cluster locking, especially when a large YARN job is taking an excessive amount of time to run. To check the status of jobs, use the Hadoop cluster manager for your Hadoop distribution.
The following information may help you differentiate between normal job queuing and suspected cluster locking: jobs are in a normal queuing state unless multiple jobs are in the RUNNING state and the logs of all of those jobs contain the "Initial job has not accepted any resources" message. As long as at least one job is making progress (you typically see messages such as "Starting task X.X in stage X.X"), the jobs are in a normal queuing state. Also, when checking Spark RUNNING jobs through the ResourceManager UI, browse beyond the first page or use the Search box in the UI, so that no RUNNING applications are overlooked.
If your Hadoop cluster runs a Hadoop version earlier than 2.6.0, it is recommended that you explicitly limit the ApplicationMaster share with the following setting:
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
This property limits the fraction of the queue's fair share that can be used to run Application Masters.
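As a sketch, assuming your cluster uses the YARN Fair Scheduler, this element is placed at the top level of the scheduler's allocation file (commonly fair-scheduler.xml; the exact file name and location depend on your Hadoop distribution and cluster manager):

<?xml version="1.0"?>
<!-- Illustrative Fair Scheduler allocation file fragment. -->
<allocations>
  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
</allocations>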