C Spark Knowledge Modules

This appendix provides information about the Spark knowledge modules.

This appendix includes the following sections:

LKM File to Spark

This KM will load data from a file into a Spark Python variable and can be defined on the AP between the execution units, source technology File, target technology Spark Python.

Note:

This KM also supports loading HDFS files, although it's preferable to use LKM HDFS to Spark for that purpose.

The following tables describe the options for LKM File to Spark.

Table C-1 LKM File to Spark

Option Description

Storage Function

The storage function to be used to load/store data.

Choose one of the following storage functions to load data:

  • textFile is used to load data from HDFS, a local file system or any Hadoop-supported file system URI.

  • jsonFile is used to load data from a JSON file where each line of the files is a JSON object.

  • newAPIHadoopFile is used to load data from a Hadoop file with an arbitrary new API InputFormat.

  • newAPIHadoopRDD is used to load data from Hadoop-readable dataset with an arbitrary new API InputFormat.

  • hadoopFile is used to load data from a Hadoop file with an arbitrary InputFormat.

  • hadoopRDD is used to load data from a Hadoop-readable dataset.

  • sequenceFile is used to load data from an RDD of key-value pairs to Spark.

streamingContext

Name of Streaming Context.

InputFormatClass

Class for reading the format of input data.

For example,

  • org.apache.hadoop.mapreduce.lib.input.TextInputFormat (default)

  • org.apache.hadoop.mapred.TextInputFormat (hadoopFile and hadoopRDD)

KeyClass

Fully qualified classname for keys.

For example,

  • org.apache.hadoop.io.LongWritable (default)

  • org.apache.hadoop.io.Text

ValueClass

Fully qualified classname for values.

For example,

  • org.apache.hadoop.io.Text (default)

  • org.apache.hadoop.io.LongWritable

KeyConverter

Fully qualified classname of key converter class.

ValueConverter

Fully qualified classname of value converter class.

Job Configuration

Hadoop configuration.

