Typical processing in Hadoop includes data validation and transformations that are programmed as MapReduce jobs. Designing and implementing a MapReduce job requires expert programming knowledge. However, when you use Oracle Data Integrator, you do not need to write MapReduce jobs. Oracle Data Integrator uses Apache Hive and the Hive Query Language (HiveQL), a SQL-like language for implementing MapReduce jobs.
When you implement a big data processing scenario, the first step is to load the data into Hadoop. The data source is typically files or SQL databases.
After the data is loaded, you can validate and transform it by using HiveQL, much as you would use SQL. You can perform data validation (such as checking for NULLs and primary keys) and transformations (such as filtering, aggregations, set operations, and derived tables). You can also include customized procedural snippets (scripts) for processing the data.
When the data has been aggregated, condensed, or processed into a smaller data set, you can load it into an Oracle database, another relational database, HDFS, HBase, or Hive for further processing and analysis. Oracle Loader for Hadoop is recommended for optimal loading into an Oracle database.
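The validation and transformation steps described above can be sketched with plain SQL. In the sketch below, SQLite is used only as a runnable stand-in for Hive, and the table and column names (orders, order_id, region, amount) are hypothetical. The statements are limited to constructs that HiveQL also accepts, but this is an illustration and not code generated by Oracle Data Integrator.

```python
import sqlite3

# SQLite stands in for Hive so that the sketch runs anywhere.
# Table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, region TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "EU", 10.0), (2, "EU", None), (3, "US", 7.5), (3, "US", 2.5), (4, "US", 20.0)],
)

# Validation: rows with a NULL in a mandatory column.
null_rows = conn.execute(
    "SELECT order_id FROM orders WHERE amount IS NULL"
).fetchall()

# Validation: primary key candidates that occur more than once.
duplicate_keys = conn.execute(
    "SELECT order_id, COUNT(*) FROM orders GROUP BY order_id HAVING COUNT(*) > 1"
).fetchall()

# Transformation: filter out invalid rows, then aggregate per region.
totals = conn.execute(
    "SELECT region, SUM(amount) FROM orders "
    "WHERE amount IS NOT NULL GROUP BY region ORDER BY region"
).fetchall()
```

Here null_rows and duplicate_keys list the rows that fail validation, and totals holds the aggregated result that would be loaded into the target.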
For more information, see Integrating Hadoop Data.
By default, Oracle Data Integrator (ODI) uses HiveQL to implement the mappings. However, Oracle Data Integrator also lets you implement the mappings using Pig Latin and Spark Python. Once your mapping is designed, you can either implement it using the default HiveQL, or choose to implement it using Pig Latin or Spark Python.
Support for Pig Latin and Spark Python in ODI is achieved through a set of component KMs that are specific to these languages. These component KMs are used only when a Pig data server or a Spark data server is used as the staging location for your mapping.
For example, if you use a Pig data server as the staging location, the Pig-related KMs are used to implement the mapping and Pig Latin code is generated. Similarly, to generate Spark Python code, you must use a Spark data server as the staging location for your mapping.
The recommendation is to run Spark applications on YARN. Following this recommendation, ODI supports only yarn-client and yarn-cluster mode execution and performs a runtime check to enforce this. To use any other Spark deployment mode, which ODI does not support, add the following property to the Spark data server:
odi.spark.enableUnsupportedSparkModes = true
For more information about generating code in different languages and the Pig and Spark component KMs, see the following:
Apache Oozie is a workflow scheduler system that helps you orchestrate actions in Hadoop. It is a server-based workflow engine specialized in running workflow jobs with actions that run Hadoop MapReduce jobs. Implementing and running an Oozie workflow requires in-depth knowledge of Oozie.
However, Oracle Data Integrator does not require you to be an Oozie expert. With Oracle Data Integrator you can easily define and execute Oozie workflows.
Oracle Data Integrator allows you to automatically generate an Oozie workflow definition by executing an integration project (package, procedure, mapping, or scenario) on an Oozie engine. The generated Oozie workflow definition is deployed to and executed on an Oozie workflow system. You can also choose to only deploy the Oozie workflow, either to validate its content or to execute it at a later time.
Information from the Oozie logs is captured and stored in the ODI repository along with links to the Oozie UIs. This information is available for viewing within ODI Operator and Console.
For more information, see Executing Oozie Workflows.
ODI provides the following two modes for executing the Oozie workflows:
TASK
Task mode generates an Oozie action for every ODI task. This is the default mode.
The task mode cannot handle the following:
KMs with scripting code that spans across multiple tasks.
KMs with transactions.
KMs with file system access that cannot span file access across tasks.
ODI packages with looping constructs.
SESSION
Session mode generates an Oozie action for the entire session.
ODI automatically uses this mode if any of the following conditions is true:
Any task opens a transactional connection.
Any task has scripting.
A package contains loops.
Note that loops in a package are not supported by Oozie engines and may not function properly in terms of execution and/or session log content retrieval, even when running in SESSION mode.
Note: Session mode is recommended for most use cases.
By default, the Oozie Runtime Engines use the Task mode, that is, the default value of the OOZIE_WF_GEN_MAX_DETAIL property for the Oozie Runtime Engines is TASK.
You can configure an Oozie Runtime Engine to use Session mode, irrespective of whether the conditions mentioned above are satisfied or not. To force an Oozie Runtime Engine to generate session level Oozie workflows, set the OOZIE_WF_GEN_MAX_DETAIL property for the Oozie Runtime Engine to SESSION.
For more information, see Oozie Runtime Engine Properties.
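The rules for choosing between the two modes can be summarized in a small decision function. This is a minimal sketch of the rules as described in this section, not ODI code: the Task class, its flags, and the function name choose_workflow_mode are hypothetical, and only the property name OOZIE_WF_GEN_MAX_DETAIL and its values TASK and SESSION come from the text.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    # Hypothetical flags describing an ODI task; not an ODI API.
    transactional: bool = False
    has_scripting: bool = False


def choose_workflow_mode(tasks: List[Task], package_has_loops: bool,
                         max_detail: str = "TASK") -> str:
    """Return "TASK" or "SESSION" following the rules described above.

    max_detail models the value of OOZIE_WF_GEN_MAX_DETAIL.
    """
    if max_detail == "SESSION":
        return "SESSION"  # Session mode forced by configuration
    if package_has_loops:
        return "SESSION"  # a package contains loops
    if any(t.transactional or t.has_scripting for t in tasks):
        return "SESSION"  # transactional connection or scripting
    return "TASK"         # default: one Oozie action per ODI task
```

For example, choose_workflow_mode([Task(), Task(has_scripting=True)], False) returns "SESSION" even though the configured value is TASK.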
Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods.
In Lambda architecture, the same data structure model is used with different technologies. For example, the data source for the batch implementation can be HDFS, whereas the streaming implementation can read data from Kafka. In ODI this is represented by using Generic Technologies like Generic File and Generic SQL.
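The idea of one logical model served by a batch path and a streaming path can be sketched in plain Python. The record layout and the function names (transform, run_batch, run_streaming) are hypothetical; in-memory lists stand in for the HDFS and Kafka sources, and nothing here reflects code that ODI generates.

```python
from typing import Iterable, Iterator, List


def transform(records: Iterable[dict]) -> List[dict]:
    """Shared logic: keep positive amounts and add a derived column."""
    return [
        {**r, "amount_cents": int(round(r["amount"] * 100))}
        for r in records
        if r["amount"] > 0
    ]


def run_batch(source: List[dict]) -> List[dict]:
    # Batch path: the whole data set is available at once.
    return transform(source)


def run_streaming(micro_batches: Iterable[List[dict]]) -> Iterator[List[dict]]:
    # Streaming path: the same logic is applied to each micro-batch.
    for batch in micro_batches:
        yield transform(batch)
```

Because both paths call the same transform function, feeding the same records through either path produces the same output rows.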
A streaming application must operate 24/7 and hence should be resilient to failures. Spark Streaming needs to checkpoint information to a fault-tolerant storage system so that it can recover from failures.
Checkpointing is enabled so that applications can recover from failures of the driver running the application. Checkpointing only ensures that the Spark application restarts from where it left off if a checkpoint is found.
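The restart behavior can be illustrated with a small simulation. This sketch does not use the Spark API: a local JSON file stands in for the fault-tolerant storage system, and process_stream and its fail_at parameter are hypothetical names introduced to simulate a driver failure.

```python
import json
import os
import tempfile


def process_stream(batches, checkpoint_path, fail_at=None):
    """Sum the batches in order, writing a checkpoint after each one.

    If a checkpoint is found, processing resumes where it left off.
    """
    state = {"next_batch": 0, "total": 0}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)
    for i in range(state["next_batch"], len(batches)):
        if fail_at is not None and i == fail_at:
            raise RuntimeError("simulated driver failure")
        state["total"] += sum(batches[i])
        state["next_batch"] = i + 1
        # Write to a temporary file, then rename, so that a crash
        # never leaves a half-written checkpoint behind.
        tmp_path = checkpoint_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump(state, f)
        os.replace(tmp_path, checkpoint_path)
    return state["total"]


batches = [[1, 2], [3], [4, 5]]
checkpoint_path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
try:
    process_stream(batches, checkpoint_path, fail_at=2)  # fails on the third batch
except RuntimeError:
    pass
recovered_total = process_stream(batches, checkpoint_path)  # resumes at the third batch
```

The second call does not reprocess the first two batches; it reads their result from the checkpoint and continues with the third.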
For additional information on checkpointing, refer to the Spark Streaming Programming Guide.
Spark's Windowing feature allows aggregation (and other transformations) to be applied not just to the current RDD, but also to data from a number of previous RDDs (the window duration).
The Spark KMs support batch as well as streaming transformations. While the Python code for non-streaming operates on RDD objects, the streaming code works on DStream objects. Aggregation in batch mode is simple: there is a single set of input records (RDD), which are aggregated to form the output data, which is then written into some target. In streaming mode the continuously incoming data is discretized into a flow of RDDs. By default each RDD is aggregated independently.
Windowing has two consequences: older RDDs must be retained, and data falling into the window is recalculated for every new RDD.
Window Duration: The time, in seconds, over which RDDs are combined to produce the RDDs of the windowed DStream.
Sliding Interval: The interval at which the window operation is performed.
XKM Spark Aggregation
XKM Spark Join
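The interaction of window duration and sliding interval can be sketched without Spark. In this simulation each inner list models one RDD of the stream, and, as a simplifying assumption, both parameters are expressed as a number of batches rather than in seconds. The function name windowed_sums is hypothetical.

```python
from collections import deque


def windowed_sums(batches, window_duration, sliding_interval):
    """Sum values over a sliding window of the most recent batches.

    window_duration and sliding_interval are counted in batches here,
    which assumes a batch interval of one time unit.
    """
    retained = deque(maxlen=window_duration)  # older RDDs must be retained
    results = []
    for index, batch in enumerate(batches, start=1):
        retained.append(batch)
        if index % sliding_interval == 0:
            # Everything inside the window is recalculated for each output.
            results.append(sum(sum(b) for b in retained))
    return results
```

With a window duration of 1 and a sliding interval of 1, each batch is aggregated independently, which corresponds to the default behavior described above. A longer window combines the current batch with its predecessors.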
Stateful Aggregation
When data must be aggregated across all data of a stream, stateful aggregation is required. In stateful aggregation, Spark builds a so-called state stream containing the aggregated values for all keys. For every incoming RDD this state is updated; for example, aggregated sums are updated based on new incoming data.
By default a state stream will output all stored values for every incoming RDD. This is useful in case the stream output is a file and the file is expected to always hold the entire set of aggregate values.
XKM Spark Aggregate
XKM Spark Lookup
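Stateful aggregation, as described above, can be sketched as a running dictionary of sums that is updated for every incoming batch. This is a plain Python simulation, not Spark code: each inner list models one RDD of (key, value) pairs, and update_state and state_stream are hypothetical names.

```python
def update_state(state, batch):
    """Return a new state with the sums updated from (key, value) pairs."""
    new_state = dict(state)
    for key, value in batch:
        new_state[key] = new_state.get(key, 0) + value
    return new_state


def state_stream(batches):
    """Yield the complete state after every incoming batch."""
    state = {}
    for batch in batches:
        state = update_state(state, batch)
        # All stored values are emitted, including keys that the
        # current batch did not touch.
        yield dict(state)
```

Each emitted dictionary contains every key seen so far, which matches the default behavior in which the state stream outputs all stored values for every incoming RDD.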
Caching
ODI leverages the Spark caching mechanism by providing two additional Spark base KM options.
Cache data: If this option is set to true, a storage invocation is added to the generated pyspark code of the component.
Cache storage level: This option is hidden if cache data is set to false.
Repartitioning
If the source is an HDFS file, the number of partitions is initially determined by the data blocks of the source. If the platform that runs the Spark application has more available slots for running tasks than the number of partitions loaded, the platform's resources are not fully used.
Repartitioning can be done at any step of the process, for example immediately after the data is loaded from the source or after a filter component has been processed. ODI provides Spark base KM options that let you decide whether and where to repartition.
Repartition: If this option is set to true, repartitioning is applied after the transformation of the component.
Level of Parallelism: The number of partitions. The default is 0.
Sort Partitions: If this option is set to true, partitions are sorted by key, and the key is defined by a Lambda function.
Partitions Sort Order: Ascending or descending. The default is ascending.
Partition Key Function: The user-defined partition key. The key definition must be a comma-separated column list.
Partition Function: The user-defined partition Lambda function. The default value is the pyspark-defined hash function portable_hash, which simply computes a hash based on the entire row of the RDD.
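How these options combine can be sketched in plain Python. The sketch below is a simulation rather than pyspark code: rows are tuples, partition_function is a hypothetical stand-in that hashes the entire row (it is not portable_hash itself), and the repartition function and its parameter names are assumptions chosen to mirror the options listed above. It expects at least one partition.

```python
import zlib


def partition_function(row):
    """Hypothetical stand-in for a hash computed over the entire row."""
    return zlib.crc32(repr(row).encode("utf-8"))


def repartition(rows, num_partitions, key_function=None,
                sort_partitions=False, ascending=True):
    """Distribute rows over num_partitions lists, optionally sorting each."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # The hash of the row decides which partition receives it.
        partitions[partition_function(row) % num_partitions].append(row)
    if sort_partitions:
        for partition in partitions:
            # Sorting happens within each partition, not across partitions.
            partition.sort(key=key_function, reverse=not ascending)
    return partitions
```

For example, repartition(rows, 4, key_function=lambda r: r[0], sort_partitions=True) spreads the rows over four partitions and sorts each partition by the first column in ascending order.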
A Kafka cluster consists of one or more Kafka brokers handling and storing messages. Messages are organized into topics and physically broken down into topic partitions. Kafka producers connect to a cluster and feed messages into a topic. Kafka consumers connect to a cluster and receive messages from a topic.
Although the messages on a specific topic need not all have the same message format, it is good practice to use only a single message format per topic. Kafka is integrated into ODI as a new technology.
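The relationship between topics, partitions, producers, and consumers can be sketched with a toy in-memory model. This is not a Kafka client and uses no Kafka API: the Topic class and its send and poll methods are hypothetical, choosing the partition by hashing a message key is an assumption of this sketch, and JSON is used as the single message format for the topic.

```python
import json
import zlib


class Topic:
    """Toy in-memory topic that is split into partitions."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def send(self, key, message):
        """Producer side: append a JSON-encoded message to a partition."""
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append(json.dumps(message))
        return index

    def poll(self, partition, offset):
        """Consumer side: return decoded messages from offset onward."""
        return [json.loads(m) for m in self.partitions[partition][offset:]]
```

A producer that calls send with the same key twice places both messages in the same partition, and a consumer calling poll on that partition with offset 0 receives them in the order they were sent.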