5 データ整合性の戦略
この章の内容は次のとおりです。
データ整合性のチェック・プロセス
データ整合性のチェック・プロセスは、次の場合に有効になります。
-
「静的制御」がモデル、サブモデルまたはデータストアに対して起動された場合(Studioから、またはパッケージを使用して)。データストアのデータは、Oracle Data Integratorモデルに定義されている制約と照合してチェックされます。
-
マッピングが実行され、IKMで「フロー制御」が有効化されている場合。統合表(I$)にステージングされたフロー・データは、モデルに定義されているターゲット・データストアの制約と照合してチェックされます。マッピングで選択された制約のみがチェックされます。
このどちらのケースでも、CKMは、事前定義済の一連の制約に従って、データストアのデータ品質をチェックします。CKMは、「静的制御」で既存のデータのチェックに使用するか、「フロー制御」でフロー・データのチェックに使用できます。また、指定すると、チェックした表からエラーのあるレコードを削除することもできます。
静的制御の場合は、使用するCKMはモデルで定義されます。フロー制御の場合は、マッピングで指定されます。
チェック・ナレッジ・モジュールの概要
標準のCKMでは、次の2種類の表が管理されます。
-
各データ・サーバーの単一のサマリー表。名前はSNP_CHECK_TABで、データ・サーバーのデフォルト物理スキーマの作業スキーマで作成されます。この表には、それぞれの表および制約のエラーのサマリーが含まれます。たとえば、モデルの全体的なデータ品質の分析に使用できます。
-
チェックされた各データストアのエラー表。名前はE$_<datastore name>です。エラー表には、この表に対して起動されたデータ品質プロセス(静的制御とフロー制御)によって拒否された実際のレコードが含まれます。
標準CKMは、次のステップで構成されます。
-
サマリー表を削除して作成します。DROP文は、サマリー表をリセットするために設計者がこれを要求する場合のみ実行されます。CREATE文は常に実行されますが、 表がすでに存在する場合は、エラーが許可されます。
-
前の実行のサマリー・レコードをサマリー表から削除します。
-
エラー表を削除して作成します。DROP文は、エラー表を再作成するために設計者がこれを要求する場合のみ実行されます。CREATE文は常に実行されますが、表がすでに存在する場合は、エラーが許可されます。
-
前の実行で拒否されたレコードをエラー表から削除します。
-
主キー制約に違反するレコードを拒否します。
-
代替キー制約に違反するレコードを拒否します。
-
外部キー制約に違反するレコードを拒否します。
-
チェック条件制約に違反するレコードを拒否します。
-
必須属性制約に違反するレコードを拒否します。
-
必要に応じて、チェックした表から拒否されたレコードを削除します。
-
サマリー表に、検出されたエラーのサマリーを挿入します。
CKMコマンドにタグを付けて、コードの生成方法を指定する必要があります。使用できるタグ は次のとおりです。
-
主キー: コマンドによって、主キー制約のチェックに必要なコードが定義されます。
-
代替キー: コマンドによって、代替キー制約のチェックに必要なコードが定義されます。Oracle Data Integratorでは、コードの生成時にそれぞれの代替キーに対してこのコマンドが使用されます。
-
結合: コマンドによって、外部キー制約のチェックに必要なコードが定義されます。Oracle Data Integratorでは、コードの生成時にそれぞれの外部キーに対してこのコマ ンドが使用されます。
-
条件: コマンドによって、条件制約のチェックに必要なコードが定義されます。Oracle Data Integratorでは、コードの生成時にそれぞれのチェック条件に対してこのコマン ドが使用されます。
-
必須: コマンドによって、必須属性制約のチェックに必要なコードが定義されます。Oracle Data Integratorでは、コードの生成時に必須属性に対してこのコマンドが使用されます。
-
エラーの削除: コマンドによって、チェックした表から拒否されたレコードを削除するのに必要なコードが定義されます。
エラー表の構造
この項では、エラー表およびサマリー表の一般的な構造について説明します。
エラー表の構造
E$エラー表には、次の表に記載する列のリストが含まれます。
列 | 説明 |
---|---|
[チェックされた表の列] |
エラー表にはチェックしたデータストアのすべての属性が含まれます。 |
ERR_TYPE |
エラーの種類:
|
ERR_MESS |
違反があった制約に関連するエラー・メッセージ |
CHECK_DATE |
データストアがチェックされた日時 |
ORIGIN |
チェック操作の実行元。この列は、チェックの実行方法によって、データストア名、またはマッピング名とIDのどちらかに設定されます。 |
CONS_NAME |
違反のあった制約の名前 |
CONS_TYPE |
制約の種類:
|
サマリー表の構造
SNP_CHECK表には、次の表に記載する列のリストが含まれます。
列 | 説明 |
---|---|
ODI_CATALOG_NAME |
チェックされた表のカタログ名(適用される場合) |
ODI_SCHEMA_NAME |
チェックされた表のスキーマ名(適用される場合) |
ODI_RESOURCE_NAME |
チェックされた表のリソース名 |
ODI_FULL_RES_NAME |
チェックされた表の完全修飾名。たとえば<catalog>.<schema>.<table> |
ODI_ERR_TYPE |
エラーの種類:
|
ODI_ERR_MESS |
エラー・メッセージ |
ODI_CHECK_DATE |
データストアがチェックされた日時 |
ODI_ORIGIN |
チェック操作の実行元。この列は、チェックの実行方法によって、データストア名、またはマッピング名とIDのどちらかに設定されます。 |
ODI_CONS_NAME |
違反のあった制約の名前 |
ODI_CONS_TYPE |
制約の種類:
|
ODI_ERR_COUNT |
チェック処理中にこの制約によって拒否されたレコードの合計数 |
ODI_SESS_NO |
ODIセッション番号 |
ODI_PK |
この表の一意の識別子 |
ケース・スタディ
この項では、データ整合性のチェック戦略の例を示します。
Oracle CKM
CKM Oracleは、データ整合性チェックの一般的な例です。
次のコマンドは、OracleのCKMから抽出されたもので、ここに例として示します。このナレッジ・モジュールのコードは、Oracle Data Integrator Studioで編集して確認できます。
チェック表の削除
このタスクは、エラーのサマリー表を削除します。このコマンドは、DROP_CHECK_TABLEが「はい」に設定され、「エラーの無視」フラグが有効化されている場合のみ実行されます。サマリー表が見つからない場合でもCKMは停止されません。
ターゲットに対するコマンド(Oracle)
drop table <%=odiRef.getTable("L","CHECK_NAME","W")%> <% if (new Integer(odiRef.getOption( "COMPATIBLE" )).intValue() >= 10 ) { out.print( "purge" ); }; %>
チェック表の作成
このタスクは、エラーのサマリー表を作成します。このコマンドは常に実行され、「エラーの無視」フラグは有効化されます。サマリー表がすでに存在する場合でもCKMは停止されません。
ターゲットに対するコマンド(Oracle)
... create table <%=odiRef.getTable("L","CHECK_NAME","W")%> ( CATALOG_NAME <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%> , SCHEMA_NAME <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%> , RESOURCE_NAME <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, FULL_RES_NAME <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ERR_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ERR_MESS <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%> , CHECK_DATE <%=odiRef.getDataType("DEST_DATE", "", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ORIGIN <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, CONS_NAME <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, CONS_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ERR_COUNT <%=odiRef.getDataType("DEST_NUMERIC", "10", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%> ) ...
エラー表の作成
このタスクはエラー表(E$)を作成します。このコマンドは常に実行され、「エラーの無視」フラグは有効化されます。エラー表がすでに存在する場合でもCKMは停止されません。
チェックしたデータストアからの属性リストをこの表構造に追加するgetCollistメソッドの使用に注目してください。
ターゲットに対するコマンド(Oracle)
... create table <%=odiRef.getTable("L","ERR_NAME", "W")%> ( ODI_ROW_ID UROWID, ODI_ERR_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ODI_ERR_MESS <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ODI_CHECK_DATE <%=odiRef.getDataType("DEST_DATE", "", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, <%=odiRef.getColList("", "[COL_NAME]\t[DEST_WRI_DT] " + odiRef.getInfo("DEST_DDL_NULL"), ",\n\t", "", "")%>, ODI_ORIGIN <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ODI_CONS_NAME <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ODI_CONS_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%> <%=odiRef.getInfo("DEST_DDL_NULL")%>, ODI_PK <%=odiRef.getDataType("DEST_VARCHAR", "32", "")%> PRIMARY KEY, ODI_SESS_NO <%=odiRef.getDataType("DEST_VARCHAR", "19", "")%> ) ...
PKエラーの挿入
このタスクは、主キーのチェック中に検出されたエラーをエラー表(E$)に挿入します。このコマンドは常に実行され、このレコードをエラーとしてカウントするために、「主キー」チェック・ボックスを有効にし、「ログ・カウンタ」を「エラー」に設定します。
注意:
CKMを使用してマッピングからフロー制御を実行する場合は、許可するエラーの最大数を定義できます。この数は、「ログ・カウンタ」が「エラー」に設定されているCKMで各コマンドによって返されたレコードの合計数と比較されます。
getCollistメソッドを使用してチェックされたレコード全体をエラー表に挿入し、getPKおよびgetInfoメソッドを使用して内容に応じた情報を取得することに注意してください。
ターゲットに対するコマンド(Oracle)
insert into <%=odiRef.getTable("L","ERR_NAME", "W")%> ( ODI_PK, ODI_SESS_NO, ODI_ROW_ID, ODI_ERR_TYPE, ODI_ERR_MESS, ODI_ORIGIN, ODI_CHECK_DATE, ODI_CONS_NAME, ODI_CONS_TYPE, <%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "MAP")%> ) select SYS_GUID(), <%=odiRef.getSession("SESS_NO")%>, rowid, '<%=odiRef.getInfo("CT_ERR_TYPE")%>', '<%=odiRef.getPK("MESS")%>', '<%=odiRef.getInfo("CT_ORIGIN")%>', <%=odiRef.getInfo("DEST_DATE_FCT")%>, '<%=odiRef.getPK("KEY_NAME")%>', 'PK', <%=odiRef.getColList("", odiRef.getTargetTable("TABLE_ALIAS")+".[COL_NAME]", ",\n\t", "", "MAP")%> from <%=odiRef.getTable("L", "CT_NAME", "A")%> <%=odiRef.getTargetTable("TABLE_ALIAS")%> where exists ( select <%=odiRef.getColList("", "SUB.[COL_NAME]", ",\n\t\t\t", "", "PK")%> from <%=odiRef.getTable("L","CT_NAME","A")%> SUB where <%=odiRef.getColList("", "SUB.[COL_NAME]="+odiRef.getTargetTable("TABLE_ALIAS")+".[COL_NAME]", "\n\t\t\tand ", "", "PK")%> group by <%=odiRef.getColList("", "SUB.[COL_NAME]", ",\n\t\t\t", "", "PK")%> having count(1) > 1 ) <%=odiRef.getFilter()%>
制御表からのエラーの削除
このタスクは、制御表(静的制御)または統合表(フロー制御)からエラーがあるとして検出された行を削除します。
このタスクは常に実行され、「エラーの削除」オプションが選択されます。
ターゲットに対するコマンド(Oracle)
delete from <%=odiRef.getTable("L", "CT_NAME", "A")%> T where exists ( select 1 from <%=odiRef.getTable("L","ERR_NAME", "W")%> E where ODI_SESS_NO = <%=odiRef.getSession("SESS_NO")%> and T.rowid = E.ODI_ROW_ID )
存在しない参照を動的に作成
次のユースケースは、既存のCKMに加えて実行できるカスタマイズの例について説明します。
ユースケース
データ・ウェアハウスをロードする場合に、受け取ったレコードが他の表のデータを参照しているにもかかわらず、参照されるレコードがまだ存在していないことがあります。
たとえば、1日の販売取引レコードを受け取り、これらのレコードが商品SKUを参照するとします。商品表に商品が存在しない場合は、標準CKMのデフォルトの動作では、販売取引レコードが拒否され、データ・ウェアハウスにロードされずにエラー表に格納されます。ただし、プロジェクトの要件を満たすためには、この販売レコードをデータ・ウェアハウスにロードし、その場で空の商品を作成してデータの一貫性を確保する必要があります。その後、データ分析者は、単純にエラー表を分析して商品表に自動的に追加された商品の欠落情報を作成します。
この例を図示すると次のようになります。
-
ソース・フロー・データは、IKMによってI$_SALES表にステージングされ、販売表にロードされます。IKMによってCKMがコールされ、データ品質がチェックされます。
-
CKMによって、ターゲットの販売表および商品表の間で定義されたFK_SALES_PRODUCT外部キーを含む各制約がチェックされます。PRODUCT_ID="P25"で参照された商品が商品表に存在しないため、SALES_ID='4'のレコードが拒否されてエラー表に格納されます。
-
CKMによって欠落しているP25商品の参照が商品表に自動的に挿入され、PRODUCT_NAMEに<unknown>という値が割り当てられます。これ以外のすべての属性は、nullまたはデフォルト値に設定されます。
-
ソース・フローI$表の一貫性が確保されたため、拒否されたレコードはCKMによってソース・フローI$表から削除されません。
-
IKMによってフロー・データがターゲットに書き込まれます。
上記のシーケンスでは、ステップ3および4は標準CKMとは異なるため、カスタマイズする必要があります。
ディスカッション
このようなCKMを実装する場合、Oracle Data Integratorのデフォルト・メタデータで一部の情報が欠落していることに気付きます。次の情報が必要になります。
-
参照オブジェクト上のREF_TAB_DEF_COLという名前の新しいフレックスフィールドで、これに参照される表の'<unknown>'値(この場合はPRODUCT_NAME)を保持する必要のある属性を含めます。
-
FKエラーが欠落している参照を自動作成する必要があるかどうかを示すエラー表の新しい列(ODI_AUTO_CREATE_REFS)。このフラグはFKエラーの検出時に移入されます。
-
参照オブジェクト上のAUTO_CREATE_REFSという名前の新しいフレックスフィールドで、制約により、欠落している参照の作成が自動的に行われます。フレックス・フィールドの詳細は、『Oracle Data Integrator開発者ガイド』を参照してください。
これで、必要なすべてのメタデータが用意できました。要件に合わせてデフォルトのCKMを拡張できます。CKMのステップは次のようになります(変更箇所は太字で強調表示されています)。
-
サマリー表を削除して作成します。
-
前の実行のサマリー・レコードをサマリー表から削除します。
-
エラー表を削除して作成します。追加のODI_AUTO_CREATE_REFS列をエラー表に追加します。
-
前の実行で拒否されたレコードをエラー表から削除します。
-
主キー制約に違反するレコードを拒否します。
-
各代替キー制約に違反するレコードを拒否します。
-
各外部キー制約に違反するレコードを拒否し、AUTO_CREATE_REFSフレックスフィールドの値をODI_AUTO_CREATE_REFS列に格納します。
-
各外部キー・エラーの検出について、ODI_AUTO_CREATE_REFSが「Yes」に設定されている場合は、参照される表に欠落している参照を挿入します。
-
各チェック条件制約に違反するレコードを拒否します。
-
各必須属性制約に違反するレコードを拒否します。
-
必要に応じて、チェックした表から拒否されたレコードを削除します。制約の動作が「Yes」に設定されているレコードは削除しないでください。
-
サマリー表に、検出されたエラーのサマリーを挿入します。
実装の詳細
次のコマンド変更を実行して、CKMに必要な変更を実装します。変更箇所のコードは太字で強調表示されています。
エラー表の作成
タスクはエラー表に新しいODI_AUTO_CREATE_REFS列を作成するように変更されます。
ターゲットに対するコマンド(Oracle)
...
create table <%=odiRef.getTable("L","ERR_NAME", "W")%>
(
ODI_AUTO_CREATE_REFS <%=odiRef.getDataType("DEST_VARCHAR", "3", "")%>
ODI_ROW_ID UROWID,
ODI_ERR_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "1", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
ODI_ERR_MESS <%=odiRef.getDataType("DEST_VARCHAR", "250", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
ODI_CHECK_DATE <%=odiRef.getDataType("DEST_DATE", "", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
<%=odiRef.getColList("", "[COL_NAME]\t[DEST_WRI_DT] " + odiRef.getInfo("DEST_DDL_NULL"), ",\n\t", "", "")%>,
ODI_ORIGIN <%=odiRef.getDataType("DEST_VARCHAR", "100", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
ODI_CONS_NAME <%=odiRef.getDataType("DEST_VARCHAR", "35", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
ODI_CONS_TYPE <%=odiRef.getDataType("DEST_VARCHAR", "2", "")%>
<%=odiRef.getInfo("DEST_DDL_NULL")%>,
ODI_PK <%=odiRef.getDataType("DEST_VARCHAR", "32", "")%> PRIMARY KEY,
ODI_SESS_NO <%=odiRef.getDataType("DEST_VARCHAR", "19", "")%>
)
...
FKエラーの挿入
タスクは新しいODI_AUTO_CREATE_REFS列を考慮するように変更されます。それをFKに定義されているフレックスフィールドの内容とともにロードして、この制約により、欠落している参照を自動作成するかどうかを示します。AUTO_CREATE_REFSフレックスフィールドの値を取得するために、getFKメソッドを使用することに注意してください。
ターゲットに対するコマンド(Oracle)
... insert into <%=odiRef.getTable("L","ERR_NAME", "W")%> ( ODI_AUTO_CREATE_REFS, ODI_PK, ODI_SESS_NO, ODI_ROW_ID, ODI_ERR_TYPE, ODI_ERR_MESS, ODI_CHECK_DATE, ODI_ORIGIN, ODI_CONS_NAME, ODI_CONS_TYPE, <%=odiRef.getColList("", "[COL_NAME]", ",\n\t", "", "MAP")%> ) select '<%=odiRef.getFK("AUTO_CREATE_REFS")%>', SYS_GUID(), <%=odiRef.getSession("SESS_NO")%>, rowid, ...
欠落している参照の挿入
FKエラーの挿入タスクの後で新しいタスクが追加されます。「結合」オプションがチェックされます。
次の点に注意してください。
-
getFK("AUTO_CREATE_FS") メソッドを使用して、SQL文の生成を条件付けするAUTO_CREATE_FSフレックスフィールド値を取得します。
-
getFK("REF_TAB_DEF_COL")メソッドを使用して、フレックスフィールドから'<undefined>'に設定されている属性の名前を取得します。
-
getFKColListメソッドを使用して、外部キーに加える属性のリストを取得し、欠落している参照の主キー属性の内容を作成します。
-
AUTO_CREATE_REFSフラグを「Yes」に設定して、現在チェックされている外部キー制約に対応するレコードのみを取得するようにフィルタされます。
ターゲットに対するコマンド(Oracle)
<% if (odiRef.getFK("AUTO_CREATE_REFS").equals("Yes")) { %> insert into <%=odiRef.getTable("L", "FK_PK_TABLE_NAME", "A")%> ( <%=odiRef.getFKColList("", "[PK_COL_NAME]", ",", "")%>, <%=odiRef.getFK("REF_TAB_DEF_COL")%> ) select distinct <%=odiRef.getFKColList("", "[COL_NAME]", ",", "")%>, '<UNKNOWN>' from <%=odiRef.getTable("L","ERR_NAME", "A")%> where CONS_NAME = '<%=odiRef.getFK("FK_NAME")%>' And CONS_TYPE = 'FK' And ORIGIN = '<%=odiRef.getInfo("CT_ORIGIN")%>' And AUTO_CREATE_REFS = 'Yes' <%}%>
制御表からのエラーの削除
このタスクは、参照が作成された外部キー・レコードを削除しないように修正されます。これらのレコードを制御表に残すことができます。
ターゲットに対するコマンド(Oracle)
delete from <%=odiRef.getTable("L", "CT_NAME", "A")%> T
where exists (
select 1
from <%=odiRef.getTable("L","ERR_NAME", "W")%> E
where ODI_SESS_NO = <%=odiRef.getSession("SESS_NO")%>
and T.rowid = E.ODI_ROW_ID
and E.AUTO_CREATE_REFS <> 'Yes'
)