この章の内容は次のとおりです。
Hadoopでの通常の処理には、MapReduceジョブとしてプログラミングされたデータの検証や変換などがあります。MapReduceジョブの設計と実装には、専門的なプログラミングの知識が必要です。しかしOracle Data Integratorを使用すれば、MapReduceジョブを記述する必要はありません。Oracle Data Integratorは、Apache Hiveと、MapReduceジョブを実装するためのSQLライクな言語であるHive Query Language (HiveQL)を使用します。
ビッグ・データ処理のシナリオを実装する場合の最初の手順は、データをHadoopにロードすることです。データ・ソースは、通常はファイルまたはSQLデータベースです。
データのロード後は、SQLと同様の方法でHiveQLを使用して、データを検証および変換できます。データ検証(NULLや主キーのチェックなど)および変換(フィルタ、集計、設定操作、表の導出など)を実行できます。また、カスタマイズした手続き型のスニペット(スクリプト)をデータの処理に含めることもできます。
データが集計、簡略化または処理されて小さなデータ・セットになったら、追加の処理および分析を行うためにそれをOracleデータベース、他のリレーショナル・データベース、HDFS、HBase、またはHiveにロードできます。Oracleデータベースへのロードを最適に行うには、Oracle Loader for Hadoopが推奨されます。
詳細は、「Hadoopデータの統合」を参照してください。
デフォルトでは、Oracle Data Integrator (ODI)はHiveQLを使用してマッピングを実装します。しかし、Oracle Data IntegratorではPig LatinおよびSpark Pythonを使用してマッピングを実装することもできます。マッピングを設計したら、それをデフォルトのHiveQLを使用して実装することも、Pig LatinまたはSpark Pythonを使用して実装するよう選択することもできます。
ODIにおけるPig LatinおよびSpark Pythonのサポートは、これらの言語に固有のコンポーネントKMのセットによって実現されています。これらのコンポーネントKMは、Pigデータ・サーバーまたはSparkデータ・サーバーがマッピングのステージング場所として使用される場合にのみ使用されます。
たとえば、ステージング場所としてPigデータ・サーバーを使用する場合、Pig関連のKMがマッピングを実装するのに使用され、Pig Latinコードが生成されます。同様に、Spark Pythonコードを生成するには、Sparkデータ・サーバーをマッピングのステージング場所として使用する必要があります。
Sparkアプリケーションをyarnで実行することをお薦めします。この推奨に従って、ODIでは、yarn-clientモードとyarn-clusterモードの実行のみがサポートされると、実行時チェックが採用されます。
odi.spark.enableUnsupportedSparkModes = true
異なる言語のコードの生成ならびにPigおよびSparkコンポーネントKMの詳細は、次を参照してください。
Apache Oozieは、Hadoopでのアクションをオーケストレートするのに役立つワークフロー・スケジューラ・システムです。Hadoop MapReduceジョブを実行するアクションによるワークフロー・ジョブの実行に特化したサーバーベースのワークフロー・エンジンです。Oozieワークフローの実装と実行には、Oozieについての詳細な知識が必要になります。
しかし、Oracle Data IntegratorがあればOozieのエキスパートになる必要はありません。Oracle Data Integratorを使用すると、Oozieワークフローを簡単に定義および実行できます。
Oracle Data Integratorでは、統合プロジェクト(パッケージ、プロシージャ、マッピング、またはシナリオ)をOozieエンジンで実行することにより、Oozieワークフロー定義を自動的に生成できます。生成されたOozieワークフロー定義はOozieワークフロー・システムに配置されて実行されます。コンテンツの検証のためにOozieワークフローの配置のみを行い、実行は後から行うこともできます。
Oozieログからの情報は取得されてODIリポジトリにOozie UIへのリンクとともに保存されます。この情報はODIオペレータおよびコンソールに表示されます。
詳細は、「Oozieワークフローの実行」を参照してください。
ODIでは、Oozieワークフローの実行のために次の2つのモードが用意されています。
TASK
タスク・モードではすべてのODIタスクに対してOozieアクションが生成されます。これがデフォルト・モードです。
タスク・モードでは次は処理できません。
複数のタスクにまたがるスクリプト・コードがあるKM
トランザクションがあるKM
タスクをまたぐファイル・アクセスができないファイル・システム・アクセスが発生するKM
ループ構成のあるODIパッケージ
SESSION
セッション・モードではセッション全体に対してOozieアクションが生成されます。
次のいずれかの条件に当てはまる場合は、自動的にこのモードが使用されます。
いずれかのタスクによりトランザクション接続がオープンされている。
いずれかのタスクにスクリプトがある。
パッケージにループが含まれている。
パッケージ内のループはOozieエンジンではサポートされておらず、SESSIONモードで実行されているときであっても、実行やセッション・ログ・コンテンツの取得という点からして正しく動作しない可能性があります。
注意:
ほとんどのユースケースにおいて、このモードの使用をお薦めします。
デフォルトの場合、Oozieランタイム・エンジンではタスク・モードが使用されます。つまり、Oozieランタイム・エンジンのOOZIE_WF_GEN_MAX_DETAIL
プロパティのデフォルト値はTASKです。
前述の条件が満たされているかどうかに関わりなく、Oozieランタイム・エンジンをセッション・モードを使用するように構成できます。Oozieランタイム・エンジンでセッション・レベルのOozieワークフローが生成されるよう設定するには、Oozieランタイム・エンジンのOOZIE_WF_GEN_MAX_DETAIL
プロパティをSESSIONに設定します。
詳細は、「Oozieランタイム・エンジンのプロパティ」を参照してください。
Lambdaアーキテクチャは、バッチ処理方式とストリーム処理方式の両方を利用して大量のデータを処理するように設計されたデータ処理アーキテクチャです。
Lambdaアーキテクチャでは、データ構造モデルが様々なテクノロジとともに使用されています。たとえば、バッチ実装用のデータソースをHDFSにできるのに対して、ストリーミング実装ではKafkaからデータを読み取ることができます。ODIでのこの典型は、汎用ファイルや汎用SQLなどの汎用テクノロジの使用です。
ストリーミング・アプリケーションは、24時間365日動作する必要があるため、障害からの回復が早い必要があります。Sparkストリーミングでは、障害からリカバリできるように、フォルト・トレラント・ストレージ・システムに情報をチェックポイントする必要があります。
チェックポイントは、アプリケーションを実行しているドライバの障害からリカバリするアプリケーションに対して有効です。チェックポイントで保証されるのは、チェックポイントが見つかった場合、Sparkアプリケーションが残っているところから再開することのみです。
チェックポイントの詳細は、『Spark Streaming Programming Guide』を参照してください。
Sparkのウィンドウ機能を使用すると、集計(およびその他の変換)を現在のRDDに適用できるだけでなく、多数の以前のRDD (ウィンドウ期間)からのデータも含めることができます。
Spark KMでは、バッチおよびストリーミング変換をサポートしています。ストリーミング以外のPythonコードはRDDオブジェクトに対して機能する一方、ストリーミング・コードはDStreamオブジェクトに対して機能します。バッチ・モードでの集計は単純で、単一セットの入力レコード(RDD)があり、これが集計されて出力データを形成し、ターゲットに書き込まれます。ストリーミング・モードでは、連続して着信するデータがRDDのフローに離散化されます。デフォルトでは、各RDDは別々に集計されます。
古いRDDを保持する必要があります。
ウィンドウに入るデータは、新しいRDDのたびに再計算されます。
ウィンドウ期間: ウィンドウDStreamのRDDを生成するためにRDDが結合される時間(秒)
スライディング間隔: ウィンドウ操作が実行される間隔。
XKM Spark Aggregate
XKM Spark Join
ステートフル集計
ストリームの全データにわたってデータを集計する必要がある場合、ステートフル集計が必要です。ステートフル集計では、Sparkによってすべてのキーの集計値が含まれる状態ストリームが構築されます。着信RDDごとに、この状態が更新されます。たとえば、新しい着信データに基づいて集計された合計が更新されます。
デフォルトでは、ステート・ストリームは、着信RDDごとに格納された値をすべて出力します。これは、ストリーム出力がファイルであり、そのファイルが常に集計値を全部保持していると想定される場合に便利です。
XKM Spark Aggregate
XKM Spark Lookup
キャッシング
ODIでは、SparkベースのKMのオプションをさらに2つ用意して、Sparkキャッシング・メカニズムを強化しています。
データのキャッシュ: このオプションをtrueに設定すると、ストレージの起動がコンポーネントの生成済pysparkコードに追加されます。
キャッシュ・ストレージ・レベル: データのキャッシュがfalseに設定されている場合、このオプションは表示されません。
再パーティション化
パーティションの数は最初にデータ・ブロックによって決定されますが、ソースがHDFSファイルであり、タスクの実行に使用可能なスロットがパーティションの数を超えるSparkアプリケーションを実行するプラットフォームがロードされる場合、プラットフォームのリソースはすべて使用されるわけではありません。
再パーティションは、プロセス全体のどのステップでも実行でき、データをソースからロードした直後またはフィルタ・コンポーネントの処理後に実行できます。ODIには、再パーティションを実行するかどうかとどこで実行するかを決定するために、SparkベースのKMのオプションがあります。
再パーティション
: このオプションをtrueに設定すると、コンポーネントの変換後に再パーティションが適用されます。並列度のレベル
: パーティションの数で、デフォルトは0です。パーティションのソート: このオプションをtrueに設定すると、パーティションがキーでソートされます。キーはLambda関数で定義されます。
パーティションのソート順: 昇順または降順。デフォルトは昇順です。
パーティション・キー関数: ユーザー定義のパーティション・キーで、キー定義はカンマ区切りの列リストにする必要があります。
パーティション関数: ユーザー定義のパーティションLambda関数。デフォルト値は、pyspark定義のハッシュ関数であるportable_hash
で、RDDの行全体についてハッシュ・ベースを計算します。
Kafkaクラスタは、メッセージを処理および格納する1つ以上のKafkaブローカで構成されます。メッセージはトピックに編成され、物理的にトピックのパーティションに分類されます。Kafkaプロデューサは、クラスタに接続してメッセージをトピックに入れます。Kafkaコンシューマは、クラスタに接続してメッセージをトピックから受け取ります。
特定のトピックに関するすべてのメッセージが同じメッセージ形式である必要はありませんが、トピックごとに単一のメッセージ形式のみを使用することをお薦めします。Kafkaは新しいテクノロジとしてODIに統合されています。