2 Hadoop Data Integration Concepts

This chapter provides an introduction to the basic concepts of Hadoop data integration using Oracle Data Integrator.

This chapter includes the following sections:

2.1 Hadoop Data Integration with Oracle Data Integrator

Typical processing in Hadoop includes data validation and transformations that are programmed as MapReduce jobs. Designing and implementing a MapReduce job requires expert programming knowledge. However, when you use Oracle Data Integrator, you do not need to write MapReduce jobs. Oracle Data Integrator uses Apache Hive and the Hive Query Language (HiveQL), a SQL-like language for implementing MapReduce jobs.

When you implement a big data processing scenario, the first step is to load the data into Hadoop. The data sources are typically files or SQL databases.

After the data is loaded, you can validate and transform it by using HiveQL, much as you would use SQL. You can perform data validation (such as checking for NULLs and primary keys) and transformations (such as filtering, aggregations, set operations, and derived tables). You can also include customized procedural snippets (scripts) for processing the data.
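As a rough illustration of the kind of HiveQL validation and transformation described above (not the code that ODI itself generates), the following Python sketch runs HiveQL-style statements through PySpark's Hive support. The raw_orders and clean_orders tables and their columns are hypothetical.

    # Minimal sketch of HiveQL-style validation and transformation via PySpark's
    # Hive support; table and column names are hypothetical, not ODI-generated.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("hiveql-validation-sketch")
             .enableHiveSupport()
             .getOrCreate())

    # Validation: reject rows whose primary-key column is NULL.
    spark.sql("""
        CREATE TABLE IF NOT EXISTS clean_orders AS
        SELECT order_id, customer_id, amount
        FROM raw_orders
        WHERE order_id IS NOT NULL
    """)

    # Transformation: filter and aggregate into a smaller data set.
    summary = spark.sql("""
        SELECT customer_id, SUM(amount) AS total_amount
        FROM clean_orders
        GROUP BY customer_id
    """)
    summary.show()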

When the data has been aggregated, condensed, or processed into a smaller data set, you can load it into an Oracle database, other relational database, HDFS, HBase, or Hive for further processing and analysis. Oracle Loader for Hadoop is recommended for optimal loading into an Oracle database.

For more information, see Integrating Hadoop Data.

2.2 Generate Code in Different Languages with Oracle Data Integrator

By default, Oracle Data Integrator (ODI) uses HiveQL to implement the mappings. However, Oracle Data Integrator also lets you implement the mappings using Pig Latin and Spark Python. Once your mapping is designed, you can either implement it using the default HiveQL, or choose to implement it using Pig Latin or Spark Python.

Support for Pig Latin and Spark Python in ODI is achieved through a set of component KMs that are specific to these languages. These component KMs are used only when a Pig data server or a Spark data server is used as the staging location for your mapping.

For example, if you use a Pig data server as the staging location, the Pig related KMs are used to implement the mapping and Pig Latin code is generated. Similarly, to generate Spark Python code, you must use a Spark data server as the staging location for your mapping.

It is recommended that you run Spark applications on YARN. In line with this recommendation, ODI supports only yarn-client and yarn-cluster mode execution and has introduced a runtime check.

If you are using any other Spark deployment mode, which is not supported in ODI, you must add the following data server property to the Spark data server:
odi.spark.enableUnsupportedSparkModes = true

For more information about generating code in different languages and the Pig and Spark component KMs, see the following:

2.3 Leveraging Apache Oozie to Execute Oracle Data Integrator Projects

Apache Oozie is a workflow scheduler system that helps you orchestrate actions in Hadoop. It is a server-based Workflow Engine specialized in running workflow jobs with actions that run Hadoop MapReduce jobs. Implementing and running an Oozie workflow requires in-depth knowledge of Oozie.

However, Oracle Data Integrator does not require you to be an Oozie expert. With Oracle Data Integrator you can easily define and execute Oozie workflows.

Oracle Data Integrator allows you to automatically generate an Oozie workflow definition by executing an integration project (package, procedure, mapping, or scenario) on an Oozie engine. The generated Oozie workflow definition is deployed to and executed in an Oozie workflow system. You can also choose to only deploy the Oozie workflow to validate its content or to execute it at a later time.

Information from the Oozie logs is captured and stored in the ODI repository along with links to the Oozie UIs. This information is available for viewing within ODI Operator and Console.

For more information, see Executing Oozie Workflows.

2.4 Oozie Workflow Execution Modes

ODI provides the following two modes for executing the Oozie workflows:

  • TASK

    Task mode generates an Oozie action for every ODI task. This is the default mode.

    The task mode cannot handle the following:

    • KMs with scripting code that spans across multiple tasks.

    • KMs with transactions.

    • KMs with file system access that cannot span file access across tasks.

    • ODI packages with looping constructs.

  • SESSION

    Session mode generates an Oozie action for the entire session.

    ODI automatically uses this mode if any of the following conditions is true:

    • Any task opens a transactional connection.

    • Any task has scripting.

    • A package contains loops.

      Note that loops in a package are not supported by Oozie engines and may not function properly in terms of execution and/or session log content retrieval, even when running in SESSION mode.

Note:

This mode is recommended for most use cases.

By default, the Oozie Runtime Engines use the Task mode, that is, the default value of the OOZIE_WF_GEN_MAX_DETAIL property for the Oozie Runtime Engines is TASK.

You can configure an Oozie Runtime Engine to use Session mode, irrespective of whether the conditions mentioned above are satisfied or not. To force an Oozie Runtime Engine to generate session level Oozie workflows, set the OOZIE_WF_GEN_MAX_DETAIL property for the Oozie Runtime Engine to SESSION.

For more information, see Oozie Runtime Engine Properties.

2.5 Lambda Architecture

Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods.

In Lambda architecture, the same data structure model is used with different technologies. For example, the data source for the batch implementation can be HDFS, whereas the streaming implementation can read data from Kafka. In ODI this is represented by using Generic Technologies, such as Generic File and Generic SQL.
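The following sketch, written with Spark Structured Streaming outside of ODI, illustrates the idea of applying one transformation to both a batch HDFS source and a streaming Kafka source. The HDFS paths, Kafka broker address, topic name, and schema are hypothetical, and the Kafka connector (spark-sql-kafka) must be available on the classpath.

    # Minimal sketch of the batch/streaming duality: the same transform() is
    # applied to a batch HDFS source and a streaming Kafka source.
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.appName("lambda-sketch").getOrCreate()

    schema = StructType([StructField("customer_id", StringType()),
                         StructField("amount", DoubleType())])

    def transform(df: DataFrame) -> DataFrame:
        # Shared logic used by both the batch and the streaming layer.
        return df.groupBy("customer_id").agg(F.sum("amount").alias("total_amount"))

    # Batch layer: read the data from HDFS.
    batch_df = spark.read.schema(schema).json("hdfs:///data/orders/")
    transform(batch_df).write.mode("overwrite").parquet("hdfs:///out/orders_totals/")

    # Streaming layer: read the same logical data from a Kafka topic.
    stream_df = (spark.readStream.format("kafka")
                 .option("kafka.bootstrap.servers", "broker:9092")
                 .option("subscribe", "orders")
                 .load()
                 .select(F.from_json(F.col("value").cast("string"), schema).alias("o"))
                 .select("o.*"))

    query = (transform(stream_df).writeStream
             .outputMode("complete")
             .format("console")
             .start())
    query.awaitTermination()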

2.6 Spark Checkpointing

A streaming application must operate 24/7 and hence should be resilient to failures. Spark Streaming needs to checkpoint information to a fault tolerant storage system so that it can recover from failures.

Checkpointing is enabled for applications recovering from failures of the driver running the application. Checkpointing only ensures that the Spark application restarts from where it left off if a checkpoint is found.
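The following is a minimal PySpark sketch of this recovery pattern using StreamingContext.getOrCreate; the checkpoint directory and socket source are hypothetical, and the code that ODI generates will differ.

    # Minimal sketch of driver-failure recovery through checkpointing in the
    # DStream API; checkpoint directory and socket source are hypothetical.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    CHECKPOINT_DIR = "hdfs:///tmp/odi_checkpoint"   # must be fault-tolerant storage

    def create_context():
        # Called only when no checkpoint is found in CHECKPOINT_DIR.
        sc = SparkContext(conf=SparkConf().setAppName("checkpoint-sketch"))
        ssc = StreamingContext(sc, batchDuration=10)
        ssc.checkpoint(CHECKPOINT_DIR)
        lines = ssc.socketTextStream("localhost", 9999)
        lines.count().pprint()
        return ssc

    # On restart after a driver failure, the context (and its pending work) is
    # rebuilt from the checkpoint instead of calling create_context() again.
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()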

For additional information on checkpointing refer to Spark Streaming Programming Guide.

2.7 Spark Windowing and Stateful Aggregation

Spark's Windowing feature allows aggregation (and other transformations) to be applied not only to the current RDD, but also to include data from a number of previous RDDs (the window duration).

The Spark KMs support batch as well as streaming transformations. While the Python code for non-streaming operates on RDD objects, the streaming code works on DStream objects. Aggregation in batch mode is simple: there is a single set of input records (RDD), which are aggregated to form the output data, which is then written into some target. In streaming mode the continuously incoming data is discretized into a flow of RDDs. By default each RDD is aggregated independently.
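The following is a minimal DStream sketch of windowed aggregation (not ODI-generated code); the socket source is hypothetical, and the windowDuration and slideDuration arguments correspond to the Window Duration and Sliding Interval options described later in this section.

    # Minimal sketch of windowed aggregation over a DStream.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="windowing-sketch")
    ssc = StreamingContext(sc, batchDuration=10)
    # Checkpointing is required when an inverse reduce function is used.
    ssc.checkpoint("hdfs:///tmp/window_checkpoint")

    pairs = (ssc.socketTextStream("localhost", 9999)
             .map(lambda line: (line.split(",")[0], 1)))

    # Running count per key over the last 60 seconds, recomputed every 20 seconds.
    windowed = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,          # add counts entering the window
        lambda a, b: a - b,          # subtract counts leaving the window
        windowDuration=60,
        slideDuration=20)

    windowed.pprint()
    ssc.start()
    ssc.awaitTermination()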

Spark windowing works well for calculating things such as running sums or running averages. However, it comes with two restrictions:
  • Older RDDs must be retained

  • Data falling into the window is recalculated for every new RDD.

This is the reason why windowing is not suitable for aggregation across an entire data stream. This can only be achieved by stateful aggregation.
Windowing-enabled KMs have the following optional KM options:
  • Window Duration: Duration, in seconds, over which incoming RDDs are combined to produce the RDDs of the windowed DStream.

  • Sliding Interval: Interval at which the window operation is performed.

Windowing is supported by:
  • XKM Spark Aggregation

  • XKM Spark Join

For additional information, refer to Spark Streaming Programming Guide.

Stateful Aggregation

When data must be aggregated across all data of a stream, stateful aggregation is required. In stateful aggregation, Spark builds a so-called state stream containing the aggregated values for all keys. For every incoming RDD this state is updated; for example, aggregated sums are updated based on the new incoming data.

By default, a state stream outputs all stored values for every incoming RDD. This is useful if the stream output is a file and the file is expected to always hold the entire set of aggregate values.
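The following is a minimal sketch of stateful aggregation with updateStateByKey (not ODI-generated code); the socket source and key/value layout are hypothetical. Note that checkpointing must be enabled for stateful operations.

    # Minimal sketch of stateful aggregation: the state stream holds the
    # aggregated sum for every key seen so far.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="stateful-sketch")
    ssc = StreamingContext(sc, batchDuration=10)
    ssc.checkpoint("hdfs:///tmp/state_checkpoint")   # mandatory for stateful ops

    def update_sum(new_values, running_sum):
        # Update the stored aggregate for a key with values from the new RDD.
        return sum(new_values) + (running_sum or 0)

    pairs = (ssc.socketTextStream("localhost", 9999)
             .map(lambda line: (line.split(",")[0], float(line.split(",")[1]))))

    # Emits all stored aggregate values for every incoming RDD.
    state = pairs.updateStateByKey(update_sum)
    state.pprint()

    ssc.start()
    ssc.awaitTermination()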

Stateful processing is supported by:
  • XKM Spark Aggregate

  • XKM Spark Lookup

2.8 Spark Repartitioning and Caching

Caching

ODI leverages the Spark caching mechanism by providing two additional Spark base KM options.

  • Cache data: If this option is set to true, a storage invocation is added to the generated pyspark code of the component.

  • Cache storage level: This option is hidden if cache data is set to false.
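As an illustration of the underlying mechanism that these options control, the following sketch persists an RDD with an explicit storage level so that later actions reuse it instead of recomputing it from the source. The HDFS path is hypothetical, and the code that ODI generates will differ.

    # Minimal sketch of Spark caching with an explicit storage level.
    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="caching-sketch")
    rdd = sc.textFile("hdfs:///data/orders.csv").map(lambda line: line.split(","))

    # Roughly what "Cache data = true" with a chosen storage level amounts to:
    # the RDD is materialized once and reused by both actions below.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

    print(rdd.count())
    print(rdd.take(5))

    rdd.unpersist()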

Repartitioning

The number of partitions is initially determined by the data blocks of the source, for example, when the source is an HDFS file. If the platform that runs the Spark application has more slots available for running tasks than the number of partitions loaded, then the platform resources are not fully used.

Repartitioning can be done at any step of the process, for example, immediately after the data is loaded from the source or after a filter component is processed. ODI provides Spark base KM options that let you decide whether and where to repartition.

  • Repartition: If this option is set to true, repartitioning is applied after the transformation of the component.

  • Level of Parallelism: Number of partitions. The default is 0.
  • Sort Partitions: If this option is set to true, partitions are sorted by key and the key is defined by a Lambda function.

  • Partitions Sort Order: Ascending or descending. Default is ascending.

  • Partition Key Function: User-defined partition key; the key definition must be a comma-separated column list.

  • Partition Function: User-defined partition Lambda function. The default value is the pyspark-defined hash function portable_hash, which simply computes a hash based on the entire row of the RDD.
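The following sketch illustrates, outside of ODI, the pyspark call that these options conceptually map onto: repartitionAndSortWithinPartitions with the default portable_hash partition function. The sample data and parameter values are hypothetical.

    # Minimal sketch of repartitioning with sorted partitions.
    from pyspark import SparkContext
    from pyspark.rdd import portable_hash

    sc = SparkContext(appName="repartition-sketch")

    # Hypothetical key/value pairs: (customer_id, amount).
    pairs = sc.parallelize([("c1", 10.0), ("c2", 5.0), ("c1", 7.5), ("c3", 1.0)])

    # Roughly: Repartition = true, Level of Parallelism = 4, partitions sorted
    # by key in ascending order and distributed by portable_hash.
    repartitioned = pairs.repartitionAndSortWithinPartitions(
        numPartitions=4,
        partitionFunc=portable_hash,
        ascending=True,
        keyfunc=lambda key: key)

    print(repartitioned.glom().collect())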

2.9 Kafka Integration with Oracle Data Integrator

A Kafka cluster consists of one to many Kafka brokers handling and storing messages. Messages are organized into topics and physically broken down into topic partitions. Kafka producers connect to a cluster and feed messages into a topic. Kafka consumers connect to a cluster and receive messages from a topic.

Although all messages on a specific topic need not have the same message format, it is good practice to use only a single message format per topic. Kafka is integrated into ODI as a new technology.
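As a standalone illustration of producers and consumers (not the way ODI itself connects to Kafka), the following sketch uses the kafka-python package; the broker address and topic name are hypothetical.

    # Minimal sketch of a Kafka producer and consumer using kafka-python.
    from kafka import KafkaProducer, KafkaConsumer

    BROKERS = "broker:9092"
    TOPIC = "orders"          # all messages on this topic share one format

    # Producer: connect to the cluster and feed messages into a topic.
    producer = KafkaProducer(bootstrap_servers=BROKERS)
    producer.send(TOPIC, value=b'{"customer_id": "c1", "amount": 10.0}')
    producer.flush()

    # Consumer: connect to the cluster and receive messages from the topic.
    consumer = KafkaConsumer(TOPIC,
                             bootstrap_servers=BROKERS,
                             auto_offset_reset="earliest",
                             consumer_timeout_ms=5000)
    for message in consumer:
        print(message.topic, message.partition, message.value)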