12 EDQでのOracle Cloud Infrastructure (OCI) Queueの使用

このドキュメントでは、Oracle Enterprise Data Quality (EDQ)でOracle Cloud Infrastructure (OCI) Queueサービスの使用を開始する方法について説明します。このドキュメントの対象読者は、EDQアプリケーションのインストールおよび管理を担当するシステム管理者です。

この章の内容は次のとおりです。

12.1 OCI QueueおよびEDQの概要

EDQリアルタイム・プロバイダ・バケットおよびコンシューマ・バケットは、読取りおよびパブリッシュにOCI Queueサービスを使用するように構成できます。このキュー・サービスは、OCIストリームと比較して、リアルタイム処理に非常に適しています。

12.2 OCI Queueメッセージの読取りおよび書込みを行うためのEDQの構成

EDQとのOCI Queueインタフェースは、次のものを定義するXMLインタフェース・ファイルを使用して構成します:

  • メッセージ・キューへのパス
  • 特定のOCI Queueテクノロジの使用方法を定義するプロパティ
  • メッセージ・ペイロードをEDQプロセスで認識される形式にデコードする方法(メッセージ・プロバイダの場合、EDQがキューからメッセージを読み取る)、またはメッセージを外部プロセスで予測される形式に変換する方法(メッセージ・コンシューマの場合、EDQがメッセージをキューに書き込む)。

XMLファイルは、次のパスのEDQローカル・ホーム・ディレクトリ(以前の構成ディレクトリ)にあります。

  • buckets/realtime/providers (EDQに入力するインタフェース)
  • buckets/realtime/consumers (EDQから出力するインタフェース)

XMLファイルが構成されると、メッセージ・プロバイダ・インタフェースがEDQのリーダー・プロセッサで使用可能になり、データ・インタフェースをプロセスへの「入力」としてマップできます。また、メッセージ・コンシューマ・インタフェースはライター・プロセッサで使用可能になり、データ・インタフェースからプロセスの「出力」としてマップできます。

12.3 インタフェース・ファイルの定義

EDQのインタフェース・ファイルは、メッセージ・フレームワークを定義するrealtimedata要素で構成されます。OCI Queueインタフェースでは、次を使用します:

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="ociqueues">
    ...
</realtimedata>

realtimedata要素には、3つのサブセクションがあります:

  • <attributes>セクション: EDQで認識できるインタフェースの形状を定義します
  • <messengerconfig>セクション: OCI Queueに接続する方法を定義します。
  • メッセージ・フォーマット・セクション: (たとえば、XMLから)メッセージのコンテンツをEDQが読み込める属性データに抽出する方法(インバウンド・インタフェースの場合)、またはEDQからメッセージ・データに(たとえば、XMLに)属性データを変換する方法を定義します。プロバイダ・インタフェースの場合、要素は<incoming>で、コンシューマ・インタフェースの場合、要素は<outgoing>です。

12.3.1 <attributes>セクションの理解

< attribute >セクションでは、インタフェースの形状を定義します。これは、リーダーまたはライターを構成するときにEDQで使用できる属性から構成されます。たとえば、インタフェース・ファイルの先頭部分の次の抜粋では、EDQで使用できる文字列属性と数値属性が構成されます:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="ociqueues">
<attributes>  
   <attribute type="string" name="messageID"/> 
   <attribute type="string" name="name"/> 
   <attribute type="number" name="AccountNumber"/> 
</attributes> 

[file continues]...

EDQでは、次のすべての標準属性タイプがサポートされます。

  • 文字列
  • 数値
  • 日付
  • stringarray
  • numberarray
  • datearray

12.3.2 <messengerconfig>セクションの理解

<messengerconfig>セクションには次のプロパティを設定できます:

プロパティ 説明

queue

キューOCID (必須)。

credentials

ストリームへの接続に使用される格納済資格証明名。省略すると、プラットフォーム(インスタンス)認証が使用されます。

interval

メッセージ受信のポーリング間隔(ミリ秒)。デフォルトは0です(長期ポーリングが使用されます)。

proxy

HTTPSコール用のhost:portプロキシ・サーバー。

deletemode

受信後のメッセージの削除を制御します。有効な値は次のとおりです:

  • reception: 各メッセージは、受信時にすぐに削除されます。これはデフォルトです。

  • completion: 各メッセージは、ジョブ内のプロセスのトラバースが完了すると削除されます。

  • off: メッセージは自動的に削除されません。Webサービス・コールの受信ハンドルなどを使用して、手動で削除する必要があります。

