Skip Headers
Oracle® Fusion Middleware Knowledge Module Developer's Guide for Oracle Data Integrator
11g Release 1 (11.1.1)

Part Number E12645-04
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
PDF · Mobi · ePub

4 Data Integrity Strategies

This chapter explains the data integrity strategies used for performing flow and static checks. These strategies are implemented in the Check Knowledge Modules.

This chapter contains the following sections:

4.1 Data Integrity Check Process

Data Integrity Check Process checks is activated in the following cases:

In both those cases, a CKM is in charge of checking the data quality of data according to a predefined set of constraints. The CKM can be used either to check existing data when used in a "static control" or to check flow data when used in a "flow control". It is also in charge of removing the erroneous records from the checked table if specified.

In the case of a static control, the CKM used is defined in the model. In the case of a flow control, it is specified for the interface.

4.1.1 Check Knowledge Module Overview

Standard CKMs maintain 2 different types of tables:

  • A single summary table named SNP_CHECK_TAB for each data server, created in the work schema of the default physical schema of the data server. This table contains a summary of the errors for every table and constraint. It can be used, for example, to analyze the overall data quality of a model.

  • An error table named E$_<datastore name> for every datastore that was checked. The error table contains the actual records rejected by data quality processes (static and flow controls) launched for this table.

A standard CKM is composed of the following steps:

  • Drop and create the summary table. The DROP statement is executed only if the designer requires it for resetting the summary table. The CREATE statement is always executed but the error is tolerated if the table already exists.

  • Remove the summary records from the previous run from the summary table

  • Drop and create the error table. The DROP statement is executed only if the designer requires it for recreating the error table. The CREATE statement is always executed but error is tolerated if the table already exists.

  • Remove rejected records from the previous run from the error table

  • Reject records that violate the primary key constraint.

  • Reject records that violate any alternate key constraint

  • Reject records that violate any foreign key constraint

  • Reject records that violate any check condition constraint

  • Reject records that violate any mandatory column constraint

  • Remove rejected records from the checked table if required

  • Insert the summary of detected errors in the summary table.

CKM commands should be tagged to indicate how the code should be generated. The tags can be:

  • "Primary Key": The command defines the code needed to check the primary key constraint

  • "Alternate Key": The command defines the code needed to check an alternate key constraint. During code generation, Oracle Data Integrator will use this command for every alternate key

  • "Join": The command defines the code needed to check a foreign key constraint. During code generation, Oracle Data Integrator will use this command for every foreign key

  • "Condition": The command defines the code needed to check a condition constraint. During code generation, Oracle Data Integrator will use this command for every check condition

  • "Mandatory": The command defines the code needed to check a mandatory column constraint. During code generation, Oracle Data Integrator will use this command for mandatory column

  • "Remove Errors": The command defines the code needed to remove the rejected records from the checked table.

4.1.2 Error Tables Structures

This section describes the typical structure of the Error and Summary Tables.

4.1.2.1 Error Table Structure

The E$ error table has the list of columns described in the following table:

Columns Description

[Columns of the checked table]

The error table contains all the columns of the checked datastore.

ERR_TYPE

Type of error:

  • 'F' when the datastore is checked during flow control

  • 'S' when the datastore is checked using static control

ERR_MESS

Error message related to the violated constraint

CHECK_DATE

Date and time when the datastore was checked

ORIGIN

Origin of the check operation. This column is set either to the datastore name or to an interface name and ID depending on how the check was performed.

CONS_NAME

Name of the violated constraint.

CONS_TYPE

Type of the constraint:

  • 'PK': Primary Key

  • 'AK': Alternate Key

  • 'FK': Foreign Key

  • 'CK': Check condition

  • 'NN': Mandatory column


4.1.2.2 Summary Table Structure

The SNP_CHECK table has the list of columns described in the following table:

Column Description

ODI_CATALOG_NAME

Catalog name of the checked table, where applicable

ODI_SCHEMA_NAME

Schema name of the checked table, where applicable

ODI_RESOURCE_NAME

Resource name of the checked table

ODI_FULL_RES_NAME

Fully qualified name of the checked table. For example <catalog>.<schema>.<table>

ODI_ERR_TYPE

Type of error:

  • 'F' when the datastore is checked during flow control

  • 'S' when the datastore is checked using static control

ODI_ERR_MESS

Error message

ODI_CHECK_DATE

Date and time when the datastore was checked

ODI_ORIGIN

Origin of the check operation. This column is set either to the datastore name or to an interface name and ID depending on how the check was performed.

ODI_CONS_NAME

