7 Sparkの操作

この章では、Sparkの操作に関連する様々な概念について説明します。

この章の内容は次のとおりです。

Sparkの使用

Sparkエンジンを使用するには、ステージング実行ユニットが物理マッピングで作成されていて、EU実行場所がSparkスキーマに設定されている必要があります。

Sparkマッピングの作成

Sparkマッピングを作成するには、Spark論理スキーマと物理スキーマが作成されていることを確認してから、次の手順を実行します。
  1. 「マッピング」「新規マッピング」を選択します。
  2. 「モデル」ツリーからfile_srcデータ・ストアとhdfs_tgtデータ・ストアを論理ダイアグラムにドラッグします。
  3. マッピング・コネクタをまとめてリンクして、位置によってマップの列を選択します。
    これにより、列をマップします。
  4. 「ステージングの場所のヒント」をSpark論理スキーマに設定します。
  5. 物理ダイアグラムに移動して、キャンバスの空白部分を選択します。「最適化コンテキスト」がクラスタに対して実行する場合の適切なコンテキストに設定されていることと、「ステージング場所のプリセット」がSparkに設定されていることを確認します。
  6. SPARKLS_STAGING_NODEノードをクリックして、「ロード・ナレッジ・モジュール」をLKM File to Sparkに設定します。
  7. ターゲット・グループのFIL_APノードをクリックして、「ロード・ナレッジ・モジュール」をLKM Spark to Fileに設定します。
  8. HDFノードをクリックして、「統合ナレッジ・モジュール」が「<デフォルト>」に設定されていることを確認します。

SparkマッピングでのAvroファイルおよびデリミタ付きファイルの処理のための前提条件

外部ライブラリは、Avroファイルまたはデリミタ付きファイルでSparkマッピングを使用する前にインストールしておく必要があります。

Avroファイル

SparkマッピングでAvroファイルを使用するには、Avro .eggファイルをODIインストールに追加する必要があります。Avroの.eggファイルは、直接ダウンロードできません。Avroパッケージから生成する必要があります。

Avro .eggファイルをODIインストールに追加するには:

  1. Avroパッケージから.eggファイルを生成する

    1. avro 1.8.2 : Python Package IndexまたはApache Avro™ Releasesからavro-1.8.0.tar.gzをダウンロードします。

    2. 次に示すように、解凍してからAvro Pythonライブラリをインストールします。

      $ tar xvf avro-1.8.1.tar.gz
      $ cd avro-1.8.1
      $ sudo python setup.py install
      Installed /usr/lib/python2.6/site-packages/avro-_AVRO_VERSION_-py2.6.egg
      Processing dependencies for avro===-AVRO-VERSION-
      Finished processing dependencies for avro===-AVRO-VERSION-

    avro-_AVRO_VERSION_-py2.6.eggファイルは、Pythonのインストール・ディレクトリに配置されます。

    詳細は、「Apache Avro™ 1.8.0 Getting Started (Python)」を参照してください。

  2. .eggファイルをODIの特定の場所にコピーする

    ODIエージェントの場合は、.eggファイルを$DOMAIN_HOME_PROPERTY/lib/sparkにコピーします。

    ODI Studioの場合は、.eggファイルを$HOME/.odi/oracledi/userlib/sparkにコピーします。

デリミタ付きファイル

Sparkマッピングでデリミタ付きファイルを使用するには、ODIインストールにAvro .eggファイルを追加する必要があります。

CSV jarファイルをODIインストールに追加するには:

  1. CSV jarファイルをダウンロードする

    それぞれのリンクから次のjarファイルをダウンロードします。

  2. jarファイルをODIの特定の場所にコピーする

    ODIエージェントの場合は、jarファイルを$DOMAIN_HOME_PROPERTY/lib/sparkにコピーします。

    ODI Studioの場合は、jarファイルを$HOME/.odi/oracledi/userlib/sparkにコピーします。

Sparkの設計上の考慮事項

変換エンジンとしてSparkを選択した場合は、マッピングを作成する前に、次に示す設計上の決定を下す必要があります。

バッチまたはストリーミング