limit

1回のコールで受信するメッセージの最大数。デフォルトは20です。

visibilityInSeconds

受信したメッセージが後続の受信リクエストから非表示になる期間(秒)。デフォルトは、キュー定義で設定されます。

timeoutInSeconds

長期ポーリング受信リクエストの待機時間(秒)。省略すると、OCIのデフォルトである30秒が使用されます。

デフォルトは、realtime.propertiesで接頭辞"ociqueues."を使用して設定できます。

12.3.3 <incoming>または<outgoing>セクションの理解

<incoming> or <outgoing>セクションでは、メッセージ・メタデータおよび値をEDQ属性との間で変換する方法を定義します。これは、次の2つのサブセクションで構成されます:

12.3.3.1 <messageheaders>セクションの理解

<messageheaders>セクションでは、送信用の追加送信プロパティに属性をマップでき、受信時にメッセージ・メタデータから属性を設定できます。各値の詳細は、OCI Queueのドキュメントを参照してください。空でないメッセージ・ヘッダー値は、同じ名前の<messengerconfig>プロパティをオーバーライドします。

次の標準ヘッダーを使用できます:

ヘッダー名 設定可能 読取り可能 タイプ 説明

deliveryCount

いいえ

はい

数値

メッセージがコンシューマに配信された回数。

expireAfter

いいえ

はい

日付

メッセージが自動的に削除されるまでの時間。

id

いいえ

はい

文字列

内部メッセージID。

receipt

いいえ

はい

文字列

受信トークン。メッセージの手動削除に必要です。

visibleAfter

いいえ

はい

日付

メッセージが他のコンシューマに表示されるまでの時間。

12.3.3.2 <messagebody>セクションの理解

このセクションでは、JavaScriptを使用して、EDQがインバウンド・インタフェースに使用できる属性にメッセージ・ペイロードを解析し、アウトバウンド・インタフェースのために逆操作(EDQ属性データをメッセージ・ペイロード・データに変換)を実行します。‘extract’という名前の関数は、XMLからインバウンド・インタフェースの属性データへのデータの抽出に使用され、‘build’という名前の関数は、属性データからXMLデータを構築するために使用されます。

詳細は、を参照してください。この図では、ケース管理フィルタ・レポート・トリガーからJSONメッセージを受信できる完全なプロバイダ・バケットの例を示します。

12.3.4

次のXMLは、ケース管理フィルタ・レポート・トリガーからJSONメッセージを受信できる完全なプロバイダ・バケットの簡単な例です。内部メッセージIDも戻されます。

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="ociqueues">
  <attributes>
    <attribute name="filter"      type="string"/>
    <attribute name="type"        type="string"/>
    <attribute name="xaxis"       type="string"/>
    <attribute name="yaxis"       type="string"/> 
    <attribute name="server"      type="string"/>
    <attribute name="userid"      type="number"/>
    <attribute name="user"        type="string"/>
    <attribute name="userdisplay" type="string"/>
    <attribute name="start"       type="date" format="iso"/>
    <attribute name="duration"    type="number"/>
    <attribute name="status"      type="string"/>
    <attribute name="sql"         type="string"/>
    <attribute name="arg.type"    type="stringarray"/>
    <attribute name="arg.value"   type="stringarray"/>
    <attribute name="mgid"        type="string"/>
  </attributes>
   
  <messengerconfig>
    queue        = ocid1.queue.oc1.phx.amaaaaaa7u6obfiaotsz7yuj2zcbc5uhc45evvv7wfwedgertw3543we
     credentials = OCI 1
  </messengerconfig>
 
  <incoming>
 
    <messageheaders>
      <header name="id" attribute="mgid"/>
    </messageheaders>
 
    <messagebody>
      <script>
         <![CDATA[
           var simple = ["filter", "type", "xaxis", "yaxis", "server", "userid", "user", "userdisplay", "duration", "status", "sql"]
 
           function extract(str) {
             var obj = JSON.parse(str)
             var rec = new Record()
 
             for (let x of simple) {
               rec[x] = obj[x]
             }
 
             rec.start = obj.start && new Date(obj.start)
 
             if (obj.args) {
               rec['arg.type']  = obj.args.map(a => a.type)
               rec['arg.value'] = obj.args.map(a => a.value && a.value.toString())
             }
 
             return [rec];
           }
         ]]>
      </script>
    </messagebody>
 
  </incoming>
 
</realtimedata>