Name of the violated constraint.

ODI_CONS_TYPE

Type of constraint:

  • 'PK': Primary Key

  • 'AK': Alternate Key

  • 'FK': Foreign Key

  • 'CK': Check condition

  • 'NN': Mandatory column (Not Null)

ODI_ERR_COUNT

Total number of records rejected by this constraint during the check process

ODI_SESS_NO

ODI session number

ODI_PK

Unique identifier for this table, where appicable

   

4.2 Case Studies

This section provides examples of data integrity check strategies.

4.2.1 Oracle CKM

The CKM Oracle is a typical example of a data integrity check.

The commands below are extracted from the CKM for Oracle and provided as examples. You can review the code of this knowledge module by editing it in Oracle Data Integrator Studio.

4.2.1.1 Drop Check Table

This task drops the error summary table. This command runs only if the DROP_CHECK_TABLE is set to Yes, and has the Ignore Errors flag activated. It will not stop the CKM if the summary table is not found.

Command on Target (Oracle)

drop table <%=odiRef.getTable("L","CHECK_NAME","W")%> <% if (new Integer(odiRef.getOption( "COMPATIBLE" )).intValue() >= 10 ) { out.print( "purge" ); }; %>

4.2.1.2 Create Check Table

This task creates the error summary table. This command always runs and has the Ignore Errors flag activated. It will not stop the CKM if the summary table already exist.

Command on Target (Oracle)

