データ処理は、Spark構成ファイルsparkContext.properties
を使用します。 このトピックでは、データ処理がこのファイルの設定を取得する方法とファイルのサンプルを示します。 また、このファイルで調整し、データ処理ワークフローを正常に完了するために必要なメモリー量を調整するためのオプションについても説明します。
データ処理ワークフローは、Sparkのワーカーによって実行されます。 Sparkワーカーのデータ処理ジョブが開始されると、sparkContext.properties
ファイルによって上書きまたは追加できる一連のデフォルトの構成設定があります。
Spark構成は細かなもので、クラスタのサイズやデータに適応させる必要があります。 また、タイムアウトおよび障害時の動作を変更する必要がある場合もあります。 Sparkでは、これらの目的のために適切な構成可能なオプション・セットが用意されており、これらのオプションを使用して、インストールのニーズに応じてSparkを構成できます。 そのため、Sparkのワーカーのパフォーマンスを微調整できるようにsparkContext.properties
が用意されています。
sparkContext.properties
ファイルは、$CLI_HOME/edp_cli/config
ディレクトリにあります。 出荷時のファイルは空です。 ただし、任意のSpark構成プロパティをファイルに追加できます。 指定したプロパティによって、以前に設定されたすべてのSpark設定がオーバーライドされます。 Sparkプロパティのドキュメントは次の場所にあります。 : https://spark.apache.org/docs/latest/configuration.html
sparkContext.properties
ファイルは空にできることに留意してください。 ファイルが空の場合、Sparkのワーカーがジョブを実行するための十分な構成プロパティのセットを持つため、データ処理ワークフローは引き続き正しく実行されます。
注意:
sparkContext.properties
ファイルは削除しないでください。 空にすることもできますが、その存在のチェックが実行され、ファイルがない場合はデータ処理ワークフローは実行されません。
Sparkのデフォルト構成
sparkExecutorMemory
プロパティ(DP CLI構成)は、Hadoop spark.executor.memory
プロパティをオーバーライドできます。
sparkContext.properties
ファイルのプロパティ設定から、以前の設定をオーバーライドしたり、追加設定を指定できます。
sparkContext.properties
ファイルが空の場合、Sparkワーカーの最終構成は、ステップ1および2から取得されます。
サンプルSpark構成
sparkContext.properties
構成ファイルのサンプルを示します:
######################################################### # Spark additional runtime properties ######################################################### spark.broadcast.compress=true spark.rdd.compress=false spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec spark.io.compression.snappy.block.size=32768 spark.closure.serializer=org.apache.spark.serializer.JavaSerializer spark.serializer.objectStreamReset=10000 spark.kryo.referenceTracking=true spark.kryoserializer.buffer.mb=2 spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory spark.broadcast.blockSize=4096 spark.files.overwrite=false spark.files.fetchTimeout=false spark.storage.memoryFraction=0.6 spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir") spark.storage.memoryMapThreshold=8192 spark.cleaner.ttl=(infinite)
変換の異常な高速動作の構成
変換がコミットされると、ApplyTransformToDataSetWorkflowは失敗時に再試行しません。 データ・セットの状態がHDFSサンプル・ファイルの状態と同期されていない可能性があるため、障害の後にこのワークフローを安全に再実行することはできません。 この非試行動作は、すべてのHadoop環境に適用されます。
ユーザーはクラスタ上のyarn.resourcemanager.am.max-attempts
設定を変更して、YARNジョブが再試行されないようにできます。 この操作を行わないと、ワークフローは成功したように見えますが、サンプル・データ・ファイルが矛盾しているため、今後の変換では失敗します。 高速動作の失敗が必要でないかぎり、ユーザーがこのプロパティを設定する必要はありません。
Sparkのイベント・ロギングの有効化
このファイルを使用すると、Sparkイベント・ロギングを有効にできます。 実行時に、SparkはDPワークフローを複数のステージに内部的にコンパイルします(通常、ステージはSpark変換のセットによって定義され、Sparkアクションによってバインドされます)。 ステージはDP操作に一致させることができます。 Sparkイベント・ログには、ステージとステージ内のすべてのタスクに関する詳細なタイミング情報が含まれます。
spark.eventLog.enabled
(trueに設定)では、Sparkイベントのロギングを有効にします。
spark.eventLog.dir
では、Sparkイベントが記録されるベース・ディレクトリを指定します。
spark.yarn.historyServer.address
では、Spark履歴サーバー(つまり、host.com:18080)のアドレスを指定します。 アドレスにはスキーム(http://)を含めないでください。
spark.eventLog.enabled=true spark.eventLog.dir=hdfs://busj40CDH3-ns/user/spark/applicationHistory spark.yarn.historyServer.address=busj40bda13.example.com:18088
Sparkのイベント・ロギングを有効にするには、問題のトラブルシューティングを行う際にOracle Support担当者が行う必要があります。 ワークフローのパフォーマンスが低下する可能性があるため、通常の状況でSparkイベント・ロギングを有効にすることはお薦めしません。
SparkワーカーOutOfMemoryError
java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102) at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005) at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663) at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
maxRecordsForNewDataSet
runEnrichment
sparkExecutorMemory
OutOfMemoryErrorインスタンスが発生した場合、デプロイメントのプロビジョニングのニーズに応じて、DP CLIのデフォルト値を調整してsparkContext.properties
構成を指定できます。
たとえば、データ処理を使用すると、sparkExecutorMemory
設定を指定できます。この設定は、executorプロセスごとに使用するメモリー量を定義するために使用されます。 (この設定は、Spark構成のspark.executor.memory
パラメータに対応しています。) Spark spark.storage.memoryFraction
パラメータは、Spark Executorsにメモリーの問題がある場合に使用する別の重要なオプションです。
Sparkのチューニングのトピックも確認する必要があります。 : http://spark.apache.org/docs/latest/tuning.html
ベナンsparkDriverシャットダウン・エラー
... 11:11:42.828 Thread-2 INFO : Shutting down all executors 11:11:42.829 sparkDriver-akka.actor.default-dispatcher-19 INFO : Asking each executor to shut down 11:11:42.892 sparkDriver-akka.actor.default-dispatcher-17 ERROR: AssociationError [akka.tcp://sparkDriver@10.152.110.203:62743] <- [akka.tcp://sparkExecutor@atm.example.com:30203]: Error [Shut down address: akka.tcp://sparkExecutor@bus00atm.us.oracle.com:30203] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@atm.example.com:30203 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] akka.event.Logging$Error$NoCause$ 11:11:42.893 sparkDriver-akka.actor.default-dispatcher-19 INFO : Driver terminated or disconnected! Shutting down. atm.example.com:30203 11:11:42.897 sparkDriver-akka.actor.default-dispatcher-19 INFO : MapOutputTrackerMasterEndpoint stopped! ...
実際のSpark作業は正常に行われます。 ただし、sparkDriver停止ではエラー・メッセージが生成されます。 ログ・メッセージは、Sparkによって表示されます(データ処理コードではありません)。 メッセージは曲げられ、機能に実際の影響はありません。
ジョブ・キューイングとクラスタのロックを区別するための注意
[DataProcessing] [WARN] [] [org.apache.spark.Logging$class] [tid:Timer-0] [userID:yarn] Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
これは、クラスタ・ロックではなく通常のYARNジョブ・キューイングが原因の可能性があります。 (クラスタのロックは、多数のアプリケーションを一度に発行し、すべてのクラスタ・リソースをApplicationManagersで占有することにより、クラスタがデッドロックになったときです。) 通常のYARNジョブ・キューイングの外観は、クラスタ・ロックと非常によく似ています(特に、実行に時間がかかりすぎるYARNジョブがある場合)。 ジョブのステータスを確認するには、HadoopディストリビューションのHadoopクラスタ・マネージャを使用します。
次の情報は、ジョブ・キューイングと疑わしいクラスタ・ロックの区別に役立ちます: ジョブが正常なキュー状態にあるときに、RUNNING状態にあるジョブが複数ある場合を除き、「これらのすべてのジョブ」のログには、初期ジョブはリソースを受け入れていませんと表示されます。 通常はステージXでのタスクX.Xの開始を参照している1つのジョブが進行中であるかぎり、これらのジョブは実際に通常のキューイング状態になります。 また、Spark RUNNINGジョブをResourceManager UIでチェックするときは、実行中のアプリケーションが残っていないように、最初のページで表示するか、UIの検索ボックスを使用してください。
Hadoopクラスタに2.6.0.より前のバージョンのHadoopがある場合は、明示的な設定を使用してApplicationMaster共有を制限することをお薦めします:
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
このプロパティは、アプリケーション・マスターの実行に使用できるキュー・フェア・シェアの割合を制限します。