Sparkは、バッチとストリーミングの2つの操作モードをサポートしています。ストリーミング・モードでは、Kafkaトピックや特定の場所に追加されたファイル/HDFSファイルからのデータを取込むことができます。最大限にストリーミングを活用するために、「Sparkチェックポイント」「Sparkのウィンドウおよびステートフル集計」を参照してください。

ストリーミング・フラグを設定するには、物理設計を選択し、キャンバスの空白部分をクリックして、プロパティ・パネルで「ストリーミング」チェック・ボックスを選択します。ストリーミング・フラグが設定されていない場合、マッピングはバッチ・モード(デフォルト)で実行するようになります。

RDD (Resilient Distributed Datasets)またはDataFrames

Sparkには、データの変換に使用できる複数のAPIのセットがあります。RDD (Resilient Distributed Datasets)とDataFramesは、ODIがコードを生成できるAPIです。

RDD (Resilient Distributed Datasets)

RDDは、Apache Sparkにおける主要なデータ抽象化です。これは、フォルト・トレラントな(Resilient; 回復性のある)パーティション化されたデータのコレクション(Dataset; データセット)であり、データはクラスタ内の複数のノードに存在(分散型; Distributed)しています。データはメモリー内に存在し、必要な場合はキャッシュされます。RDDは読取り専用ですが、別のRDDを作成する変換の適用が可能です。トリガーされたときにのみデータの使用が可能になる(または変換される)、遅延評価が使用されます。RDDパーティションは、並列度の単位になります。

DataFrames

DataFrameは、読取り専用の分散されたデータのコレクションです。RDDとは異なり名前付き列に編成されます。抽象化レベルが高いため、SparkSQL問合せの使用を可能にしている場合など、巨大なデータセットの処理も簡単になります。DataFramesはSpark SQLエンジンの上部に構築されているため、パフォーマンスと領域の大幅な最適化が可能です。

注意:

ストリーミングを使用している場合、選択可能なオプションはRDDのみになります。

スキーマ・ナレッジ・モジュール・オプションの推測

Sparkは、データを調査することでデータセットのスキーマを推測または推定できます。これは便利なものですが、データ型が期待どおりにマッピングされないこともあります。その場合、該当するSpark KMのinferSchemaオプションをFalseに設定して、この機能をオフにすることができます。データ型の不一致に関連する実行時エラーが発生した場合は、スキーマの推測オプションの値を調整する必要があります。このオプションは、読取りまたは書込みのLKMで設定できます。

注意:

Sparkは、このオプションをDataFramesの作成時にのみ使用します。inferSchemaをFalseに設定すると、ODIはマッピング・データ・ストアのメタデータに基づいてスキーマ定義を生成し、この構造がDataFrame APIの作成に使用されるようになります。

次の図に、スキーマの推測オプションを示します。

図7-1 InferSchema KMオプションを含む物理マッピング

図7-1の説明が続きます
「図7-1 InferSchema KMオプションを含む物理マッピング」の説明

式構文

結合条件やフィルタ条件などに含まれる式または属性式を記述する必要があるときに、式構文に使用できるオプションがあります。ODIでRDDコードが生成されるようにしている場合は、式の記述にPythonを使用する必要があります。ただし、DataFramesが生成されるようにしている場合は、式の記述にSQLまたはPythonのどちらかを選択できます。選択した構文は、SQL_EXPRESSIONSをTrueまたはFalseに設定することで指定できます。

有効なコード生成スタイルは、次のとおりです。

  • RDDとPython式

  • DataFramesとPython式

  • DataFramesとSQL式

Python式はRDDとDataFramesでは定義が異なるため、これら2つのコード生成スタイルに応じてPython構文が異なることがあります。そのため、一部のPython式はRDDとDataFrameのコード生成スタイルの両方には通用しないことがあります。

RDDとPython式

Python式で使用可能な構文と関数の詳細は、「The Python Standard Library」を参照してください。

DataFramesとPython式

列オブジェクトに使用可能なPython関数の一覧は、Pyspark.sql.moduleを参照してください。

DataFramesとSQL式

汎用のSQL関数と演算子は、汎用SQL言語を選択しているときに式エディタで確認できます。

マッピングの説明

マッピングで使用されている複数の式を示す例について説明します。

