プライマリ・コンテンツに移動
Oracle® Big Data Discovery Cloud Serviceデータ処理ガイド

E65369-05
目次へ
目次
索引へ移動
索引

前
次
機械翻訳について

Spark構成

データ処理は、Spark構成ファイルsparkContext.propertiesを使用します。 このトピックでは、データ処理がこのファイルの設定を取得する方法とファイルのサンプルを示します。 また、このファイルで調整し、データ処理ワークフローを正常に完了するために必要なメモリー量を調整するためのオプションについても説明します。

データ処理ワークフローは、Sparkのワーカーによって実行されます。 Sparkワーカーのデータ処理ジョブが開始されると、sparkContext.propertiesファイルによって上書きまたは追加できる一連のデフォルトの構成設定があります。

Spark構成は細かなもので、クラスタのサイズやデータに適応させる必要があります。 また、タイムアウトおよび障害時の動作を変更する必要がある場合もあります。 Sparkでは、これらの目的のために適切な構成可能なオプション・セットが用意されており、これらのオプションを使用して、インストールのニーズに応じてSparkを構成できます。 そのため、Sparkのワーカーのパフォーマンスを微調整できるようにsparkContext.propertiesが用意されています。

sparkContext.propertiesファイルは、$CLI_HOME/edp_cli/configディレクトリにあります。 出荷時のファイルは空です。 ただし、任意のSpark構成プロパティをファイルに追加できます。 指定したプロパティによって、以前に設定されたすべてのSpark設定がオーバーライドされます。 Sparkプロパティのドキュメントは次の場所にあります。 : https://spark.apache.org/docs/latest/configuration.html

sparkContext.propertiesファイルは空にできることに留意してください。 ファイルが空の場合、Sparkのワーカーがジョブを実行するための十分な構成プロパティのセットを持つため、データ処理ワークフローは引き続き正しく実行されます。

注意:

sparkContext.propertiesファイルは削除しないでください。 空にすることもできますが、その存在のチェックが実行され、ファイルがない場合はデータ処理ワークフローは実行されません。

Sparkのデフォルト構成

開始すると、Sparkワーカーは次の順序で構成設定を3層形式で取得します:
  1. Hadoopのデフォルト設定から。
  2. データ処理構成設定から。Hadoop設定をオーバーライドするか、追加設定を指定できます。 たとえば、sparkExecutorMemoryプロパティ(DP CLI構成)は、Hadoop spark.executor.memoryプロパティをオーバーライドできます。
  3. sparkContext.propertiesファイルのプロパティ設定から、以前の設定をオーバーライドしたり、追加設定を指定できます。

sparkContext.propertiesファイルが空の場合、Sparkワーカーの最終構成は、ステップ1および2から取得されます。

サンプルSpark構成

次に、sparkContext.properties構成ファイルのサンプルを示します:
#########################################################
# Spark additional runtime properties
#########################################################
spark.broadcast.compress=true
spark.rdd.compress=false
spark.io.compression.codec=org.apache.spark.io.LZFCompressionCodec
spark.io.compression.snappy.block.size=32768
spark.closure.serializer=org.apache.spark.serializer.JavaSerializer
spark.serializer.objectStreamReset=10000
spark.kryo.referenceTracking=true
spark.kryoserializer.buffer.mb=2
spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
spark.broadcast.blockSize=4096
spark.files.overwrite=false
spark.files.fetchTimeout=false
spark.storage.memoryFraction=0.6
spark.tachyonStore.baseDir=System.getProperty("java.io.tmpdir")
spark.storage.memoryMapThreshold=8192
spark.cleaner.ttl=(infinite)

変換の異常な高速動作の構成

変換がコミットされると、ApplyTransformToDataSetWorkflowは失敗時に再試行しません。 データ・セットの状態がHDFSサンプル・ファイルの状態と同期されていない可能性があるため、障害の後にこのワークフローを安全に再実行することはできません。 この非試行動作は、すべてのHadoop環境に適用されます。

ユーザーはクラスタ上のyarn.resourcemanager.am.max-attempts設定を変更して、YARNジョブが再試行されないようにできます。 この操作を行わないと、ワークフローは成功したように見えますが、サンプル・データ・ファイルが矛盾しているため、今後の変換では失敗します。 高速動作の失敗が必要でないかぎり、ユーザーがこのプロパティを設定する必要はありません。

Sparkのイベント・ロギングの有効化

このファイルを使用すると、Sparkイベント・ロギングを有効にできます。 実行時に、SparkはDPワークフローを複数のステージに内部的にコンパイルします(通常、ステージはSpark変換のセットによって定義され、Sparkアクションによってバインドされます)。 ステージはDP操作に一致させることができます。 Sparkイベント・ログには、ステージとステージ内のすべてのタスクに関する詳細なタイミング情報が含まれます。

