C Sparkナレッジ・モジュール
この付録では、Sparkナレッジ・モジュールについて説明します。
この付録の内容は次のとおりです。
LKM File to Spark
このKMはデータをファイルからSpark Python変数にロードし、実行ユニット、ソース・テクノロジであるファイル、ターゲット・テクノロジであるSpark Pythonの間のAPで定義できます。
注意:
このKMはHDFSファイルのロードもサポートしています。ただし、その目的にはLKM HDFS to Sparkの使用をお薦めします。次の表に、LKM File to Sparkのオプションの説明を示します。
表C-1 LKM File to Spark
オプション | 説明 |
---|---|
ストレージ関数 |
データをロード/保存するのに使用されるストレージ関数。 データをロードするための次のいずれかのストレージ関数を選択します。
|
streamingContext |
ストリーミング・コンテキストの名前。 |
InputFormatClass |
入力データの書式を読み取るためのクラス。 次に例を示します。
|
KeyClass |
キーの完全修飾クラス名。 次に例を示します。
|
ValueClass |
値の完全修飾クラス名。 次に例を示します。
|
KeyConverter |
キー・コンバータ・クラスの完全修飾クラス名。 |
ValueConverter |
値コンバータ・クラスの完全修飾クラス名。 |
ジョブ構成 |
Hadoop構成。 例: {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'} |
inferSchema |
データからDataFrameスキーマを推測します。 True (デフォルト)に設定すると、ソース・データから列名と型が推測され、デフォルトのオプションでDataFrameが作成されます。 Falseに設定すると、DataFrameスキーマがソース・データ・ストアの定義に基づいて指定されます。 |
dateFormat |
日付またはタイムスタンプ入力フィールドの書式。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
このLKMは、StreamingContext.textFileStream()メソッドを使用して、ファイル・コンテキストをデータ・ストリームとして転送します。Sparkアプリケーションの実行中、ディレクトリはモニターされます。他の場所からこのディレクトリにコピーされたファイルはすべて削除されます。
表C-2 ストリーミング用のLKM File to Spark
オプション | 説明 |
---|---|
ストレージ関数 |
STREAMING_MODEをtrueに設定すると、ロード関数はtextFileStreamに自動的に変更されます。 デフォルトはtextFileです。 |
ソース・データ・ストア |
ソース・データ・ストアはディレクトリであり、フィールド・セパレータを定義する必要があります。 |
LKM Spark to File
このKMはデータをSpark Python変数からファイルに保存し、実行ユニット、ソース・テクノロジであるSpark Python、ターゲット・テクノロジであるファイルの間のAPで定義できます。
注意:
このKMはHDFSファイルへの書込みもサポートしていますが、LKM Spark to HDFSの使用をお薦めします。次の表は、LKM Spark to Fileのオプションについて説明します。
表C-3 LKM Spark to File
オプション | 説明 |
---|---|
ストレージ関数 |
データのロード/保存に使用するストレージ関数。 データを保存するための次のいずれかのストレージ関数を選択します。
注意: spark.useDataFramesがTrueに設定されていると、データはJSON文字列のRDDとして保存されます(saveAsNewAPIHadoopFile 、saveAsHadoopFile およびsaveAsSequenceFile の場合)。
|
OutputFormatClass |
データ書込みの完全修飾クラス名。 次に例を示します。
|
KeyClass |
キーの完全修飾クラス。 次に例を示します。
|
ValueClass |
値の完全修飾クラス。 次に例を示します。
|
KeyConverter |
キー・コンバータ・クラスの完全修飾クラス名。 |
ValueConverter |
値コンバータ・クラスの完全修飾クラス名。 |
ジョブ構成 |
Hadoop構成プロパティの追加またはオーバーライドが可能です。 例: {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'} |
SQL_EXPRESSIONS |
SQL式を使用します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
表C-4 ストリーミング用のLKM Spark to File
オプション | 説明 |
---|---|
ストレージ関数 |
STREAMING_MODEをtrueに設定すると、ロード関数はtextFileStreamに自動的に変更されます。 デフォルトはtextFileです。 |
LKM Hive to Spark
このKMはデータをHive表からSpark Python変数にロードし、実行ユニット、ソース・テクノロジであるHive、ターゲット・テクノロジであるSpark Pythonの間のAPで定義できます。
次の表に、LKM Hive to Sparkのオプションの説明を示します。
表C-5 LKM Hive to Spark
オプション | 説明 |
---|---|
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM Spark to Hive
このKMはデータをSpark Python変数からHive表に保存し、実行ユニット、ソース・テクノロジであるSpark Python、ターゲット・テクノロジであるHiveの間のAPで定義できます。
次の表に、LKM Spark to Hiveのオプションの説明を示します。
表C-6 LKM Spark to Hive
オプション | 説明 |
---|---|
OVERWRITE_TARGET_TABLE |
ターゲット表を上書きします。 |
INFER_SCHEMA |
RDDデータからターゲットDataFrameのスキーマを推測します。 |
SAMPLING_RATIO |
推測に使用される行のサンプリング割合。 |
SQL_EXPRESSIONS |
SQL式を使用します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM HDFS to Spark
このKMは、データをHDFSファイルからSparkにロードします。
表C-7 LKM HDFS to Spark
オプション | 説明 |
---|---|
streamingContext |
ストリーミング・コンテキストの名前。 |
inferSchema |
データからDataFrameスキーマを推測します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
注意:
物理スキーマでストリーミングのチェック・ボックスが選択されていると、ストリーミングが有効化されます。ストリーミングは、区切り形式とJSON形式の場合にのみサポートされます。LKM Spark to HDFS
このKMは、データをSparkからHDFSファイルにロードします。
表C-8 LKM Spark to HDFS
オプション | 説明 |
---|---|
SQL_EXPRESSIONS |
SQL式を使用します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
注意:
物理スキーマでストリーミングのチェック・ボックスが選択されていると、ストリーミングが有効化されます。ストリーミングは、すべての形式でサポートされます。LKM Kafka to Spark
このKMは、KafkaソースおよびSparkターゲットを使用してデータをロードし、Spark実行ユニットに存在してKafkaアップストリーム・ノードを保持するAPノードで定義できます。
表C-9 ストリーミング用のLKM Kafka to Spark
オプション | 説明 |
---|---|
ストレージ関数 |
データをロードするのに使用されるストレージ関数。 |
fromOffsets |
ストリーミングの始点を定義するトピック/パーティションごとのKafkaオフセット(始点を含む)。 |
KeyDecoder |
メッセージ・キーを変換します。 |
ValueDecoder |
メッセージ値を変換します。 |
groupId |
このコンシューマのグループID。 |
storageLevel |
RDDストレージ・レベル。 |
numPartitions |
コンシューマごとのパーティション数。 |
offsetRanges |
利用するtopic:partition:[start, end]を指定するoffsetRangeのリスト。 |
leaders |
offsetRangesのTopicAndPartitionごとのKafkaブローカ。 |
messageHandler |
KafkaMessageAndMetadataの変換に使用する関数。 |
avroSchema |
avroSchemaは、.avscファイルのコンテンツを保持します。このファイルは、.avroデータ・ファイルに関連付けられます。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM Spark to Kafka
LKM Spark to Kafkaは、ストリーミングとバッチの両方のモードで機能し、実行ユニット間に存在してKafkaダウンストリーム・ノードを保持するAPで定義できます。
表C-10 LKM Spark to Kafka
オプション | 説明 |
---|---|
avroSchema |
.avscファイルのコンテンツを保持します。このファイルは、.avroデータ・ファイルに関連付けられます。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM SQL to Spark
このKMは、データをCassandraからSparkにロードするよう設計されていますが、他のJDBCソースとでも機能します。SQLソースおよびSparkターゲットを保持するAPノードで定義できます。
このKMを使用するには、Hadoop資格証明プロバイダの構成とパスワードの定義が必要になります。詳細は、「Hadoopのパスワード処理」を参照してください。
表C-11 LKM SQL to Spark
オプション | 説明 |
---|---|
PARTITION_COLUMN |
パーティション化に使用される列 |
LOWER_BOUND |
パーティション列の下限。 |
UPPER_BOUND |
パーティション列の上限。 |
NUMBER_PARTITIONS |
パーティションの数。 |
PREDICATES |
述語のリスト。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM Spark to SQL
このKMは、SparkからJDBCターゲットにデータをロードします。SparkソースとSQLターゲットを保持するAPノードで定義できます。
このKMを使用するには、Hadoop資格証明プロバイダの構成とパスワードの定義が必要になります。詳細は、「Hadoopのパスワード処理」を参照してください。
表C-12 LKM Spark to SQL
オプション | 説明 |
---|---|
CREATE_TARG_TABLE |
ターゲット表を作成します。 |
TRUNCATE_TARG_TABLE |
ターゲット表を切り捨てます。 |
DELETE_TARG_TABLE |
ターゲット表を削除します。 |
INFER_SCHEMA |
RDDデータからターゲットDataFrameのスキーマを推測します。 |
SAMPLING_RATIO |
推測に使用される行のサンプリング割合。 |
SQL_EXPRESSIONS |
SQL式を使用します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
ストレージ・レベルは、データのキャッシュに使用されます。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
RDD/DataFrameの再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
LKM Spark to Cassandra
このKMを使用するには、Hadoop資格証明プロバイダの構成とパスワードの定義が必要になります。詳細は、「Hadoopのパスワード処理」を参照してください。
表C-13 LKM Spark to Cassandra
オプション | 説明 |
---|---|
CREATE_TARG_TABLE |
ターゲット表を作成します。 |
TRUNCATE_TARG_TABLE |
ターゲット表を切り捨てます。 |
DELETE_TARG_TABLE |
ターゲット表を削除します。 |
INFER_SCHEMA |
RDDデータからターゲットDataFrameのスキーマを推測します。 |
SAMPLING_RATIO |
推測に使用される行のサンプリング割合。 |
SQL_EXPRESSIONS |
SQL式を使用します。 |
Sparkマッピング・ファイルの削除 |
マッピングの最後に一時オブジェクトを削除します。 |
キャッシュ |
計算後に、操作全体でRDD/DataFrameのキャッシュを実行します。 |
ストレージ・レベル |
データのキャッシュに使用するストレージ・レベル。 |
再パーティション |
このコンポーネントの変換後に、RDD/DataFrameの再パーティションを実行します。 |
並列度のレベル |
パーティションの数。 |
パーティションのソート |
再パーティション時に、キー関数でパーティションをソートします。 |
パーティションのソート順序 |
パーティションをソートする順序。 |
パーティション・キー |
パーティションのキーを定義します。 |
パーティション関数 |
カスタマイズされたパーティション関数。 |
RKM Cassandra
-
データストアとしてのCassandra表。
「リバース・エンジニアリング」タブの「マスク」フィールドで、リバースエンジニアリングされるオブジェクトが名前に基づいてフィルタ処理されます。「マスク」フィールドは空にできず、少なくともパーセント記号(%)を含める必要があります。
-
属性としてのCassandra列およびそのデータ型。
XKM Spark Aggregate
SUMやGROUP BYなどを使用して行を集約します。
次の表は、XKM Spark Aggregateのオプションについて説明します。
表C-14 XKM Spark Aggregate
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
NUMBER_OF_TASKS |
タスク番号。 |
表C-15 ストリーミング用のXKM Spark Aggregate
オプション | 説明 |
---|---|
WINDOW_AGGREGATION |
ウィンドウ集計を有効にします。 |
WINDOW_LENGTH |
バッチ間隔の数。 |
SLIDING_INTERVAL |
ウィンドウ操作が実行される間隔。 |
STATEFUL_AGGREGATION |
ステートフル集計を有効にします。 |
STATE_RETENTION_PERIOD |
Spark状態オブジェクトでキーまたは値の集計を保持する時間(秒)。 |
FORWARD_ONLY_UPDATED_ROWS |
ダウンストリーム・コンポーネントに転送される変更された集計値。 |
XKM Spark Filter
フィルタ条件に基づいてデータのサブセットを生成します。
次の表は、XKM Spark Filterのオプションについて説明します。
表C-16 XKM Spark Filter
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
XKM Spark Join
結合条件に基づいて複数の入力ソースを結合します。
次の表は、XKM Spark Joinのオプションについて説明します。
表C-17 XKM Spark Join
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
NUMBER_OF_TASKS |
タスク番号。 |
XKM Spark Lookup
派生データ・ソースのデータをルックアップします。
次の表は、XKM Spark Lookupのオプションについて説明します。
表C-18 XKM Spark Lookup
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
NUMBER_OF_TASKS |
タスク番号。 |
MAP_SIDE |
KMでマップ側ルックアップを実行するのかリデュース側ルックアップを実行するのかを定義し、ルックアップのパフォーマンスにきわめて大きな影響を及ぼします。 |
KEY_BASED_LOOKUP |
ルックアップ・キーに対応するデータのみが取得されます。 |
表C-19 ストリーミング用のXKM Spark Lookup
オプション | 説明 |
---|---|
MAP_SIDE |
MAP_SIDE=true: メモリーに入る小さいルックアップ・データ・セットに適しています。この設定により、ルックアップ・データがすべてのSparkタスクにブロードキャストされてパフォーマンスが向上します。 |
KEY_BASED_LOOKUP |
どの着信ルックアップ・キーについても、Sparkキャッシュがチェックされます。
|
CACHE_RELOAD |
このオプションは、ルックアップ・ソース・データがいつロードおよびリフレッシュされるかを定義します。対応する値は次のとおりです。
|
CACHE_RELOAD_INTERVAL |
データがSparkキャッシュで保持される時間を定義します。この時間を経過すると、失効したデータまたはレコードはキャッシュから削除されます。 |
XKM Spark Pivot
別々の行のデータを取得して集計し、列に変換します。
次の表は、XKM Spark Pivotのオプションについて説明します。
表C-20 XKM Spark Pivot
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
注意:
XKM Spark Pivotでは、ストリーミングがサポートされていません。XKM Spark Sort
式を使用してデータをソートします。
次の表は、XKM Spark Sortのオプションについて説明します。
表C-21 XKM Spark Sort
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
NUMBER_OF_TASKS |
タスク番号。 |
XKM Spark Split
データを複数の条件により複数のパスに分割します。
次の表は、XKM Spark Splitのオプションについて説明します。
表C-22 XKM Spark Split
オプション | 説明 |
---|---|
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
XKM Spark Table Function
このKMでは、Spark Pythonスクリプト全体の一部として任意のSpark/Python変換を実行することで、カスタム変換を適用できます。
次の表に、XKM Spark Table Functionのオプションの説明を示します。
表C-23 XKM Spark Table Function
オプション | 説明 |
---|---|
SPARK_SCRIPT |
カスタマイズされたコード・コンテンツをユーザーが指定します。 |
SPARK_SCRIPT_FILE |
ユーザーがsparkスクリプト・ファイルのパスを指定します。 |
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |
注意:
オプションSPARK_SCRIPTまたはSPARK_SCRIPT_FILEのどちらか1つのみを設定する必要があります。-
SPARK_SCRIPT_FILEを設定した場合は、指定したファイルが動的に実行されます。
-
SPARK_SCRIPTを設定した場合、コンテンツがメインのSparkスクリプトに挿入されます。
-
SPARK_SCRIPTまたはSPARK_SCRIPT_FILEのどちらも設定していないと、少なくとも1つのオプションの指定が必要なことを示す検証エラーが生成されます。
-
SPARK_SCRIPTとSPARK_SCRIPT_FILEの両方を設定すると、1つのオプションのみを指定する必要があることを示す検証エラーが生成されます。
IKM Spark Table Function
ターゲットとしてのSpark表関数。
次の表は、IKM Spark Table Functionのオプションについて説明します。
表C-24 IKM Spark Table Function
オプション | 説明 |
---|---|
SPARK_SCRIPT_FILE |
ユーザーがsparkスクリプト・ファイルのパスを指定します。 |
CACHE_DATA |
デフォルト・ストレージ・レベルを使用してデータを永続化します。 |