この例では、不動産取引(Real Estate Transactions)を格納しているソース(REA)が、都市(City)と人口(Population)のデータを格納している2番目のソース(REA2)と組み合されています。その後で、大口取引のみを選択するフィルタが適用されています。これにより、次の図に示すように結合後にフィルタ処理された情報を格納するターゲット・ファイル(REA1)を作成します。

図7-2 複数の式を含むマッピング

図7-2の説明が続きます
「図7-2 複数の式を含むマッピング」の説明

マッピング定義

このマッピングは、次のように定義されています。

  • JOIN (結合): 都市(City)をキーとして使用して、不動産取引表(REA)と都市人口表(REA2)を結合します。REA1の都市名は大文字になっていますが、REA2の都市名は小文字になっています。

  • FILTER (フィルタ): 価格が$500,000以上の行を選択して、タイプがMulti-Familyの取引を無視します。

  • City (都市): 都市名は、各単語の最初の文字以外は小文字にする必要があります。

  • GeoLocation (地理位置情報): "<経度>, <緯度>"の形式にする必要があります。この文字列には、引用符も含める必要があります。

  • Population (人口): 1,000の位に最近接丸めします。

コード生成スタイルごとのマッピング式

次の表に、コード生成スタイルごとのマッピング式の説明を示します。

図7-1 コード生成スタイルごとのマッピング式

コード生成スタイルのマッピング式 RDDとPython式 DataFramesとPython式 DataFramesとSQL式

結合条件

REA.City == (REA2.City).upper()
REA.City == upper(REA2.City)
REA.City = UPPER(REA2.City)

フィルタ構文

REA.Type<>'Multi-Family' and REA.Price >=500000
REA.Type<>'Multi-Family' and  REA.Price >=500000
REA.Type<>'Multi-Family' and  REA.Price >=500000

ターゲット列構文

# City - note: this only capitalizes the first word!
(REA.City).capitalize()
# GeoLocation
REA.geolat + ", " +  REA.geolong
# Population
int(round(REA2.Population,-3))
# City
initcap(lower(REA.City))
# GeoLocation
concat(REA.geolat ,lit(", "),REA.geolong)
# Population
round(REA2.Population,-3)
-- City
INITCAP(LOWER(REA.City))
-- GeoLocation
CONCAT(REA.geolat,', ', REA.geolong)
# Population
ROUND(REA2.Population,-3)

ライブラリのインポート

この例からわかるように、それぞれのスタイルのすべてに同じ組込み関数が通用しないことがあります。ここでは、initcap組込み関数がRDDでは使用できません。capwords()関数は必要な処理を実行しますが、スクリプトにimport文を追加する必要があります。Spark EKMには、customPythonImportsという複数行オプションがあります。このオプションによってスクリプトのImport文を指定できるため、式で別の関数を使用できるようになります。

importのリストを含めるには、customPythonImports EKMオプションを次のように記述します。

from string import *
from time import localtime

ターゲット列式は、次のように記述します。

#City
capwords(REA.City)

Sparkストリーミングのサポート

この項では、データ・セットに対するストリーミング・モードの操作について説明します。また、チェックポイントに関する情報も示します。

注意:

Cloudera CDH 6.0、Hortonworks 3.1以降などのデータ・プラットフォームでは、Kafkaを使用したSpark 2.0ストリーミングはサポートされなくなりました。

この項には次のサブセクションが含まれます:

Sparkチェックポイント

ストリーミング・アプリケーションは、24時間365日動作する必要があるため、障害からの回復が早い必要があります。Sparkストリーミングでは、障害からリカバリできるように、フォルト・トレラント・ストレージ・システムに情報をチェックポイントする必要があります。

チェックポイントは、アプリケーションを実行しているドライバの障害からリカバリするアプリケーションに対して有効です。チェックポイントで保証されるのは、チェックポイントが見つかった場合、Sparkアプリケーションが残っているところから再開することのみです。

チェックポイントの詳細は、『Spark Streaming Programming Guide』を参照してください。

Sparkのウィンドウおよびステートフル集計

Sparkのウィンドウ機能を使用すると、集計(およびその他の変換)を現在のRDDに適用できるだけでなく、複数の以前のRDD (ウィンドウ期間)からのデータも含めることができます。

