Spark構成

データ処理には、Spark構成ファイルが用意されています。

データ処理ワークフローはSparkワーカーによって実行されます。データ処理ジョブに対してSparkワーカーが起動するときに、Sparkワーカーには、ここで説明するsparkContext.propertiesファイルによってオーバーライドまたは追加できるデフォルトの構成が設定されています。

Spark構成は非常に詳細であるため、クラスタのサイズだけでなくデータのサイズにも適合させる必要があります。また、タイムアウトや失敗の動作も変更する必要がある場合があります。Sparkには、インストールのニーズに応じてSparkを構成するために使用することを目的とした、一連の優れた構成可能オプションが用意されています。このような理由から、sparkContext.propertiesが提供されており、これを使用してSparkワーカーのパフォーマンスを微調整できます。

sparkContext.propertiesファイルは、$CLI_HOME/edp_cli/configディレクトリにあります。このファイルは出荷時は空です。ただし、このファイルには任意のSpark構成プロパティを追加できます。指定するプロパティにより、事前に設定されているSpark設定はすべてオーバーライドされます。Sparkプロパティに関するドキュメントは、https://spark.apache.org/docs/1.1.0/configuration.htmlにあります。

sparkContext.propertiesファイルは空である可能性があることに注意してください。ファイルが空である場合も、Sparkワーカーにはジョブを実行するのに十分な構成プロパティ・セットがあるため、データ処理ワークフローは正しく動作します。

注意: sparkContext.propertiesファイルは削除しないでください。ファイルが空であっても、存在しているかどうかがチェックされ、ファイルがない場合、データ処理ワークフローは実行されません。

Sparkのデフォルト構成

Sparkワーカーは起動すると、構成設定が次の順序の3層方式で取得されます。
  1. Cloudera CDHのデフォルト設定から。
  2. データ処理構成設定から。これにより、Cloudera設定をオーバーライドすることも、設定を追加することもできます。たとえば、sparkExecutorMemoryプロパティ(DP CLI構成内)により、CDHのspark.executor.memoryプロパティをオーバーライドできます。
  3. sparkContext.propertiesファイル内のプロパティ設定から。これにより、前の設定をオーバーライドすることも、設定を追加することもできます。

sparkContext.propertiesファイルが空である場合、Sparkワーカーの最終構成には手順1および2の構成が使用されます。

Spark構成例

sparkContext.properties構成ファイルの例を次に示します。
#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)

SparkワーカーのOutOfMemoryError

Sparkワーカーに割り当てられているメモリーが十分でない場合、OutOfMemoryErrorが発生し、データ処理ワークフローが異常終了し、次の例のようなエラー・メッセージが表示される可能性があります。
java.lang.OutOfMemoryError: Java heap space
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
   at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
   at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
   at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   at org.apache.spark.scheduler.Task.run(Task.scala:51)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
データ処理ワークフローを正常に完了するために必要なメモリーの量は、データベースに次のような考慮事項によって異なります。
  • 各Hive表内のレコードの合計数。
  • 各Hive表レコードの平均サイズ。
また、次のようなDP CLI構成設定によっても異なります。
  • maxRecordsProcessed (デフォルト値は10000)
  • runEnrichment (デフォルト値はfalse)
  • sparkExecutorMemory (デフォルト値は10GB)

OutOfMemoryErrorインスタンスが発生した場合、DP CLIのデフォルト値を調整するとともに、sparkContext.properties構成を指定し、デプロイメントのプロビジョニングに関するニーズに合せることができます。

たとえば、データ処理では、エグゼキュータ・プロセスごとに使用するメモリーの量を定義するために使用するsparkExecutorMemory設定を指定できます。(これは、Spark構成のspark.executor.memoryパラメータに対応しています。)Sparkのspark.storage.memoryFractionパラメータは、Sparkエグゼキュータにメモリー上の問題があるときに使用するもう1つの重要なオプションです。

また、『Tuning Spark』トピック(http://spark.apache.org/docs/latest/tuning.html)もチェックする必要があります。