...
create table <%=odiRef.getTable("L","CHECK_NAME","W")%>
(
        CATALOG_NAME      <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%> ,
        SCHEMA_NAME      <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%> ,
        RESOURCE_NAME    <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        FULL_RES_NAME      <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ERR_TYPE          <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ERR_MESS          <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%> ,
        CHECK_DATE       <%=odiRef.getDataType("DEST_DATE", "", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ORIGIN           <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>,
        CONS_NAME        <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        CONS_TYPE                 <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ERR_COUNT                 <%=odiRef.getDataType("DEST_NUMERIC", "10", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>
)
...

4.2.1.3 Create Error Table

This task creates the error (E$) table. This command always runs and has the Ignore Errors flag activated. It will not stop the CKM if the error table already exist.

Note the use of the getCollist method to add the list of columns from the checked to this table structure.

Command on Target (Oracle)

...
create table <%=odiRef.getTable("L","ERR_NAME", "W")%>
(
        ODI_ROW_ID                UROWID,
        ODI_ERR_TYPE               <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_ERR_MESS              <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CHECK_DATE     <%=odiRef.getDataType("DEST_DATE", "", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>, 
        <%=odiRef.getColList("", "[COL_NAME]\t[DEST_WRI_DT] " + odiRef.getInfo("DEST_DDL_NULL"), ",\n\t", "", "")%>,
        ODI_ORIGIN               <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CONS_NAME      <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CONS_TYPE              <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_PK                    <%=odiRef.getDataType("DEST_VARCHAR", "32", "")%> PRIMARY KEY,  ODI_SESS_NO               <%=odiRef.getDataType("DEST_VARCHAR", "19", "")%>
)
...

4.2.1.4 Insert PK Errors

This task inserts into the error (E$) table the errors detected while checking a primary key. This command always runs, has the Primary Key checkbox active and has Log Counter set to Error to count these records as errors.

Note:

When using a CKM to perform flow control from an interface, you can define the maximum number of errors allowed. This number is compared to the total number of records returned by every command in the CKM of which the Log Counter is set to Error.

Note the use of the getCollist method to insert into the error table the whole record being checked and the use of the getPK and getInfo method to retrieve contextual information.

Command on Target (Oracle)

insert into <%=odiRef.getTable("L","ERR_NAME", "W")%>
(
        ODI_PK,
        ODI_SESS_NO,
        ODI_ROW_ID,
        ODI_ERR_TYPE,
        ODI_ERR_MESS,
        ODI_ORIGIN,
        ODI_CHECK_DATE,
        ODI_CONS_NAME,
        ODI_CONS_TYPE,
        <%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "MAP")%>
)
select  SYS_GUID(),
        <%=odiRef.getSession("SESS_NO")%>, 
        rowid,
        '<%=odiRef.getInfo("CT_ERR_TYPE")%>', 
        '<%=odiRef.getPK("MESS")%>',
        '<%=odiRef.getInfo("CT_ORIGIN")%>',
        <%=odiRef.getInfo("DEST_DATE_FCT")%>,
        '<%=odiRef.getPK("KEY_NAME")%>',
        'PK',   
        <%=odiRef.getColList("", odiRef.getTargetTable("TABLE_ALIAS")+".[COL_NAME]", ",\n\t", "", "MAP")%>
from     <%=odiRef.getTable("L", "CT_NAME", "A")%> <%=odiRef.getTargetTable("TABLE_ALIAS")%>
where   exists  (
                select  <%=odiRef.getColList("", "SUB.[COL_NAME]", ",\n\t\t\t", "", "PK")%>
                from    <%=odiRef.getTable("L","CT_NAME","A")%> SUB
                where   <%=odiRef.getColList("", "SUB.[COL_NAME]="+odiRef.getTargetTable("TABLE_ALIAS")+".[COL_NAME]", "\n\t\t\tand ", "", "PK")%>
                group by        <%=odiRef.getColList("", "SUB.[COL_NAME]", ",\n\t\t\t", "", "PK")%>            having  count(1) > 1
                )
<%=odiRef.getFilter()%>

4.2.1.5 Delete Errors from Controlled Table

This task removed from the controlled table (static control) or integration table (flow control) the rows detected as erroneous.

This task is always executed and has the Remove Errors option selected.

Command on Target (Oracle)

delete from       <%=odiRef.getTable("L", "CT_NAME", "A")%>  T
where    exists         (
                select   1
                from    <%=odiRef.getTable("L","ERR_NAME", "W")%> E
                where ODI_SESS_NO = <%=odiRef.getSession("SESS_NO")%>
                and T.rowid = E.ODI_ROW_ID
                )

4.2.2 Dynamically Create Non-Existing References

The following use case describes an example of customization that can be performed on top of an existing CKM.

4.2.2.1 Use Case

When loading a data warehouse, you may have records referencing data from other tables, but the referenced records do not yet exist.

Suppose, for example, that you receive daily sales transactions records that reference product SKUs. When a product does not exist in the products table, the default behavior of the standard CKM is to reject the sales transaction record into the error table instead of loading it into the data warehouse. However, to meet the requirements of your project you want to load this sales record into the data warehouse and create an empty product on the fly to ensure data consistency. The data analysts would then simply analyze the error tables and complete the missing information for products that were automatically added to the products table.

The following sequence illustrates this example:

  1. The source flow data is staged by the IKM in the "I$_SALES" table to load the SALES table. The IKM calls the CKM to have it check the data quality.

  2. The CKM checks every constraint including the FK_SALES_PRODUCTS foreign key defined between the target SALES table and the PRODUCTS Table. It rejects record with SALES_ID='4' in the error table as referenced product with PRODUCT_ID="P25" doesn't exist in the products table.

  3. The CKM automatically inserts the missing "P25" product reference in the products table and assigns an '<unknown>' value to the PRODUCT_NAME. All other columns are set to null or default values.

  4. The CKM does not remove the rejected record from the source flow I$ table, as it became consistent

  5. The IKM writes the flow data to the target

In the sequence above, steps 3 and 4 differ from the standard CKM and need to be customized.

4.2.2.2 Discussion

To implement such a CKM, you will notice that some information is missing in the Oracle Data Integrator default metadata. We would need the following:

  • A new flexfield called REF_TAB_DEF_COL on the Reference object containing the column of the referenced table that must be populated with the '<unknown>' value (PRODUCT_NAME, in our case)

  • A new column (ODI_AUTO_CREATE_REFS) in the error table to indicate whether an FK error needs to automatically create the missing reference or not. This flag will be populated while detecting the FK errors.

  • A new flexfields ca lled AUTO_CREATE_REFS on the "Reference" object, that will state whether a constraint should automatically cause missing references creation. See the Developer's Guide for Oracle Data Integrator for more information about Flex Fields.

Now that we have all the required metadata, we can start enhancing the default CKM to meet our requirements. The steps of the CKM will therefore be (changes are highlighted in bold font):

  • Drop and create the summary table.

  • Remove the summary records of the previous run from the summary table

  • Drop and create the error table. Add the extra ODI_AUTO_CREATE_REFS column to the error table.

  • Remove rejected records from the previous run from the error table

  • Reject records that violate the primary key constraint.

  • Reject records that violate each alternate key constraint

  • Reject records that violate each foreign key constraint, and store the value of the AUTO_CREATE_REFS flexfield in the ODI_AUTO_CREATE_REFS column.

  • For every foreign key error detected, if the ODI_AUTO_CREATE_REFS is set to "yes", insert missing references in the referenced table.

  • Reject records that violate each check condition constraint

  • Reject records that violate each mandatory column constraint

  • Remove rejected records from the checked table if required. Do not remove records for which the constraint behavior is set to Yes

  • Insert the summary of detected errors in the summary table.

4.2.2.3 Implementation Details

The following command modifications are performed to implement the required changes to the CKM. The changes are highlighted in bold in the code.

4.2.2.3.1 Create Errors Table

The task is modified to create the new ODI_AUTO_CREATE_REFS column into the error table.

Command on Target (Oracle)

...
create table <%=odiRef.getTable("L","ERR_NAME", "W")%>
(

ODI_AUTO_CREATE_REFS <%=odiRef.getDataType("DEST_VARCHAR", "3", "")%> 
ODI_ROW_ID                UROWID,
        ODI_ERR_TYPE               <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_ERR_MESS               <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CHECK_DATE    <%=odiRef.getDataType("DEST_DATE", "", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>, 
        <%=odiRef.getColList("", "[COL_NAME]\t[DEST_WRI_DT] " + odiRef.getInfo("DEST_DDL_NULL"), ",\n\t", "", "")%>,
        ODI_ORIGIN               <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CONS_NAME      <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_CONS_TYPE             <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
        ODI_PK                   <%=odiRef.getDataType("DEST_VARCHAR", "32", "")%> PRIMARY KEY,
        ODI_SESS_NO                <%=odiRef.getDataType("DEST_VARCHAR", "19", "")%>
)
...


4.2.2.3.2 Insert FK Errors

The task is modified to take into account the new ODI_AUTO_CREATE_REFS column and load it with the content of the flexfield defined on the FK to indicate whether this constraint should automatically create missing references. Note the use of the getFK method to retrieve the value of the AUTO_CREATE_REFS flexfield.

Command on Target (Oracle)

...
insert into <%=odiRef.getTable("L","ERR_NAME", "W")%>
(
ODI_AUTO_CREATE_REFS,    
ODI_PK,
        ODI_SESS_NO,
        ODI_ROW_ID,
        ODI_ERR_TYPE,
        ODI_ERR_MESS,
        ODI_CHECK_DATE,
        ODI_ORIGIN,
        ODI_CONS_NAME,
        ODI_CONS_TYPE,
        <%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "MAP")%>
)
select  
'<%=odiRef.getFK("AUTO_CREATE_REFS")%>',
SYS_GUID(),
        <%=odiRef.getSession("SESS_NO")%>,
        rowid,
...


4.2.2.3.3 Insert Missing References

The new task is added after the insert FK errors task. It has the Join option checked.

Note the following:

  • The getFK("AUTO_CREATE_FS") method is used to retrieve the AUTO_CREATE_FS flexfield value that conditions the generation of the SQL statement.

  • The getFK("REF_TAB_DEF_COL") method is used to retrieve from the flexfield the name of the column to set to '<undefined>'.

  • The getFKColList method is used to retrieve the list of columns participating to the foreign key and create the missing reference primary key columns content.

  • The filter made to retrieve only the records corresponding to the current checked foreign key constraint with the AUTO_CREATE_REFS flag set to Yes.

Command on Target (Oracle)

<% if (odiRef.getFK("AUTO_CREATE_REFS").equals("Yes")) { %>

insert into <%=odiRef.getTable("L", "FK_PK_TABLE_NAME", "A")%>
(
<%=odiRef.getFKColList("", "[PK_COL_NAME]", ",", "")%>, 
<%=odiRef.getFK("REF_TAB_DEF_COL")%>
)

select distinct
<%=odiRef.getFKColList("", "[COL_NAME]", ",", "")%>,
'<UNKNOWN>'
from <%=odiRef.getTable("L","ERR_NAME", "A")%>
where
CONS_NAME = '<%=odiRef.getFK("FK_NAME")%>'
And CONS_TYPE = 'FK'
And ORIGIN = '<%=odiRef.getInfo("CT_ORIGIN")%>'
And AUTO_CREATE_REFS = 'Yes'
<%}%>
4.2.2.3.4 Delete Errors from Controlled Table

This task is modified to avoid deleting the foreign key records for which a reference have been created. These can remain in the controlled table.

Command on Target (Oracle)

delete from       <%=odiRef.getTable("L", "CT_NAME", "A")%>  T
where   exists  (
                select  1
                from     <%=odiRef.getTable("L","ERR_NAME", "W")%> E
                where ODI_SESS_NO = <%=odiRef.getSession("SESS_NO")%>
                and T.rowid = E.ODI_ROW_ID              

and E.AUTO_CREATE_REFS <> 'Yes'           
)