Spark KMは、バッチに加えてストリーミング変換もサポートしています。ストリーミング以外のPythonコードはRDDまたはDataFrameオブジェクトに対して機能し、ストリーミングのコードはDStreamオブジェクトに対して機能します。バッチ・モードでの集計は単純で、単一セットの入力レコード(RDD)があり、これが集計されて出力データを形成し、ターゲットに書き込まれます。ストリーミング・モードでは、連続して着信するデータがRDDのフローに離散化されます。デフォルトでは、各RDDは別々に集計されます。

Sparkウィンドウは、累計や移動平均などを計算するのに適しています。しかし、2つの制約があります。

  • 古いRDDを保持する必要があります。

  • ウィンドウに入るデータは、新しいRDDのたびに再計算されます。

このことから、ウィンドウはデータ・ストリーム全体での集計に適しません。これは、ステートフル集計によってのみ実現できます。

ウィンドウ対応のKMには次の省略可能なKMのオプションがあります。

  • ウィンドウ期間: ウィンドウの期間。バッチ間隔の数で定義されます。

  • スライディング間隔: ウィンドウ操作が実行される間隔。バッチ間隔の数で定義されます。

ウィンドウがサポートされているのは、次のとおりです。

  • XKM Spark Aggregation

  • XKM Spark Join

  • XKM Spark Set

  • XKM Spark Distinct

詳細は、『Spark Streaming Programming Guide』を参照してください。

ステートフル集計

ストリームの全データにわたってデータを集計する必要がある場合、ステートフル集計が必要です。ステートフル集計では、Sparkによってすべてのキーの集計値が含まれる状態ストリームが構築されます。着信RDDごとに、この状態が更新されます。たとえば、新しい着信データに基づいて集計された合計が更新されます。

デフォルトでは、ステート・ストリームは、着信RDDごとに格納された値をすべて出力します。これは、ストリーム出力がファイルであり、そのファイルが常に集計値を全部保持していると想定される場合に便利です。

ステートフル処理がサポートされているのは、次のとおりです。

  • XKM Spark Aggregate

  • XKM Spark Lookup

Sparkの再パーティション化およびキャッシング

キャッシング

ODIでは、SparkベースのKMのオプションを2つ追加することで、Sparkのキャッシング・メカニズムを活用します。

  • データのキャッシュ: このオプションをtrueに設定すると、ストレージの起動がコンポーネントの生成済pysparkコードに追加されます。

  • キャッシュ・ストレージ・レベル: データのキャッシュがfalseに設定されている場合、このオプションは表示されません。

再パーティション化

ソースがHDFSファイルの場合、最初のパーティション数はソースHDFSシステムのデータ・ブロックによって決定されます。Sparkアプリケーションを実行するプラットフォームのタスクに使用できるスロットがロードされるパーティションの数よりも多くある場合、プラットフォームのリソースは完全には活用されていません。その場合は、RDD.repartition() APIを使用してパーティションの数を変更できます。

再パーティション化は、プロセス全体の任意の段階で実行できます。データがソースからロードされた直後やフィルタ・コンポーネントの処理後にも実行できます。ODIには、再パーティション化を実行するかどうかや、どこで実行するかを決定するSparkベースのKMオプションがあります。

  • 再パーティション: このオプションをtrueに設定すると、コンポーネントの変換後に再パーティションが適用されます。

  • 並列度のレベル: パーディションの数。デフォルトは0です。デフォルト値が設定されていると、repartition()関数の呼出しのためにspark.default.parallelismが使用されるようになります。

  • パーティションのソート: このオプションをtrueに設定すると、パーティションがキーでソートされます。キーはLambda関数で定義されます。

  • パーティションのソート順: 昇順または降順。デフォルトは昇順です。

  • パーティション・キー: カンマ区切りの列リストとして表されるユーザー定義のパーティション・キー。

  • パーティション関数: ユーザー定義のパーティションLambda関数。デフォルト値は、pyspark定義のハッシュ関数portable_hashです。この関数は、単にRDD行全体のハッシュ・ベースを計算します。

ストリーミングのサポートの構成