Sparkのイベント・ロギングには、次のSparkプロパティが使用されます:
  • spark.eventLog.enabled (trueに設定)では、Sparkイベントのロギングを有効にします。
  • spark.eventLog.dirでは、Sparkイベントが記録されるベース・ディレクトリを指定します。
  • spark.yarn.historyServer.addressでは、Spark履歴サーバー(つまり、host.com:18080)のアドレスを指定します。 アドレスにはスキーム(http://)を含めないでください。
次に例を示します。
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://busj40CDH3-ns/user/spark/applicationHistory
spark.yarn.historyServer.address=busj40bda13.example.com:18088

Sparkのイベント・ロギングを有効にするには、問題のトラブルシューティングを行う際にOracle Support担当者が行う必要があります。 ワークフローのパフォーマンスが低下する可能性があるため、通常の状況でSparkイベント・ロギングを有効にすることはお薦めしません。

SparkワーカーOutOfMemoryError

Sparkのワーカーに十分なメモリーが割り当てられていない場合は、OutOfMemoryErrorが発生し、データ処理ワークフローが次の例のようなエラー・メッセージで終了する可能性があります:
java.lang.OutOfMemoryError: Java heap space
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:102)
   at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:30)
   at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:996)
   at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1005)
   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:79)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   at org.apache.spark.scheduler.Task.run(Task.scala:51)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
データ処理ワークフローを正常に完了するために必要なメモリーの量は、次のようなデータベースの考慮事項によって異なります:
  • 各Hive表のレコード合計数。
  • 各Hive表レコードの平均サイズ。
また、次のようなDP CLI構成設定にも依存します:
  • maxRecordsForNewDataSet
  • runEnrichment
  • sparkExecutorMemory

OutOfMemoryErrorインスタンスが発生した場合、デプロイメントのプロビジョニングのニーズに応じて、DP CLIのデフォルト値を調整してsparkContext.properties構成を指定できます。

たとえば、データ処理を使用すると、sparkExecutorMemory設定を指定できます。この設定は、executorプロセスごとに使用するメモリー量を定義するために使用されます。 (この設定は、Spark構成のspark.executor.memoryパラメータに対応しています。) Spark spark.storage.memoryFractionパラメータは、Spark Executorsにメモリーの問題がある場合に使用する別の重要なオプションです。

Sparkのチューニングのトピックも確認する必要があります。 : http://spark.apache.org/docs/latest/tuning.html

ベナンsparkDriverシャットダウン・エラー

Sparkジョブが正常に終了すると、次のようなsparkDriverの停止ERRORメッセージがログに表示されます:
...
11:11:42.828 Thread-2 INFO : Shutting down all executors
11:11:42.829 sparkDriver-akka.actor.default-dispatcher-19 INFO : Asking each executor to shut down
11:11:42.892 sparkDriver-akka.actor.default-dispatcher-17 ERROR: AssociationError [akka.tcp://sparkDriver@10.152.110.203:62743] <- [akka.tcp://sparkExecutor@atm.example.com:30203]: Error [Shut down address: akka.tcp://sparkExecutor@bus00atm.us.oracle.com:30203] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@atm.example.com:30203
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
11:11:42.893 sparkDriver-akka.actor.default-dispatcher-19 INFO : Driver terminated or disconnected! Shutting down. atm.example.com:30203
11:11:42.897 sparkDriver-akka.actor.default-dispatcher-19 INFO : MapOutputTrackerMasterEndpoint stopped!
...

実際のSpark作業は正常に行われます。 ただし、sparkDriver停止ではエラー・メッセージが生成されます。 ログ・メッセージは、Sparkによって表示されます(データ処理コードではありません)。 メッセージは曲げられ、機能に実際の影響はありません。

ジョブ・キューイングとクラスタのロックを区別するための注意

小規模でビジーなクラスタを持つサイトでは、次の例のようなメッセージを表示するSparkジョブが実行されていない場合に問題が発生することがあります:
[DataProcessing] [WARN] [] [org.apache.spark.Logging$class] [tid:Timer-0] [userID:yarn]
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered 
and have sufficient memory

これは、クラスタ・ロックではなく通常のYARNジョブ・キューイングが原因の可能性があります。 (クラスタのロックは、多数のアプリケーションを一度に発行し、すべてのクラスタ・リソースをApplicationManagersで占有することにより、クラスタがデッドロックになったときです。) 通常のYARNジョブ・キューイングの外観は、クラスタ・ロックと非常によく似ています(特に、実行に時間がかかりすぎるYARNジョブがある場合)。 ジョブのステータスを確認するには、HadoopディストリビューションのHadoopクラスタ・マネージャを使用します。

次の情報は、ジョブ・キューイングと疑わしいクラスタ・ロックの区別に役立ちます: ジョブが正常なキュー状態にあるときに、RUNNING状態にあるジョブが複数ある場合を除き、「これらのすべてのジョブ」のログには、初期ジョブはリソースを受け入れていませんと表示されます。 通常はステージXでのタスクX.Xの開始を参照している1つのジョブが進行中であるかぎり、これらのジョブは実際に通常のキューイング状態になります。 また、Spark RUNNINGジョブをResourceManager UIでチェックするときは、実行中のアプリケーションが残っていないように、最初のページで表示するか、UIの検索ボックスを使用してください。

Hadoopクラスタに2.6.0.より前のバージョンのHadoopがある場合は、明示的な設定を使用してApplicationMaster共有を制限することをお薦めします:

<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>

このプロパティは、アプリケーション・マスターの実行に使用できるキュー・フェア・シェアの割合を制限します。