For example, {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'}

inferSchema

Infer DataFrame schema from data.

If set to True (default), the column names and types will be inferred from source data and DataFrame will be created with default options.

If set to False, the DataFrame schema will be specified based on the source data store definition.

dateFormat

Format for Date or Timestamp input fields.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

This LKM uses StreamingContext.textFileStream() method to transfer file context as data stream. The directory is monitored while the Spark application is running. Any files copied from other locations into this directory is detected.

Table C-2 LKM File to Spark for Streaming

Option Description

Storage Function

If STREAMING_MODE is set to true, the load function is changed to textFileStream automatically.

Default is textFile.

Source Data store

Source data store is a directory and field separator should be defined.

LKM Spark to File

This KM will store data into a file from a Spark Python variable and can be defined on the AP between the execution units, source technology Spark Python, target technology File.

Note:

This KM also supports writing to an HDFS File, although the LKM Spark to HDFS is preferable.

The following tables describes the options for LKM Spark to File.

Table C-3 LKM Spark to File

Option Description

Storage Function

Storage function to be used to load/store data.

Choose one of the following storage functions to store data:

  • saveAsTextFile is used to store the data into HDFS, a local file system or any Hadoop-supported file system URI.

  • saveAsJsonFile is used to store the data in JSON format into HDFS, a local file system or any Hadoop-supported file system URI.

  • saveAsNewAPIHadoopFile is used to store the data to a Hadoop file with an arbitrary new API InputFormat.

  • saveAsHadoopFile is used to store data to a Hadoop file with an arbitrary InputFormat.

  • saveAsSequenceFile is used to store data into key-value pairs.

Note:

When spark.useDataFrames is set to True, the data will be saved as RDD of JSON strings for saveAsNewAPIHadoopFile, saveAsHadoopFile, and saveAsSequenceFile.

OutputFormatClass

Fully qualified classname for writing the data.

For example,

  • org.apache.hadoop.mapreduce.lib.input.TextOutputFormat (default)

  • org.apache.hadoop.mapred.TextOutputFormat (saveAsHadoopFile)

KeyClass

Fully qualified class for keys.

For example,

  • org.apache.hadoop.io.NullWritable (default)

  • org.apache.hadoop.io.Text

ValueClass

Fully qualified class for values.

For example,

  • org.apache.hadoop.io.NullWritable

  • org.apache.hadoop.io.Text (default)

KeyConverter

Fully qualified classname of key converter class.

ValueConverter

Fully qualified classname of value converter class.

Job Configuration

Allows adding or overriding Hadoop configuration properties.

For example, {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'}

SQL_EXPRESSIONS

Use SQL Expressions.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

Table C-4 LKM Spark to File for streaming

Option Description

Storage Function

If STREAMING_MODE is set to true, the load function is changed to textFileStream automatically.

Default is textFile.

LKM Hive to Spark

This KM will load data from a Hive table into a Spark Python variable and can be defined on the AP between the execution units, source technology Hive, target technology Spark Python.

The following table describes the options for LKM Hive to Spark:

Table C-5 LKM Hive to Spark

Option Description

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

LKM Spark to Hive

This KM will store data into a Hive table from a Spark Python variable and can be defined on the AP between the execution units, source technology Spark, target technology Hive.

The following table describes the options for LKM Spark to Hive.

Table C-6 LKM Spark to Hive

Option Description

OVERWRITE_TARGET_TABLE

Overwrite the target table.

INFER_SCHEMA

Infer target DataFrame schema from RDD data.

SAMPLING_RATIO

The sample ratio of rows used for inferring.

SQL_EXPRESSIONS

Use SQL Expressions.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

LKM HDFS to Spark

This KM will load data from HDFS file to Spark.

Table C-7 LKM HDFS to Spark

Option Description

streamingContext

Name of Streaming Context.

inferSchema

Infer DataFrame schema from data.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

Note:

Streaming is enabled when the streaming check box is selected in the physical schema. Streaming is only supported for the Delimited and JSON formats.

LKM Spark to HDFS

This KM will load data from Spark to HDFS file.

Table C-8 LKM Spark to HDFS

Option Description

SQL_EXPRESSIONS

Use SQL Expressions.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

Note:

Streaming is enabled when the streaming check box is selected in the physical schema. Streaming is supported for all formats.

LKM Kafka to Spark

This KM will load data with Kafka source and Spark target and can be defined on the AP node that exist in Spark execution unit and have Kafka upstream node.

Table C-9 LKM Kafka to Spark for streaming

Option Description

Storage Function

The storage function to be used to load data.

fromOffsets

Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.

KeyDecoder

Converts message key.

ValueDecoder

Converts message value.

groupId

The group id for this consumer.

storageLevel

RDD Storage level.

numPartitions

Number of partitions for each consumer.

offsetRanges

List of offsetRange to specify topic:partition:[start, end) to consume.

leaders

Kafka brokers for each TopicAndPartition in offsetRanges.

messageHandler

A function used to convert KafkaMessageAndMetadata.

avroSchema

avroSchema have the content of .avsc file. This file is associated with .avro Data file.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

LKM Spark to Kafka

LKM Spark to Kafka works in both streaming and batch mode and can be defined on the AP between the execution units and have Kafka downstream node.

Table C-10 LKM Spark to Kafka

Option Description

avroSchema

Has the content of .avsc file. This file is associated with .avro Data file.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

LKM SQL to Spark

This KM is designed to load data from Cassandra into Spark, but it can work with other JDBC sources. It can be defined on the AP node that have SQL source and Spark target.

To use this KM, it is mandatory to configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.

Table C-11 LKM SQL to Spark

Option Description

PARTITION_COLUMN

Column used for partitioning.

LOWER_BOUND

Lower bound of the partition column.

UPPER_BOUND

Upper bound of the partition column.

NUMBER_PARTITIONS

Number of partitions.

PREDICATES

List of predicates.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

LKM Spark to SQL

This KM will load data from Spark into JDBC targets and can be defined on the AP node that have Spark source and SQL target.

To use this KM, it is mandatory to the configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.

Table C-12 LKM Spark to SQL

Option Description

CREATE_TARG_TABLE

Create target table.

TRUNCATE_TARG_TABLE

Truncate target table.

DELETE_TARG_TABLE

Delete target table.

INFER_SCHEMA

Infer target DataFrame schema from RDD data.

SAMPLING_RATIO

The sample ratio of rows used for inferring.

SQL_EXPRESSIONS

Use SQL Expressions.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level is used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when you repartition RDD/DataFrame.

Partition Sort Order

Sort partition order.

Partition Keys

Define keys of partition.

Partition Function

Customized partitioning function.

LKM Spark to Cassandra

To use this KM, it is mandatory to configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.

Table C-13 LKM Spark to Cassandra

Option Description

CREATE_TARG_TABLE

Create target table.

TRUNCATE_TARG_TABLE

Truncate target table.

DELETE_TARG_TABLE

Delete target table.

INFER_SCHEMA

Infer target DataFrame schema from RDD data.

SAMPLING_RATIO

The sample ratio of rows used for inferring.

SQL_EXPRESSIONS

Use SQL Expressions.

Delete Spark Mapping Files

Delete temporary objects at end of mapping.

Cache

Cache RDD/DataFrame across operations after computation.

Storage Level

The storage level to be used to cache data.

Repartition

Repartition the RDD/DataFrame after transformation of this component.

Level of Parallelism

Number of partitions.

Sort Partitions

Sort partitions by a key function when repartitioning.

Partition Sort Order

Sort partitions order.

Partition Keys

Define keys for partitions.

Partition Function

Customized partitioning function.

RKM Cassandra

RKM Cassandra reverses these metadata elements:
  • Cassandra tables as data stores.

    The Mask field in the Reverse Engineer tab filters reverse-engineered objects based on their names. The Mask field cannot be empty and must contain at least the percent sign (%).

  • Cassandra columns as attributes with their data types.

XKM Spark Aggregate

Summarize rows, for example, using SUM and GROUP BY.

The following tables describes the options for XKM Spark Aggregate.

Table C-14 XKM Spark Aggregate

Option Description

CACHE_DATA

Persist the data with the default storage level.

NUMBER_OF_TASKS

Task number.

Table C-15 XKM Spark Aggregate for streaming

Option Description

WINDOW_AGGREGATION

Enable window aggregation.

WINDOW_LENGTH

Number of batch intervals.

SLIDING_INTERVAL

The interval at which the window operation is performed.

STATEFUL_AGGREGATION

Enables stateful aggregation.

STATE_RETENTION_PERIOD

Time in seconds to retain a key or value aggregate in the Spark state object.

FORWARD_ONLY_UPDATED_ROWS

Modified aggregate values forwarded to downstream components.

XKM Spark Distinct

Eliminates duplicates in data and functionality is identical to the existing batch processing.

XKM Spark Expression

Define expressions to be reused across a single mapping.

XKM Spark Filter

Produce a subset of data by a filter condition.

The following tables describes the options for XKM Spark Filter.

Table C-16 XKM Spark Filter

Option Description

CACHE_DATA

Persist the data with the default storage level.

XKM Spark Input Signature and Output Signature

Supports code generation for reusable mapping.

XKM Spark Join

Joins more than one input sources based on the join condition.

The following tables describes the options for XKM Spark Join.

Table C-17 XKM Spark Join

Option Description

CACHE_DATA

Persist the data with the default storage level.

NUMBER_OF_TASKS

Task number.

XKM Spark Lookup

Lookup data for a driving data source.

The following tables describes the options for XKM Spark Lookup.

Table C-18 XKM Spark Lookup

Option Description

CACHE_DATA

Persist the data with the default storage level.

NUMBER_OF_TASKS

Task number.

MAP_SIDE

Defines whether the KM will do a map-side lookup or a reduce-side lookup and significantly impacts lookup performance.

KEY_BASED_LOOKUP

Only data corresponding to the lookup keys are retrieved.

Table C-19 XKM Spark Lookup for streaming

Option Description

MAP_SIDE

MAP_SIDE=true : Suitable for small lookup data sets fitting into memory. This setting provides better performance by broadcasting the lookup data to all Spark tasks.

KEY_BASED_LOOKUP

For any incoming lookup key a Spark cache is checked.

  • If the lookup record is present and not expired, the lookup data is served from the cache.

  • If the lookup record is missing or expired, the data is re-loaded from the SQL source.

CACHE_RELOAD

This option defines when the lookup source data is loaded and refreshed and here are the corresponding values:
  • NO_RELOAD: The lookup source data is loaded once on Spark application startup.

  • RELOAD_EVERY_BATCH: The lookup source data is reloaded for every new Spark batch.

  • RELOAD_BASE_ON_TIME: The lookup source data is loaded on Spark application startup and refreshed after the time interval provided by KM option CacheReloadInterval.

CACHE_RELOAD_INTERVAL

Defines the time data to be retained in the Spark cache. After this time the expired data or records are removed from cache.

XKM Spark Pivot

Take data in separate rows, aggregates it and converts it into columns.

The following tables describes the options for XKM Spark Pivot.

Table C-20 XKM Spark Pivot

Option Description

CACHE_DATA

Persist the data with the default storage level.

Note:

XKM Spark Pivot does not support streaming.

XKM Spark Set

Perform UNION, MINUS or other set operations.

XKM Spark Sort

Sort data using an expression.

The following tables describes the options for XKM Spark Sort.

Table C-21 XKM Spark Sort

Option Description

CACHE_DATA

Persist the data with the default storage level.

NUMBER_OF_TASKS

Task number.

XKM Spark Split

Split data into multiple paths with multiple conditions.

The following tables describes the options for XKM Spark Split.

Table C-22 XKM Spark Split

Option Description

CACHE_DATA

Persist the data with the default storage level.

XKM Spark Table Function

This KM allows applying custom transformation by executing arbitrary Spark/Python transformations as part of the overall Spark Python script.

The following table describes the options for XKM Spark Table Function.

Table C-23 XKM Spark Table Function

Option Description

SPARK_SCRIPT

User specifies the customized code content.

SPARK_SCRIPT_FILE

User specifies the path of spark script file.

CACHE_DATA

Persist the data with the default storage level.

Note:

Only one of the options, either SPARK_SCRIPT or SPARK_SCRIPT_FILE must be set.
  • If SPARK_SCRIPT_FILE is set, then the specified file will be dynamically executed.

  • If SPARK_SCRIPT is set, its content will be inserted into the main Spark script.

  • If neither SPARK_SCRIPT nor SPARK_SCRIPT_FILE is set, a validation error is generated stating that at least one of the options must be specified.

  • If both SPARK_SCRIPT and SPARK_SCRIPT_FILE are set, a validation error is generated stating that only one of the options must be specified.

IKM Spark Table Function

Spark table function as target.

The following tables describes the options for IKM Spark Table Function.

Table C-24 IKM Spark Table Function

Option Description

SPARK_SCRIPT_FILE

User specifies the path of spark script file.

CACHE_DATA

Persist the data with the default storage level.

XKM Spark Unpivot

Transform a single row of attributes into multiple rows in an efficient manner.

The following tables describes the options for XKM Spark Pivot.

Table C-25 XKM Spark Unpivot

Option Description

CACHE_DATA

Persist the data with the default storage level.

Note:

XKM Spark Unpivot does not support streaming.