ストリーミングのサポートの構成は、次に示す2つの部分で実行します。

  1. トポロジ
    1. 「トポロジ」タブをクリックします。
    2. 「物理アーキテクチャ」ツリーの「テクノロジ」で、Spark Pythonを右クリックして「新規データ・サーバー」をクリックします。
    3. 「定義」タブで、Sparkデータ・サーバーの詳細を指定します。

      詳細は、「Sparkデータ・サーバーの定義」を参照してください。

    4. 「プロパティ」タブで、Sparkデータ・サーバーのプロパティを指定します。

      詳細は、「Sparkデータ・サーバー・プロパティ」を参照してください。

    5. 「テスト接続」をクリックして、Sparkデータ・サーバーへの接続をテストします。
  2. マッピング設計
    1. マッピングを編集するには、物理設計を選択し、キャンバスの空白部分をクリックして、プロパティ・パネルで「ストリーミング」チェック・ボックスを選択します。

      ODIは、バッチ・モードではなく、ストリーミング・モードでマッピングの実行が可能なコードを生成します。

Sparkストリーミング・データサーバー・プロパティ

Sparkテクノロジ固有のストリーミング・プロパティが提供されています。こうしたプロパティは、Spark実行ユニット・プロパティのデフォルト値です。

表7-2 Sparkストリーミング・データサーバー・プロパティ

キー
spark.checkpointingBaseDir このプロパティでは、チェックポイントのベース・ディレクトリを定義します。このベース・ディレクトリ下のマッピングごとに、サブディレクトリが作成されます。

例: hdfs://cluster-ns1/user/oracle/spark/checkpoints

spark.checkpointingInterval 時間(秒)を表します。
spark.restartFromCheckpoint
  • trueに設定すると、Sparkストリーミング・アプリケーションは既存のチェックポイントから再開されます。

  • falseに設定すると、Sparkストリーミング・アプリケーションは既存のチェックポイントをすべて無視します。

  • チェックポイントが存在しない場合は、通常どおり開始されます。

spark.batchDuration ストリーミング間隔の継続時間を秒単位で表示します。
spark.rememberDuration 秒単位の時間を表示して、この期間のRDDを記憶するようにSparkストリーミング・コンテキストを設定します。
spark.checkpointing Sparkチェックポイントを有効にします。
spark.streaming.timeout ストリーミング・アプリケーションを停止するまでのタイムアウトを秒単位で表示します。

デフォルトは60です。

odi-execution-mode
  • SYNCHRONOUS: Sparkアプリケーションは、OdiOSCommandを使用して発行およびモニターされます。

  • ASYNCHRONOUS: Sparkアプリケーションは、OdiOSCommandを使用して非同期に発行され、Spark REST APIを使用してモニターされます。

spark.ui.enabled Spark Live REST APIを有効にします。

注意:

非同期実行の場合はtrueに設定します。
spark.eventLog.enabled Sparkイベント・ログを有効にします。これにより、Spark履歴サーバーでログにアクセスできるようになります。

注意:

非同期実行の場合はtrueに設定します。
principal Kerberos化されたユーザー名。
keytab kerberosプリンシパル・キーと暗号化キーのペアを格納するkeytabファイルの場所。

例: /tmp/oracle.keytab

odi.spark.enableUnsupportedSparkModes このチェックは、yarn-clientとyarn-clusterのみがサポートされると、採用されます。
その他のSparkストリーミング・データ・プロパティ

非同期Spark実行ユニットに追加されたSparkテクノロジに固有のその他のSparkストリーミング・プロパティを示します。

表7-3 その他のSparkストリーミング・プロパティ

キー
spark-webui-startup-polling-retries Spark WebUIの起動を待機する間の最大再試行回数。
spark-webui-startup-polling-interval 再試行の間隔(秒)を表します。
spark-webui-startup-polling-persist-after-retries  
spark-webui-rest-timeout Spark WebUIでのRESTコールに使用されるタイムアウト(秒)。
spark-webui-polling-interval Spark WebUIでのポーリング間隔(秒)。
spark-webui-polling-persist-after-retries  
spark-history-server-rest-timeout Spark履歴サーバーでのRESTコールに使用されるタイムアウト(秒)。
spark-history-server-polling-retries Spark履歴サーバーによってSparkイベント・ログが使用可能になるのを待機する間の最大再試行回数。
spark-history-server-polling-interval 再試行の間隔(秒)。
spark-history-server-polling-persist-after-retries  
spark-submit-shutdown-polling-retries Sparkによって発行されたOSプロセスの完了を待機する間の最大再試行回数。
spark-submit-shutdown-polling-interval 再試行の間隔(秒)。
spark-submit-shutdown-polling-persist-after-retries  

