This chapter explains the integration strategies used in mappings. These strategies are implemented in the Integration Knowledge Modules.
This chapter includes the following sections:
An integration process is always needed in a mapping. This process integrates data from the source or loading tables into the target datastore, using a temporary integration table.
An integration process uses an integration strategy, which defines the steps required in the integration process. Examples of integration strategies are:
Append: Optionally delete all records in the target datastore and insert all the flow into the target.
Control Append: Optionally delete all records in the target datastore and insert all the flow into the target. This strategy includes an optional flow control.
Incremental Update: Optionally delete all records in the target datastore. Identify new and existing records by comparing the flow with the target, then insert new records and update existing records in the target. This strategy includes an optional flow control.
Slowly Changing Dimension: Implement a Type 2 Slowly Changing Dimension, identifying the fields that require a simple update of the target record when they change, and the fields that require the previous record state to be historized.
This phase may involve a single server, when the staging area and the target are located in the same data server, or two servers, when the staging area and the target are on different servers.
The integration process depends strongly on the strategy being used.
The following elements are used in the integration process:
An integration table (also known as the flow table) is sometimes needed to stage data after all staging area transformations are made. This table is named after the target table, prefixed with I$. This integration table is the image of the target table with extra fields required for the strategy to be implemented. The data in this table is flagged, transformed or checked before being integrated into the target table.
The source and/or loading tables (created by the LKM). The integration process loads data from these tables into the integration table or directly into the target tables.
Check Knowledge Module. The IKM may initiate a flow check phase to check the data in the integration table against some of the constraints of the target table. Invalid data is removed from the integration table (removed from the flow).
Mapping metadata, such as Insert, Update, UD1, etc., or model metadata such as the Slowly Changing Dimension behavior are used at integration phase to parameterize attribute-level behavior in the integration strategies.
A typical integration process works in the following way:
Create a temporary integration table if needed. It typically contains extra fields, for example an update flag taking the value I or U to identify which rows are to be inserted and which are to be updated.
Load data from the source and loading tables into this integration table, executing the transformations (joins, filters, mappings) specified on the staging area.
Perform some transformation on the integration table to implement the integration strategy. For example, compare the content of the integration table with the target table to set the update flag.
Load data from the integration table into the target table, modifying the target content as required by the integration strategy. A sketch of these steps is shown below.
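The following sketch illustrates these four steps for a single-server configuration. All table and column names (CUSTOMER, I$_CUSTOMER, C$_0CUSTOMER, CUST_ID, and so on) are hypothetical, and the SQL shown is only one possible rendering of what a KM would generate:

-- 1. Create the temporary integration table with an extra update flag (hypothetical names).
create table I$_CUSTOMER (
	CUST_ID    number,
	CUST_NAME  varchar2(100),
	IND_UPDATE char(1)
);

-- 2. Load transformed data from the source/loading tables into the integration table.
insert into I$_CUSTOMER (CUST_ID, CUST_NAME, IND_UPDATE)
select S.CUST_ID, upper(S.CUST_NAME), 'I'
from   C$_0CUSTOMER S;

-- 3. Implement the strategy: flag rows whose key already exists in the target.
update I$_CUSTOMER I
set    IND_UPDATE = 'U'
where  exists (select 1 from CUSTOMER T where T.CUST_ID = I.CUST_ID);

-- 4. Apply the flow to the target (insert shown; an update step would follow the same pattern).
insert into CUSTOMER (CUST_ID, CUST_NAME)
select CUST_ID, CUST_NAME
from   I$_CUSTOMER
where  IND_UPDATE = 'I';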
The following sections explain some of the integration strategies used in Oracle Data Integrator. They are grouped into two families:
These strategies are used when the staging area schema is located in the same data server as the target table schema. In this configuration, complex integration strategies can take place.
This strategy simply inserts the incoming data flow into the target datastore, possibly deleting the content of the target beforehand.
This integration strategy includes the following steps:
Delete (or truncate) all records from the target table. This step usually depends on a KM option.
Transform and insert data from sources located on the same server and from loading tables in the staging area. When dealing with remote source data, LKMs will have already prepared loading tables. Sources on the same server can be read directly. The integration operation is a direct INSERT/SELECT statement, with all the transformations performed on the staging area in the SELECT clause and all the transformations performed on the target in the INSERT clause (see the sketch after this list).
Commit the Transaction. The operations performed on the target should be done within a transaction and committed after they are all complete. Note that committing is typically triggered by a KM option called COMMIT.
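A minimal sketch of such an Append flow is shown below. The SALES, PRODUCT and C$_0SALES tables, their columns, and the transformations are illustrative assumptions; an actual IKM generates the statement from the mapping metadata using substitution methods:

delete from SALES;                      -- optional step, controlled by a KM option

insert into SALES (SALE_ID, PRODUCT_ID, AMOUNT, SALE_DATE)
select  L.SALE_ID,
        P.PRODUCT_ID,
        L.QTY * L.UNIT_PRICE,           -- staging-area transformation
        trunc(L.SALE_TS)                -- target-side transformation
from    C$_0SALES L
join    PRODUCT P on (P.PRODUCT_CODE = L.PRODUCT_CODE);

commit;                                 -- typically conditioned by the COMMIT option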
The same integration strategy can be obtained by using the Control Append strategy and not choosing to activate flow control.
In the Append strategy, flow data is simply inserted in the target table without any flow control. This approach can be improved by adding extra steps that will store the flow data in an integration table ("I$"), then call the CKM to isolate erroneous records in the error table ("E$").
This integration strategy includes the following steps:
Drop (if it exists) and create the integration table in the staging area. This is created with the same attributes as the target table so that it can be passed to the CKM for flow control.
Insert data into the integration table from the sources and loading tables using a single INSERT/SELECT statement similar to the one loading the target in the Append strategy (the complete sequence is sketched after this list).
Call the CKM for flow control. The CKM will evaluate every constraint defined for the target table on the integration table data. It will create an error table and insert the erroneous records into this table. It will also remove erroneous records from the integration table.
After the CKM completes, the integration table will only contain valid records. Inserting them in the target table can then be done safely.
Remove all records from the target table. This step can be made dependent on an option value set by the designer of the mapping.
Append the records from the integration table to the target table in a single INSERT/SELECT statement.
Commit the transaction.
Drop the temporary integration table.
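This sequence could translate into SQL along the following lines. The SALES and I$_SALES names and columns are hypothetical, and the flow control step itself is performed by the CKM rather than by hand-written SQL:

-- Create the integration table as an image of the target (hypothetical names).
create table I$_SALES as
select * from SALES where 1 = 0;        -- same structure as the target, no rows

-- Load the transformed flow into the integration table.
insert into I$_SALES (SALE_ID, PRODUCT_ID, AMOUNT, SALE_DATE)
select L.SALE_ID, L.PRODUCT_ID, L.QTY * L.UNIT_PRICE, trunc(L.SALE_TS)
from   C$_0SALES L;

-- The CKM then moves invalid rows from I$_SALES to the E$ error table.

delete from SALES;                      -- optional, controlled by a KM option

insert into SALES
select * from I$_SALES;                 -- only valid records remain after flow control

commit;

drop table I$_SALES;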
In some cases, it is useful to recycle errors from previous runs so that they are added to the flow and applied again to the target. This method can be useful for example when receiving daily sales transactions that reference product IDs that may not exist. Suppose that a sales record is rejected in the error table because the referenced product ID does not exist in the product table. This happens during the first run of the mapping. In the meantime the missing product ID is created by the data administrator. Therefore the rejected record becomes valid and should be re-applied to the target during the next execution of the mapping.
This mechanism is implemented in IKMs by an extra task that inserts all the rejected records from the previous executions of this mapping from the error table into the integration table. This operation is made prior to calling the CKM to check the data quality, and is conditioned by a KM option usually called RECYCLE_ERRORS.
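A hedged sketch of such a recycling task is shown below, reusing the hypothetical SALES example. The E$_SALES error table and its ORIGIN column are assumptions; the actual error table structure depends on the CKM used:

-- Re-inject rows rejected by previous runs of the same mapping from the error
-- table back into the integration table, before the CKM runs again.
-- Executed only when the RECYCLE_ERRORS option is selected.
insert into I$_SALES (SALE_ID, PRODUCT_ID, AMOUNT, SALE_DATE)
select E.SALE_ID, E.PRODUCT_ID, E.AMOUNT, E.SALE_DATE
from   E$_SALES E
where  E.ORIGIN = 'MY_MAPPING';         -- ORIGIN column name is illustrative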
The Incremental Update strategy is used to integrate data in the target table by comparing the records of the flow with existing records in the target according to a set of attributes called the "update key". Records that have the same update key are updated when their associated data is not the same. Those that don't yet exist in the target are inserted. This strategy is often used for dimension tables when there is no need to keep track of the records that have changed.
The challenge with such IKMs is to use set-oriented SQL based programming to perform all operations rather than using a row-by-row approach that often leads to performance issues. The most common method to build such strategies often relies on the integration table ("I$") which stores the transformed execution units. This method is described below:
Drop (if it exists) and create the integration table in the staging area. This is created with the same attributes as the target table so that it can be passed to the CKM for flow control. It also contains an IND_UPDATE attribute that is used to flag the records that should be inserted ("I") and those that should be updated ("U").
Transform and insert data into the integration table from the sources and loading tables using a single INSERT/SELECT statement. The IND_UPDATE attribute is set by default to "I".
Recycle the rejected records from the previous run into the integration table if the RECYCLE_ERRORS KM option is selected.
Call the CKM for flow control. The CKM will evaluate every constraint defined for the target table on the integration table data. It will create an error table and insert the erroneous records into this table. It will also remove erroneous records from the integration table.
Update the integration table to set the IND_UPDATE flag to "U" for all the records that have the same update key values as the target ones. Therefore, records that already exist in the target will have a "U" flag. This step is usually an UPDATE/SELECT statement (see the sketch after this list).
Update the integration table again to set the IND_UPDATE attribute to "N" for all records that are already flagged as "U" and for which the attribute values are exactly the same as the target ones. As these flow records match the target records exactly, they don't need to be used to update the target data.
After this step, the integration table is ready for applying the changes to the target as it contains records that are flagged:
"I": these records should be inserted into the target.
"U": these records should be used to update the target.
"N": these records already exist in the target and should be ignored.
Update the target with records from the integration table that are flagged "U". Note that the update statement is typically executed prior to the INSERT statement to minimize the volume of data manipulated.
Insert records in the integration table that are flagged "I" into the target.
Commit the transaction.
Drop the temporary integration table.
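The flagging and apply steps could look like the following sketch for a hypothetical CUSTOMER target with update key CUST_ID. Table and column names are illustrative; a real IKM generates these statements from the mapping metadata:

-- Flag rows whose update key already exists in the target.
update I$_CUSTOMER I
set    IND_UPDATE = 'U'
where  exists (select 1 from CUSTOMER T where T.CUST_ID = I.CUST_ID);

-- Neutralize rows that are strictly identical to the existing target row.
update I$_CUSTOMER I
set    IND_UPDATE = 'N'
where  IND_UPDATE = 'U'
and    exists (select 1 from CUSTOMER T
               where T.CUST_ID = I.CUST_ID
               and   coalesce(T.CUST_NAME, '0') = coalesce(I.CUST_NAME, '0')
               and   coalesce(T.CITY, '0')      = coalesce(I.CITY, '0'));

-- Apply updates first, then inserts.
update CUSTOMER T
set    (CUST_NAME, CITY) = (select I.CUST_NAME, I.CITY
                            from   I$_CUSTOMER I
                            where  I.CUST_ID = T.CUST_ID
                            and    I.IND_UPDATE = 'U')
where  exists (select 1 from I$_CUSTOMER I
               where I.CUST_ID = T.CUST_ID and I.IND_UPDATE = 'U');

insert into CUSTOMER (CUST_ID, CUST_NAME, CITY)
select CUST_ID, CUST_NAME, CITY
from   I$_CUSTOMER
where  IND_UPDATE = 'I';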
This approach can be optimized depending on the underlying database. The following examples illustrate such optimizations:
With Teradata, it may be more efficient to use a left outer join between the flow data and the target table to populate the integration table with the IND_UPDATE attribute already set properly.
With Oracle, it may be more efficient in some cases to use a MERGE INTO statement on the target table instead of an UPDATE followed by an INSERT, as sketched below.
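For example, a MERGE statement covering both the update and the insert could look like the following sketch, reusing the hypothetical CUSTOMER example (this is an illustration of the technique, not the exact statement generated by an Oracle IKM):

merge into CUSTOMER T
using (select CUST_ID, CUST_NAME, CITY
       from   I$_CUSTOMER
       where  IND_UPDATE <> 'N') I        -- ignore rows identical to the target
on (T.CUST_ID = I.CUST_ID)
when matched then
  update set T.CUST_NAME = I.CUST_NAME,
             T.CITY      = I.CITY
when not matched then
  insert (CUST_ID, CUST_NAME, CITY)
  values (I.CUST_ID, I.CUST_NAME, I.CITY);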
The update key should always be unique. In most cases, the primary key will be used as an update key. The primary key cannot be used, however, when it is automatically calculated using an increment such as an identity attribute, a rank function, or a sequence. In this case an update key based on attributes present in the source must be used.
When comparing data values to determine what should not be updated, the join between the integration table and the target table is expressed on each attribute as follows:
<target_table>.AttributeN = <loading_table>.AttributeN or (<target_table>.AttributeN is null and <loading_table>.AttributeN is null)
This is done to allow comparison between null values, so that a null value matches another null value. A more elegant way of writing it would be to use the coalesce function. Therefore the WHERE predicate could be written this way:
<%=odiRef.getColList("","coalesce(" + odiRef.getTable("L", "INT_NAME", "A") + ".[COL_NAME], 0) = coalesce(T.[COL_NAME], 0)", " \nand\t", "", "((UPD and !TRG) and !UK) ")%>
Attribute-Level Insert/Update Behavior
Attributes updated by the UPDATE statement are not the same as the ones used in the INSERT statement. The UPDATE statement uses the selector "UPD and not UK" to filter only those attribute mappings that are marked as "Update" in the mapping and that do not belong to the update key. The INSERT statement uses the selector "INS" to retrieve the attribute mappings that are marked as "Insert" in the mapping.
It is important that the UPDATE and the INSERT statements on the target belong to the same transaction. Should any of them fail, no data will be inserted or updated in the target.
Type 2 Slowly Changing Dimension (SCD) is a strategy used for loading data warehouses. It is often used for loading dimension tables, in order to keep track of changes on specific attributes. A typical slowly changing dimension table would contain the following attributes:
A surrogate key. This is usually a numeric attribute containing an automatically-generated number (using an identity attribute, a rank function or a sequence).
A natural key. This is the list of attributes that represent the primary key of the operational system.
Attributes that one must overwrite on change.
Attributes that require adding a new row on change.
A starting timestamp attribute indicating when the record was created in the data warehouse
An ending timestamp attribute indicating when the record became obsolete (closing date)
A current record flag indicating whether the record is the actual one (1) or an old one (0)
The following example illustrates the Slowly Changing Dimension behavior.
In the operational system, a product is defined by its ID that acts as a primary key. Every product has a name, a size, a supplier and a family. In the Data Warehouse a new version of this product is stored whenever the supplier or the family is updated in the operational system.
Figure 6-1 Type 2 Slowly Changing Dimensions Example
In this example, the product dimension is first initialized in the Data Warehouse on March 12, 2006. All the records are inserted and are assigned a calculated surrogate key as well as a fake ending date set to January 1, 2400. As these records represent the current state of the operational system, their current record flag is set to 1. After the first load, the following changes happen in the operational system:
The supplier is updated for product P1
The family is updated for product P2
The name is updated for product P3
Product P5 is added
These updates have the following impact on the data warehouse dimension:
The update of the supplier of P1 is translated into the creation of a new current record (Surrogate Key 5) and the closing of the previous record (Surrogate Key 1)
The update of the family of P2 is translated into the creation of a new current record (Surrogate Key 6) and the closing of the previous record (Surrogate Key 2)
The update of the name of P3 simply updates the target record with Surrogate Key 3
The new product P5 is translated into the creation of a new current record (Surrogate Key 7).
To create a Knowledge Module that implements this behavior, it is necessary to know which attributes act as a surrogate key, a natural key, a start date and so on. Oracle Data Integrator stores this information in the Slowly Changing Dimension Behavior field in the Description tab of every attribute in the model.
When populating such a datastore in a mapping, the IKM has access to this metadata using the SCD_xx selectors on the getColList() substitution method.
The way Oracle Data Integrator implements Type 2 Slowly Changing Dimensions is described below:
Drop (if it exists) and create the integration table in the staging area.
Insert the flow data in the integration table using only mappings that apply to the natural key, overwrite on change and add row on change attributes. Set the starting timestamp to the current date and the ending timestamp to a constant.
Recycle previously rejected records.
Call the CKM to perform a data quality check on the flow.
Flag the records in the integration table to 'U' when the natural key and the add row on change columns have not changed compared to the current records of the target.
Update the target with the columns flagged overwrite on change by using the integration table content filtered on the 'U' flag.
Close old records, that is those for which the natural key exists in the integration table, by setting their current record flag to 0 and their ending timestamp to the current date.
Insert the new changing records with their current record flag set to 1 (see the sketch after this list).
Drop the integration table.
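The two final load steps (closing old records and inserting new versions) could translate into SQL along these lines for a hypothetical DIM_PRODUCT dimension. Names, datatypes and the PRODUCT_SEQ sequence are assumptions, and the restriction on the "I" flag reflects the flagging performed in the earlier steps:

-- Close superseded rows: natural key present in the flow as a new version.
update DIM_PRODUCT D
set    CURRENT_FLAG = 0,
       END_DATE     = sysdate
where  D.CURRENT_FLAG = 1
and    exists (select 1 from I$_DIM_PRODUCT I
               where  I.PRODUCT_ID = D.PRODUCT_ID   -- natural key
               and    I.IND_UPDATE = 'I');          -- rows creating a new version (assumption)

-- Insert the new versions with a fresh surrogate key and an open-ended timestamp.
insert into DIM_PRODUCT
  (PRODUCT_SK, PRODUCT_ID, PRODUCT_NAME, SUPPLIER, FAMILY,
   START_DATE, END_DATE, CURRENT_FLAG)
select PRODUCT_SEQ.nextval,                          -- surrogate key
       PRODUCT_ID, PRODUCT_NAME, SUPPLIER, FAMILY,
       sysdate, to_date('01-01-2400','DD-MM-YYYY'), 1
from   I$_DIM_PRODUCT
where  IND_UPDATE = 'I';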
Again, this approach can be adapted. There may be some cases where the SQL produced requires further tuning and optimization.
These strategies are used when the staging area cannot be located on the same data server as the target datastore. This configuration is mainly used for data servers with no transformation capabilities (Files, for example). In this configuration, only simple integration strategies are possible.
There are some cases when the source is a single file that can be loaded directly into the target table using the most efficient method. By default, Oracle Data Integrator suggests locating the staging area on the target server, using an LKM to stage the source file in a loading table, and then using an IKM to integrate the loaded data into the target table.
If the source data is not transformed, the loading phase is not necessary.
In this situation, you would use an IKM that directly loads the file data to the target. This requires setting the staging area on the source file's logical schema. By doing this, Oracle Data Integrator automatically suggests using a "Multi-Connection" IKM that moves data between a remote staging area and the target.
Such an IKM would use a loader, and include the following steps:
Generate the appropriate load utility script
Run the loader utility
An example of such a KM is the IKM File to Teradata (TTU).
When using a staging area different from the target and when setting this staging area to an RDBMS, it is possible to use an IKM that moves the transformed data from the staging area to the remote target. This type of IKM is similar to an LKM and follows the same rules.
The steps when using the agent are usually:
Delete (or truncate) all records from the target table. This step usually depends on a KM option.
Insert the data from the staging area to the target. This step has a SELECT statement in the "Command on Source" tab that will be executed on the staging area. The INSERT statement is written using bind variables in the "Command on Target" tab and will be executed for every batch on the target table.
The IKM SQL to SQL Append is a typical example of such a KM.
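The following sketch shows what the two commands of such a task could look like for a hypothetical CUSTOMER target. The agent executes the source query on the staging area and binds each fetched row into the target statement:

-- Command on Source (executed on the staging area); names are illustrative.
select CUST_ID, CUST_NAME, CITY
from   STAGING.C$_0CUSTOMER

-- Command on Target (executed on the target for every batch of rows fetched
-- by the agent), using bind variables named after the source columns.
insert into CUSTOMER (CUST_ID, CUST_NAME, CITY)
values (:CUST_ID, :CUST_NAME, :CITY)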
Variations of this strategy use loaders or database-specific methods to load data from the staging area into the target instead of using the agent.
When the target datastore is a file, or a JMS queue or topic, the staging area is set on a different location than the target. Therefore, if you want to target a file or queue datastore, you will have to use a "Multi-Connection" IKM that integrates the transformed data from your staging area into this target. The method used to perform this data movement depends on the target technology. For example, it is possible to use the agent or specific features of the target (such as a Java API).
Typical steps of such an IKM will include:
Reset the target file or queue. This step can be made dependent on an option.
Unload the data from the staging area to the file or queue.
This section provides examples of integration strategies and customizations.
The simplest strategy for integrating data in an existing target table, provided that all source data is already in the staging area, is to replace and insert the records in the target. Therefore, the simplest IKM would be composed of two steps:
Remove all records from the target table. This step can be made dependent on an option set by the designer of the mapping.
Transform and insert source records from all datasets. When dealing with remote source data, LKMs will have already prepared loading tables with pre-transformed result sets. If the mapping uses source data sets on the same server as the target (and the staging area as well), they will be joined to the other loading tables. Therefore the integration operation will be a straight INSERT/SELECT statement leveraging all the transformation power of the target Teradata box.
The following example gives you the details of these steps:
This task deletes the data from the target table. This command runs in a transaction and is not committed. It is executed if the DELETE_ALL Knowledge Module option is selected.
delete from <%=odiRef.getTable("L","TARG_NAME","A")%>
This task inserts rows from the staging table into the target table. This command runs in the same transaction as all operations made on the target and is not committed. A final Commit transaction command triggers the commit on the target.
Note that this command selects the data from the different datasets defined for the mapping. Using a for loop, it goes through all the datasets and generates a SELECT query for each dataset. These queries are merged using set-based operators (UNION, INTERSECT, etc.) and the resulting data flow is inserted into the target table.
insert into <%=odiRef.getTable("L","TARG_NAME","A")%>
(
	<%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "((INS and !TRG) and REW)")%>
	<%=odiRef.getColList(",", "[COL_NAME]", ",\n\t", "", "((INS and TRG) and REW)")%>
)
select
	<%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "((INS and !TRG) and REW)")%>
	<%=odiRef.getColList(",", "[EXPRESSION]", ",\n\t", "", "((INS and TRG) and REW)")%>
FROM (
<%for (int i=0; i < odiRef.getDataSetCount(); i++){%>
<%=odiRef.getDataSet(i, "Operator")%>
select <%=odiRef.getPop("DISTINCT_ROWS")%>
	<%=odiRef.getColList(i,"", "[EXPRESSION] [COL_NAME]", ",\n\t", "", "((INS and !TRG) and REW)")%>
from <%=odiRef.getFrom(i)%>
where <% if (odiRef.getDataSet(i, "HAS_JRN").equals("1")) { %> JRN_FLAG <> 'D' <%} else {%> (1=1) <% } %>
<%=odiRef.getJoin(i)%>
<%=odiRef.getFilter(i)%>
<%=odiRef.getJrnFilter(i)%>
<%=odiRef.getGrpBy(i)%>
<%=odiRef.getHaving(i)%>
<%}%>
)
A project requirement is to back up every data warehouse table prior to loading the current data. This can help restore the data warehouse to its previous state in case of a major problem. The backup tables are named after the data tables with a "_BCK" suffix.
A first solution to this requirement would be to develop mappings that would duplicate data from every target datastore to its corresponding backup one. These mappings would be triggered prior to the ones that would populate the data warehouse. Unfortunately, this solution would lead to significant development and maintenance effort as it requires the creation of an additional mapping for every target datastore. The number of mappings to develop and maintain would be at least doubled!
A simple solution would be to implement this behavior in the IKM used to populate the target datastores. This would be done using a single CREATE AS SELECT statement that creates and populates the backup table right before modifying the target. The backup operation therefore becomes automatic and developers no longer need to worry about it.
This example shows how this behavior could be implemented in the IKM Oracle Incremental Update.
Before the Update Existing Rows and Insert New Rows tasks that modify the target, the following tasks are added.
This task drops the backup table.
Drop table <%=odiRef.getTable("L","TARG_NAME","A")%>_BCK
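A second task could then recreate and populate the backup table from the current target content, for example with a statement along these lines (a sketch reusing the same substitution methods; the exact syntax depends on the target technology):

Create table <%=odiRef.getTable("L","TARG_NAME","A")%>_BCK as
select <%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "")%>
from <%=odiRef.getTable("L","TARG_NAME","A")%>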
Some data warehousing projects could require keeping track of every insert or update operation done to target tables for regulatory compliance. This could help business analysts understand what happened to their data during a certain period of time.
Even if you can achieve this behavior by using the slowly changing dimension Knowledge Modules, it can also be done by simply creating a copy of the flow data before applying it to the target table.
Suppose that every target table has a corresponding tracking table with a "_RGC" suffix containing all the data columns plus some additional regulatory compliance columns such as:
The Job Id
The Job Name
Date and time of the operation
The type of operation ("Insert" or "Update")
You would populate this table directly from the integration table after applying the inserts and updates to the target, and before the end of the IKM.
For example, in the case of the Oracle Incremental Update IKM, you would add the following tasks just after the Update Existing Rows and Insert New Rows tasks that modify the target.
This task loads data in the tracking table.
insert into <%=odiRef.getTable("L","TARG_NAME","A")%>_RGC
(
	JOBID,
	JOBNAME,
	OPERATIONDATE,
	OPERATIONTYPE,
	<%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "")%>
)
select
	<%=odiRef.getSession("SESS_NO")%> /* JOBID */,
	<%=odiRef.getSession("SESS_NAME")%> /* JOBNAME */,
	Current_timestamp /* OPERATIONDATE */,
	Case when IND_UPDATE = 'I' then 'Insert' else 'Update' end /* OPERATIONTYPE */,
	<%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "")%>
from <%=odiRef.getTable("L","INT_NAME","A")%>
where IND_UPDATE <> 'N'
This customization could of course be extended by automatically creating the tracking table using the IKM if it does not already exist.
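For example, a task set to ignore errors (so that it fails silently when the table already exists) could create the tracking table from the target structure. The sketch below assumes the additional columns listed earlier and uses indicative Oracle datatypes:

Create table <%=odiRef.getTable("L","TARG_NAME","A")%>_RGC
(
	JOBID NUMBER(10),
	JOBNAME VARCHAR2(400),
	OPERATIONDATE DATE,
	OPERATIONTYPE VARCHAR2(30),
	<%=odiRef.getTargetColList("", "[COL_NAME] [DEST_WRI_DT]", ",\n\t", "")%>
)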