7 Sparkの操作
この章では、Sparkの操作に関連する様々な概念について説明します。
この章の内容は次のとおりです。
Sparkの使用
Sparkエンジンを使用するには、ステージング実行ユニットが物理マッピングで作成されていて、EU実行場所がSparkスキーマに設定されている必要があります。
SparkマッピングでのAvroファイルおよびデリミタ付きファイルの処理のための前提条件
Avroファイル
SparkマッピングでAvroファイルを使用するには、Avro .egg
ファイルをODIインストールに追加する必要があります。Avroの.egg
ファイルは、直接ダウンロードできません。Avroパッケージから生成する必要があります。
Avro .egg
ファイルをODIインストールに追加するには:
-
Avroパッケージから.eggファイルを生成する
-
avro 1.8.2 : Python Package IndexまたはApache Avro™ Releasesから
avro-1.8.0.tar.gz
をダウンロードします。 -
次に示すように、解凍してからAvro Pythonライブラリをインストールします。
$ tar xvf avro-1.8.1.tar.gz $ cd avro-1.8.1 $ sudo python setup.py install Installed /usr/lib/python2.6/site-packages/avro-_AVRO_VERSION_-py2.6.egg Processing dependencies for avro===-AVRO-VERSION- Finished processing dependencies for avro===-AVRO-VERSION-
avro-_AVRO_VERSION_-py2.6.egg
ファイルは、Pythonのインストール・ディレクトリに配置されます。詳細は、「Apache Avro™ 1.8.0 Getting Started (Python)」を参照してください。
-
-
.egg
ファイルをODIの特定の場所にコピーするODIエージェントの場合は、
.egg
ファイルを$DOMAIN_HOME_PROPERTY/lib/spark
にコピーします。ODI Studioの場合は、
.egg
ファイルを$HOME/.odi/oracledi/userlib/spark
にコピーします。
デリミタ付きファイル
Sparkマッピングでデリミタ付きファイルを使用するには、ODIインストールにAvro .eggファイルを追加する必要があります。
CSV jarファイルをODIインストールに追加するには:
-
CSV jarファイルをダウンロードする
それぞれのリンクから次のjarファイルをダウンロードします。
-
spark-csvの
spark-csv_2.10-1.5.0.jar
-
Commons CSV – Download Apache Commons CSVの
commons-csv-1.2.jar
-
-
jarファイルをODIの特定の場所にコピーする
ODIエージェントの場合は、jarファイルを
$DOMAIN_HOME_PROPERTY/lib/spark
にコピーします。ODI Studioの場合は、jarファイルを
$HOME/.odi/oracledi/userlib/spark
にコピーします。
Sparkの設計上の考慮事項
変換エンジンとしてSparkを選択した場合は、マッピングを作成する前に、次に示す設計上の決定を下す必要があります。
バッチまたはストリーミング
Sparkは、バッチとストリーミングの2つの操作モードをサポートしています。ストリーミング・モードでは、Kafkaトピックや特定の場所に追加されたファイル/HDFSファイルからのデータを取込むことができます。最大限にストリーミングを活用するために、「Sparkチェックポイント」と「Sparkのウィンドウおよびステートフル集計」を参照してください。
ストリーミング・フラグを設定するには、物理設計を選択し、キャンバスの空白部分をクリックして、プロパティ・パネルで「ストリーミング」チェック・ボックスを選択します。ストリーミング・フラグが設定されていない場合、マッピングはバッチ・モード(デフォルト)で実行するようになります。
RDD (Resilient Distributed Datasets)またはDataFrames
Sparkには、データの変換に使用できる複数のAPIのセットがあります。RDD (Resilient Distributed Datasets)とDataFramesは、ODIがコードを生成できるAPIです。
RDD (Resilient Distributed Datasets)
RDDは、Apache Sparkにおける主要なデータ抽象化です。これは、フォルト・トレラントな(Resilient; 回復性のある)パーティション化されたデータのコレクション(Dataset; データセット)であり、データはクラスタ内の複数のノードに存在(分散型; Distributed)しています。データはメモリー内に存在し、必要な場合はキャッシュされます。RDDは読取り専用ですが、別のRDDを作成する変換の適用が可能です。トリガーされたときにのみデータの使用が可能になる(または変換される)、遅延評価が使用されます。RDDパーティションは、並列度の単位になります。
DataFrames
DataFrameは、読取り専用の分散されたデータのコレクションです。RDDとは異なり名前付き列に編成されます。抽象化レベルが高いため、SparkSQL問合せの使用を可能にしている場合など、巨大なデータセットの処理も簡単になります。DataFramesはSpark SQLエンジンの上部に構築されているため、パフォーマンスと領域の大幅な最適化が可能です。
注意:
ストリーミングを使用している場合、選択可能なオプションはRDDのみになります。スキーマ・ナレッジ・モジュール・オプションの推測
Sparkは、データを調査することでデータセットのスキーマを推測または推定できます。これは便利なものですが、データ型が期待どおりにマッピングされないこともあります。その場合、該当するSpark KMのinferSchemaオプションをFalseに設定して、この機能をオフにすることができます。データ型の不一致に関連する実行時エラーが発生した場合は、スキーマの推測オプションの値を調整する必要があります。このオプションは、読取りまたは書込みのLKMで設定できます。
注意:
Sparkは、このオプションをDataFramesの作成時にのみ使用します。inferSchemaをFalseに設定すると、ODIはマッピング・データ・ストアのメタデータに基づいてスキーマ定義を生成し、この構造がDataFrame APIの作成に使用されるようになります。次の図に、スキーマの推測オプションを示します。
式構文
結合条件やフィルタ条件などに含まれる式または属性式を記述する必要があるときに、式構文に使用できるオプションがあります。ODIでRDDコードが生成されるようにしている場合は、式の記述にPythonを使用する必要があります。ただし、DataFramesが生成されるようにしている場合は、式の記述にSQLまたはPythonのどちらかを選択できます。選択した構文は、SQL_EXPRESSIONS
をTrueまたはFalseに設定することで指定できます。
有効なコード生成スタイルは、次のとおりです。
-
RDDとPython式
-
DataFramesとPython式
-
DataFramesとSQL式
Python式はRDDとDataFramesでは定義が異なるため、これら2つのコード生成スタイルに応じてPython構文が異なることがあります。そのため、一部のPython式はRDDとDataFrameのコード生成スタイルの両方には通用しないことがあります。
RDDとPython式
Python式で使用可能な構文と関数の詳細は、「The Python Standard Library」を参照してください。
DataFramesとPython式
列オブジェクトに使用可能なPython関数の一覧は、Pyspark.sql.moduleを参照してください。
DataFramesとSQL式
汎用のSQL関数と演算子は、汎用SQL言語を選択しているときに式エディタで確認できます。
マッピングの説明
マッピングで使用されている複数の式を示す例について説明します。
この例では、不動産取引(Real Estate Transactions)を格納しているソース(REA
)が、都市(City)と人口(Population)のデータを格納している2番目のソース(REA2
)と組み合されています。その後で、大口取引のみを選択するフィルタが適用されています。これにより、次の図に示すように結合後にフィルタ処理された情報を格納するターゲット・ファイル(REA1
)を作成します。
マッピング定義
このマッピングは、次のように定義されています。
-
JOIN (結合): 都市(City)をキーとして使用して、不動産取引表(
REA
)と都市人口表(REA2
)を結合します。REA1
の都市名は大文字になっていますが、REA2
の都市名は小文字になっています。 -
FILTER (フィルタ): 価格が$500,000以上の行を選択して、タイプがMulti-Familyの取引を無視します。
-
City (都市): 都市名は、各単語の最初の文字以外は小文字にする必要があります。
-
GeoLocation (地理位置情報): "<経度>, <緯度>"の形式にする必要があります。この文字列には、引用符も含める必要があります。
-
Population (人口): 1,000の位に最近接丸めします。
コード生成スタイルごとのマッピング式
次の表に、コード生成スタイルごとのマッピング式の説明を示します。
図7-1 コード生成スタイルごとのマッピング式
コード生成スタイルのマッピング式 | RDDとPython式 | DataFramesとPython式 | DataFramesとSQL式 |
---|---|---|---|
結合条件 |
REA.City == (REA2.City).upper() |
REA.City == upper(REA2.City) |
REA.City = UPPER(REA2.City) |
フィルタ構文 |
REA.Type<>'Multi-Family' and REA.Price >=500000 |
REA.Type<>'Multi-Family' and REA.Price >=500000 |
REA.Type<>'Multi-Family' and REA.Price >=500000 |
ターゲット列構文 |
# City - note: this only capitalizes the first word! (REA.City).capitalize() # GeoLocation REA.geolat + ", " + REA.geolong # Population int(round(REA2.Population,-3)) |
# City initcap(lower(REA.City)) # GeoLocation concat(REA.geolat ,lit(", "),REA.geolong) # Population round(REA2.Population,-3) |
-- City INITCAP(LOWER(REA.City)) -- GeoLocation CONCAT(REA.geolat,', ', REA.geolong) # Population ROUND(REA2.Population,-3) |
ライブラリのインポート
この例からわかるように、それぞれのスタイルのすべてに同じ組込み関数が通用しないことがあります。ここでは、initcap
組込み関数がRDDでは使用できません。capwords()
関数は必要な処理を実行しますが、スクリプトにimport文を追加する必要があります。Spark EKMには、customPythonImportsという複数行オプションがあります。このオプションによってスクリプトのImport文を指定できるため、式で別の関数を使用できるようになります。
importのリストを含めるには、customPythonImports EKMオプションを次のように記述します。
from string import * from time import localtime
ターゲット列式は、次のように記述します。
#City capwords(REA.City)
Sparkストリーミングのサポート
この項では、データ・セットに対するストリーミング・モードの操作について説明します。また、チェックポイントに関する情報も示します。
注意:
Cloudera CDH 6.0、Hortonworks 3.1以降などのデータ・プラットフォームでは、Kafkaを使用したSpark 2.0ストリーミングはサポートされなくなりました。この項には次のサブセクションが含まれます:
Sparkチェックポイント
ストリーミング・アプリケーションは、24時間365日動作する必要があるため、障害からの回復が早い必要があります。Sparkストリーミングでは、障害からリカバリできるように、フォルト・トレラント・ストレージ・システムに情報をチェックポイントする必要があります。
チェックポイントは、アプリケーションを実行しているドライバの障害からリカバリするアプリケーションに対して有効です。チェックポイントで保証されるのは、チェックポイントが見つかった場合、Sparkアプリケーションが残っているところから再開することのみです。
チェックポイントの詳細は、『Spark Streaming Programming Guide』を参照してください。
Sparkのウィンドウおよびステートフル集計
Sparkのウィンドウ機能を使用すると、集計(およびその他の変換)を現在のRDDに適用できるだけでなく、複数の以前のRDD (ウィンドウ期間)からのデータも含めることができます。
Spark KMは、バッチに加えてストリーミング変換もサポートしています。ストリーミング以外のPythonコードはRDDまたはDataFrameオブジェクトに対して機能し、ストリーミングのコードはDStreamオブジェクトに対して機能します。バッチ・モードでの集計は単純で、単一セットの入力レコード(RDD)があり、これが集計されて出力データを形成し、ターゲットに書き込まれます。ストリーミング・モードでは、連続して着信するデータがRDDのフローに離散化されます。デフォルトでは、各RDDは別々に集計されます。
Sparkウィンドウは、累計や移動平均などを計算するのに適しています。しかし、2つの制約があります。
-
古いRDDを保持する必要があります。
-
ウィンドウに入るデータは、新しいRDDのたびに再計算されます。
このことから、ウィンドウはデータ・ストリーム全体での集計に適しません。これは、ステートフル集計によってのみ実現できます。
ウィンドウ対応のKMには次の省略可能なKMのオプションがあります。
-
ウィンドウ期間: ウィンドウの期間。バッチ間隔の数で定義されます。
-
スライディング間隔: ウィンドウ操作が実行される間隔。バッチ間隔の数で定義されます。
ウィンドウがサポートされているのは、次のとおりです。
-
XKM Spark Aggregation
-
XKM Spark Join
-
XKM Spark Set
-
XKM Spark Distinct
詳細は、『Spark Streaming Programming Guide』を参照してください。
ステートフル集計
ストリームの全データにわたってデータを集計する必要がある場合、ステートフル集計が必要です。ステートフル集計では、Sparkによってすべてのキーの集計値が含まれる状態ストリームが構築されます。着信RDDごとに、この状態が更新されます。たとえば、新しい着信データに基づいて集計された合計が更新されます。
デフォルトでは、ステート・ストリームは、着信RDDごとに格納された値をすべて出力します。これは、ストリーム出力がファイルであり、そのファイルが常に集計値を全部保持していると想定される場合に便利です。
ステートフル処理がサポートされているのは、次のとおりです。
-
XKM Spark Aggregate
-
XKM Spark Lookup
Sparkの再パーティション化およびキャッシング
キャッシング
ODIでは、SparkベースのKMのオプションを2つ追加することで、Sparkのキャッシング・メカニズムを活用します。
-
データのキャッシュ: このオプションをtrueに設定すると、ストレージの起動がコンポーネントの生成済pysparkコードに追加されます。
-
キャッシュ・ストレージ・レベル: データのキャッシュがfalseに設定されている場合、このオプションは表示されません。
再パーティション化
ソースがHDFSファイルの場合、最初のパーティション数はソースHDFSシステムのデータ・ブロックによって決定されます。Sparkアプリケーションを実行するプラットフォームのタスクに使用できるスロットがロードされるパーティションの数よりも多くある場合、プラットフォームのリソースは完全には活用されていません。その場合は、RDD.repartition() APIを使用してパーティションの数を変更できます。
再パーティション化は、プロセス全体の任意の段階で実行できます。データがソースからロードされた直後やフィルタ・コンポーネントの処理後にも実行できます。ODIには、再パーティション化を実行するかどうかや、どこで実行するかを決定するSparkベースのKMオプションがあります。
-
再パーティション: このオプションをtrueに設定すると、コンポーネントの変換後に再パーティションが適用されます。
-
並列度のレベル: パーディションの数。デフォルトは0です。デフォルト値が設定されていると、repartition()関数の呼出しのためにspark.default.parallelismが使用されるようになります。
-
パーティションのソート: このオプションをtrueに設定すると、パーティションがキーでソートされます。キーはLambda関数で定義されます。
-
パーティションのソート順: 昇順または降順。デフォルトは昇順です。
-
パーティション・キー: カンマ区切りの列リストとして表されるユーザー定義のパーティション・キー。
-
パーティション関数: ユーザー定義のパーティションLambda関数。デフォルト値は、pyspark定義のハッシュ関数
portable_hash
です。この関数は、単にRDD行全体のハッシュ・ベースを計算します。
ストリーミングのサポートの構成
ストリーミングのサポートの構成は、次に示す2つの部分で実行します。
- トポロジ
- マッピング設計
Sparkストリーミング・データサーバー・プロパティ
Sparkテクノロジ固有のストリーミング・プロパティが提供されています。こうしたプロパティは、Spark実行ユニット・プロパティのデフォルト値です。
表7-2 Sparkストリーミング・データサーバー・プロパティ
キー | 値 |
---|---|
spark.checkpointingBaseDir | このプロパティでは、チェックポイントのベース・ディレクトリを定義します。このベース・ディレクトリ下のマッピングごとに、サブディレクトリが作成されます。
例: hdfs://cluster-ns1/user/oracle/spark/checkpoints |
spark.checkpointingInterval | 時間(秒)を表します。 |
spark.restartFromCheckpoint |
|
spark.batchDuration | ストリーミング間隔の継続時間を秒単位で表示します。 |
spark.rememberDuration | 秒単位の時間を表示して、この期間のRDDを記憶するようにSparkストリーミング・コンテキストを設定します。 |
spark.checkpointing | Sparkチェックポイントを有効にします。 |
spark.streaming.timeout | ストリーミング・アプリケーションを停止するまでのタイムアウトを秒単位で表示します。
デフォルトは60です。 |
odi-execution-mode |
|
spark.ui.enabled | Spark Live REST APIを有効にします。
注意: 非同期実行の場合はtrueに設定します。 |
spark.eventLog.enabled | Sparkイベント・ログを有効にします。これにより、Spark履歴サーバーでログにアクセスできるようになります。
注意: 非同期実行の場合はtrueに設定します。 |
principal | Kerberos化されたユーザー名。 |
keytab | kerberosプリンシパル・キーと暗号化キーのペアを格納するkeytabファイルの場所。
例: /tmp/oracle.keytab |
odi.spark.enableUnsupportedSparkModes | このチェックは、yarn-clientとyarn-clusterのみがサポートされると、採用されます。 |
その他のSparkストリーミング・データ・プロパティ
非同期Spark実行ユニットに追加されたSparkテクノロジに固有のその他のSparkストリーミング・プロパティを示します。
表7-3 その他のSparkストリーミング・プロパティ
キー | 値 |
---|---|
spark-webui-startup-polling-retries | Spark WebUIの起動を待機する間の最大再試行回数。 |
spark-webui-startup-polling-interval | 再試行の間隔(秒)を表します。 |
spark-webui-startup-polling-persist-after-retries | |
spark-webui-rest-timeout | Spark WebUIでのRESTコールに使用されるタイムアウト(秒)。 |
spark-webui-polling-interval | Spark WebUIでのポーリング間隔(秒)。 |
spark-webui-polling-persist-after-retries | |
spark-history-server-rest-timeout | Spark履歴サーバーでのRESTコールに使用されるタイムアウト(秒)。 |
spark-history-server-polling-retries | Spark履歴サーバーによってSparkイベント・ログが使用可能になるのを待機する間の最大再試行回数。 |
spark-history-server-polling-interval | 再試行の間隔(秒)。 |
spark-history-server-polling-persist-after-retries | |
spark-submit-shutdown-polling-retries | Sparkによって発行されたOSプロセスの完了を待機する間の最大再試行回数。 |
spark-submit-shutdown-polling-interval | 再試行の間隔(秒)。 |
spark-submit-shutdown-polling-persist-after-retries |
ODIでのRDDとDataFramesの切替え
DataFrameコードとRDDコードの生成を切り替えるには、Spark実行ユニットでEKMオプションspark.useDataFrames
をTrueまたはFalseに設定します。
DataFrameコードの生成をサポートしていないコンポーネント
一部のコンポーネントは、DataFrameコードの生成をサポートしていません。DataFramesをサポートしていないマッピング・コンポーネントが1つでもあると、検証エラーが表示され(Spark実行ユニットのプロパティspark.useDataFramesをfalseに設定するように求められ)、RDDに戻す必要があります。
次のコンポーネントは、DataFrameコードの生成をサポートしていません。
-
ピボット
-
アンピボット
-
入力シグネチャ
-
出力シグネチャ
表関数の形式でのカスタム・コードの追加
表関数コンポーネントを使用すると、外部スクリプトまたはインライン・コードへの参照の形式で独自のコード・セグメントをマッピングに追加できます。
TABLEFUNCTIONコンポーネントがソース・ログ・ファイルの解析と変換に使用されている例について説明します。このマッピングによって、ソース・ファイルからの未加工のデータ、変更されたデータおよびタイムスタンプなどの新しいデータを含むターゲット・ファイルが生成されます。
表関数を含んでいるマッピングを作成して、そのマッピングに入力属性と出力属性を追加するには、次の手順を実行します。
-
ソース・データ・ストアとターゲット・データ・ストアを'TABLEFUNCTION'という名前の表関数コンポーネントとともに追加してマッピングを作成します。
-
ソース・データ・ストアの出力コネクタをTABLEFUNCTIONコンポーネントの入力コネクタに接続します。
この時点で、入力属性はTABLEFUNCTIONに直接追加されるようになります。
注意:
-
次の図に示すように、ソース・データ・ストアからのすべての属性を含んでいる入力グループ'INPUT1'が自動的に作成されます。
-
追加のソース・データ・ストアごとに、新しい入力グループが追加されます。
-
-
ターゲット・データ・ストアの入力コネクタをTABLEFUNCTIONコンポーネントの出力コネクタに接続します。
この時点で、出力属性はTABLEFUNCTIONに直接追加されるようになります。
注意:
-
次の図に示すように、ターゲット・データ・ストアからのすべての属性を含んでいる出力グループ'OUTPUT1'が自動的に作成されます。
-
'OUTPUT1'の出力属性は、名前変更または削除が可能です。
-
各出力属性の式は、TABLEFUNCTIONコンポーネントに埋め込まれたスクリプトによるプログラムで設定されるため個別に設定する必要はありません。
-
次の手順を実行して、マッピングを構成します。
-
「論理」タブに移動して、「ステージングの場所のヒント」に従ってSpark-Local_Defaultを選択します。
-
「物理」タブに移動します。「抽出オプション」で、SPARK_SCRIPT_FILE KMオプションの値として
/tmp/xkmtf.py
と入力することで、TABLEFUNCTIONコンポーネントに使用するスクリプトを指定します。xmktf.py
スクリプトには、次の内容が含まれています。import sys import datetime #get the upstream object using the input connector point name upstream=sys.argv[0]['INPUT1'] #A value must be calculated for every TF output attribute TABLEFUNCTION = upstream.map(lambda input:Row(**{"visitorId":input.visitorId, "channel":input.channel, "clientCountry":input.clientCountry, "newSessionId":'Prefix'+input.sessionId, "timeStamp":now.strftime("%Y-%m-%d %H:%M")}))
ここでは、TABLEFUNCTIONコンポーネントの入力グループ'INPUT1'が、sys.argv経由でSpark-Pythonスクリプトの
xkmtf.py
に渡されます。また、SPARK_SCRIPT KMオプションの値として次の内容を入力すると、TABLEFUNCTIONコンポーネントに使用するスクリプトを直接指定することもできます。import datetime now = datetime.datetime.now() #A value must be calculated for every TF output attribute TABLEFUNCTION = ACT.map(lambda input:Row(**{"visitorId":input.visitorId, "channel":input.channel, "clientCountry":input.clientCountry, "newSessionId":'Prefix'+input.sessionId, "timeStamp":now.strftime("%Y-%m-%d %H:%M")}))
次に示す2つのタイプの表関数用Sparkスクリプトがあります。
-
外部表関数スクリプト
-
インライン表関数スクリプト
外部表関数スクリプト
これは、ODIマッピング・コード内から動的に実行できます。必要に応じて、sys.argvを使用して外部スクリプトで処理するためのRDD/DataFrameを送信します。
たとえば、次のプロパティによって挿入された表関数コンポーネントについて考えてみます。
-
名前 – TABLEFUNCTION
-
入力コネクタ - INPUT1
-
入力フィールド - IN_ATTR_1およびIN_ATTR_2
-
出力属性 - OUT_ATTR_1、OUT_ATTR_2およびOUT_ATTR_3
次の外部スクリプトに示すように、アップストリームのRDD/DataStreamオブジェクトは入力コネクタ・ポイント名を使用して取得されます。その後で結果のRDD/DStreamが計算されます。すべての表関数出力属性名に対して値が計算されます。
import sys import datetime upstream=sys.argv[0]['INPUT1'] now = datetime.datetime.now() TABLEFUNCTION = upstream.map(lambda input:Row(**{"OUT_ATTR_1":input.sessionId, "OUT_ATTR_2":input.customerId, "OUT_ATTR_3":now.strftime("%Y-%m-%d %H:%M")}))
この外部スクリプトを動的に実行するために、ODIは次のマッピング・コードを生成します。外部スクリプトの実行結果は、TABLEFUNCTIONとして保存されます。
sys.argv=[dict(INPUT1=ACT)] execfile('/tmp/xkmtf_300.py') TABLEFUNCTION = TABLEFUNCTION.toDF(...)
インライン表関数スクリプト
インライン・モードでは、実際の表関数スクリプトがXKMオプションとして保存されます。sys.argvを使用して、スクリプトを処理するためのソース・オブジェクトを送信する必要はありません。
次のスクリプトに示すように、外部スクリプトの実行結果が直接参照されます。
ACT=ACT.filter("ACT_customerId = '5001'") TABLEFUNCTION = ACT.toDF(...)