ストリーミング・モードでのマッピングの実行

このトピックでは、ストリーミング・モードでマッピングを実行できるようにするステップを示します。ストリーミングには、フォルト・トレラント・ストレージ・システムが障害からリカバリするためのチェックポイント情報が必要です。

  1. ストリーミングのサポートを有効にするには、「ストリーミングのサポートの構成」を参照してください。
  2. マッピングの物理設計で、ステージング実行ユニットを選択して、EKMのチェックポイント・オプションを有効にします。チェックポイントを有効化するには、spark.checkpointingの値をTrueに設定して、spark.checkpointingBaseDirプロパティにチェックポイント・ディレクトリを設定します。
    すべてのマッピングには固有のチェックポイント・ディレクトリがあります。
  3. マッピングを実行し、物理設計のためのコンテキストを設定します。

    注意:

    デフォルトでは、ユーザー・インタフェース・デザイナ「「マッピングの実行」ダイアログで最後に実行した物理設計を選択」があらかじめ選択されています。

ODIでのRDDとDataFramesの切替え

DataFrameコードとRDDコードの生成を切り替えるには、Spark実行ユニットでEKMオプションspark.useDataFramesをTrueまたはFalseに設定します。

DataFrameコードの生成をサポートしていないコンポーネント

一部のコンポーネントは、DataFrameコードの生成をサポートしていません。DataFramesをサポートしていないマッピング・コンポーネントが1つでもあると、検証エラーが表示され(Spark実行ユニットのプロパティspark.useDataFramesをfalseに設定するように求められ)、RDDに戻す必要があります。

次のコンポーネントは、DataFrameコードの生成をサポートしていません。

  • ピボット

  • アンピボット

  • 入力シグネチャ

  • 出力シグネチャ

表関数の形式でのカスタム・コードの追加

表関数コンポーネントを使用すると、外部スクリプトまたはインライン・コードへの参照の形式で独自のコード・セグメントをマッピングに追加できます。

TABLEFUNCTIONコンポーネントがソース・ログ・ファイルの解析と変換に使用されている例について説明します。このマッピングによって、ソース・ファイルからの未加工のデータ、変更されたデータおよびタイムスタンプなどの新しいデータを含むターゲット・ファイルが生成されます。

表関数を含んでいるマッピングを作成して、そのマッピングに入力属性と出力属性を追加するには、次の手順を実行します。

  1. ソース・データ・ストアとターゲット・データ・ストアを'TABLEFUNCTION'という名前の表関数コンポーネントとともに追加してマッピングを作成します。

    図7-3 ソース、ターゲットおよび表関数を含むマッピング

    図7-3の説明が続きます
    「図7-3 ソース、ターゲットおよび表関数を含むマッピング」の説明
  2. ソース・データ・ストアの出力コネクタをTABLEFUNCTIONコンポーネントの入力コネクタに接続します。

    この時点で、入力属性はTABLEFUNCTIONに直接追加されるようになります。

    注意:

    • 次の図に示すように、ソース・データ・ストアからのすべての属性を含んでいる入力グループ'INPUT1'が自動的に作成されます。

    • 追加のソース・データ・ストアごとに、新しい入力グループが追加されます。

    図7-4 TABLEFUNCTIONに追加された入力グループ

    図7-4の説明が続きます
    「図7-4 TABLEFUNCTIONに追加された入力グループ」の説明
  3. ターゲット・データ・ストアの入力コネクタをTABLEFUNCTIONコンポーネントの出力コネクタに接続します。

    この時点で、出力属性はTABLEFUNCTIONに直接追加されるようになります。

    注意:

    • 次の図に示すように、ターゲット・データ・ストアからのすべての属性を含んでいる出力グループ'OUTPUT1'が自動的に作成されます。

    • 'OUTPUT1'の出力属性は、名前変更または削除が可能です。

    • 各出力属性の式は、TABLEFUNCTIONコンポーネントに埋め込まれたスクリプトによるプログラムで設定されるため個別に設定する必要はありません。

    図7-5 ソース、ターゲットおよび表関数が接続されたマッピング

    図7-5の説明が続きます
    「図7-5 ソース、ターゲットおよび表関数が接続されたマッピング」の説明

次の手順を実行して、マッピングを構成します。

  1. 「論理」タブに移動して、「ステージングの場所のヒント」に従ってSpark-Local_Defaultを選択します。

  2. 「物理」タブに移動します。「抽出オプション」で、SPARK_SCRIPT_FILE KMオプションの値として/tmp/xkmtf.pyと入力することで、TABLEFUNCTIONコンポーネントに使用するスクリプトを指定します。xmktf.pyスクリプトには、次の内容が含まれています。
    import sys
    import datetime
    
    #get the upstream object using the input connector point name 
    upstream=sys.argv[0]['INPUT1']
    
    #A value must be calculated for every TF output attribute
    TABLEFUNCTION = upstream.map(lambda input:Row(**{"visitorId":input.visitorId, "channel":input.channel, "clientCountry":input.clientCountry, "newSessionId":'Prefix'+input.sessionId, "timeStamp":now.strftime("%Y-%m-%d %H:%M")}))

    ここでは、TABLEFUNCTIONコンポーネントの入力グループ'INPUT1'が、sys.argv経由でSpark-Pythonスクリプトのxkmtf.pyに渡されます。

    また、SPARK_SCRIPT KMオプションの値として次の内容を入力すると、TABLEFUNCTIONコンポーネントに使用するスクリプトを直接指定することもできます。
    import datetime
    
    now = datetime.datetime.now()
    
    #A value must be calculated for every TF output attribute 
    TABLEFUNCTION = ACT.map(lambda input:Row(**{"visitorId":input.visitorId, "channel":input.channel, "clientCountry":input.clientCountry, "newSessionId":'Prefix'+input.sessionId, "timeStamp":now.strftime("%Y-%m-%d %H:%M")}))

次に示す2つのタイプの表関数用Sparkスクリプトがあります。

  • 外部表関数スクリプト

  • インライン表関数スクリプト

外部表関数スクリプト

これは、ODIマッピング・コード内から動的に実行できます。必要に応じて、sys.argvを使用して外部スクリプトで処理するためのRDD/DataFrameを送信します。

たとえば、次のプロパティによって挿入された表関数コンポーネントについて考えてみます。

  • 名前 – TABLEFUNCTION

  • 入力コネクタ - INPUT1

  • 入力フィールド - IN_ATTR_1およびIN_ATTR_2

  • 出力属性 - OUT_ATTR_1、OUT_ATTR_2およびOUT_ATTR_3

次の外部スクリプトに示すように、アップストリームのRDD/DataStreamオブジェクトは入力コネクタ・ポイント名を使用して取得されます。その後で結果のRDD/DStreamが計算されます。すべての表関数出力属性名に対して値が計算されます。

import sys
import datetime
upstream=sys.argv[0]['INPUT1']
now = datetime.datetime.now()
TABLEFUNCTION = upstream.map(lambda input:Row(**{"OUT_ATTR_1":input.sessionId, "OUT_ATTR_2":input.customerId, "OUT_ATTR_3":now.strftime("%Y-%m-%d %H:%M")}))

この外部スクリプトを動的に実行するために、ODIは次のマッピング・コードを生成します。外部スクリプトの実行結果は、TABLEFUNCTIONとして保存されます。

sys.argv=[dict(INPUT1=ACT)]
execfile('/tmp/xkmtf_300.py')
TABLEFUNCTION = TABLEFUNCTION.toDF(...)

インライン表関数スクリプト

インライン・モードでは、実際の表関数スクリプトがXKMオプションとして保存されます。sys.argvを使用して、スクリプトを処理するためのソース・オブジェクトを送信する必要はありません。

次のスクリプトに示すように、外部スクリプトの実行結果が直接参照されます。

ACT=ACT.filter("ACT_customerId = '5001'")
TABLEFUNCTION = ACT.toDF(...)