11 EDQでのAmazon Simple Queue Service (Amazon SQS)の使用
このドキュメントでは、Oracle Enterprise Data Quality (EDQ)でAmazon Simple Queue Service (Amazon SQS)テクノロジの使用を開始する方法について説明します。このドキュメントの対象読者は、EDQアプリケーションのインストールおよび管理を担当するシステム管理者です。
この章の内容は次のとおりです。
11.1 Amazon SQSおよびEDQの概要
EDQリアルタイム・プロバイダ・バケットおよびコンシューマ・バケットは、読取りおよびパブリッシュにAmazon SQSキューを使用するように構成できます。
11.2 Amazon SQSメッセージの読取りおよび書込みを行うためのEDQの構成
EDQとのAmazon SQSインタフェースは、次のものを定義するXMLインタフェース・ファイルを使用して構成します:
- メッセージ・キューへのパス
- 特定のAmazon SQSテクノロジの使用方法を定義するプロパティ
- メッセージ・ペイロードをEDQプロセスで認識される形式にデコードする方法(メッセージ・プロバイダの場合、EDQがキューからメッセージを読み取る)、またはメッセージを外部プロセスで予測される形式に変換する方法(メッセージ・コンシューマの場合、EDQがメッセージをキューに書き込む)。
XMLファイルは、次のパスのEDQローカル・ホーム・ディレクトリ(以前の構成ディレクトリ)にあります。
- buckets/realtime/providers (EDQに入力するインタフェース)
- buckets/realtime/consumers (EDQから出力するインタフェース)
XMLファイルが構成されると、メッセージ・プロバイダ・インタフェースがEDQのリーダー・プロセッサで使用可能になり、データ・インタフェースをプロセスへの「入力」としてマップできます。また、メッセージ・コンシューマ・インタフェースはライター・プロセッサで使用可能になり、データ・インタフェースからプロセスの「出力」としてマップできます。
11.3 インタフェース・ファイルの定義
EDQのインタフェース・ファイルは、メッセージ・フレームワークを定義するrealtimedata
要素で構成されます。Amazon SQSインタフェースでは、次を使用します:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="sqs">
...
</realtimedata>
realtimedata
要素には、3つのサブセクションがあります:
<attributes>
セクション: EDQで認識できるインタフェースの形状を定義します<messengerconfig>
セクション: Amazon SQSキューに接続する方法を定義します。- メッセージ・フォーマット・セクション: (たとえば、XMLから)メッセージのコンテンツをEDQが読み込める属性データに抽出する方法(インバウンド・インタフェースの場合)、またはEDQからメッセージ・データに(たとえば、XMLに)属性データを変換する方法を定義します。プロバイダ・インタフェースの場合、要素は
<incoming>
で、コンシューマ・インタフェースの場合、要素は<outgoing>
です。
11.3.1 <attributes>セクションの理解
< attribute >セクションでは、インタフェースの形状を定義します。これは、リーダーまたはライターを構成するときにEDQで使用できる属性から構成されます。たとえば、インタフェース・ファイルの先頭部分の次の抜粋では、EDQで使用できる文字列属性と数値属性が構成されます:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="sqs">
<attributes>
<attribute type="string" name="messageID"/>
<attribute type="string" name="name"/>
<attribute type="number" name="AccountNumber"/>
</attributes>
[file continues]...
EDQでは、次のすべての標準属性タイプがサポートされます。
- 文字列
- 数値
- 日付
- stringarray
- numberarray
- datearray
11.3.2 <messengerconfig>セクションの理解
<messengerconfig>
セクションには次のプロパティを設定できます:
プロパティ | 説明 |
---|---|
queue |
SQSキューのURL (必須)。 |
credentials |
ストリームへの接続に使用される格納済資格証明名。省略すると、プラットフォーム(インスタンス)認証が使用されます。 |
interval |
メッセージ受信のポーリング間隔(ミリ秒)。デフォルトは0です(長期ポーリングが使用されます)。 |
proxy |
HTTPSコール用のhost:portプロキシ・サーバー。 |
deletemode |
受信後のメッセージの削除を制御します。有効な値は次のとおりです:
|
MessageDeduplicationId |
FIFOキューの重複除外ID。ヘッダー属性によってオーバーライドできます。2つの特別な値がサポートされています:
|
MessageGroupId |
FIFOキューのグループID。ヘッダー属性によってオーバーライドできます。 |
DelaySeconds |
送信遅延(秒)。ヘッダー属性によってオーバーライドできます。 |
MaxNumberOfMessages |
1回のコールで受信するメッセージの最大数。デフォルトは10です。 |
VisibilityTimeout |
受信したメッセージが後続の受信リクエストから非表示になる期間(秒)。デフォルトは、キュー定義で設定されます。 |
WaitTimeSeconds |
長期ポーリング受信リクエストの待機時間(秒)。デフォルトは20秒です。 |
デフォルトは、realtime.properties
で接頭辞"sqs."を使用して設定できます。
11.3.3 <incoming>または<outgoing>セクションの理解
<incoming>
or <outgoing>
セクションでは、メッセージ・メタデータおよび値をEDQ属性との間で変換する方法を定義します。これは、次の2つのサブセクションで構成されます:
11.3.3.1 <messageheaders>セクションの理解
<messageheaders>セクションでは、送信用の追加送信プロパティに属性をマップでき、受信時にメッセージ・メタデータから属性を設定できます。各値の詳細は、SQSのドキュメントを参照してください。空でないメッセージ・ヘッダー値は、同じ名前の<messengerconfig>プロパティをオーバーライドします。
次の標準ヘッダーを使用できます:
ヘッダー名 | 設定可能 | 読取り可能 | タイプ | 説明 |
---|---|---|---|---|
DelaySeconds |
はい |
いいえ |
数値 |
送信遅延(秒) |
MessageDeduplicationId |
はい |
はい |
文字列 |
FIFOキューの重複除外ID |
MessageGroupId |
はい |
はい |
文字列 |
FIFOキューのグループID |
ApproximateFirstReceiveTimestamp |
いいえ |
はい |
数値 |
キューからの最初の受信のタイムスタンプ |
ApproximateReceiveCount |
いいえ |
はい |
数値 |
メッセージが受信された回数 |
AWSTraceHeader |
いいえ |
はい |
文字列 |
X-Rayトレース・ヘッダー |
SenderId |
いいえ |
はい |
文字列 |
IAMユーザーまたはロールID |
SentTimestamp |
いいえ |
はい |
数値 |
送信タイムスタンプ |
SequenceNumber |
いいえ |
はい |
数値 |
SQSからの順序番号 |
MessageId |
いいえ |
はい |
文字列 |
内部メッセージID |
ReceiptHandle |
いいえ |
はい |
文字列 |
受信ハンドル。メッセージの手動削除に必要です |
カスタム・メッセージ属性
前述の定義済標準ヘッダーに加えて、<messageheaders>セクションでもカスタム属性を定義できます。カスタム属性名には、標準ヘッダーと区別するために"messageattribute:
"という接頭辞が付きます。
カスタム属性の例
<messageheaders>
<header name="messageattribute:tel" attribute="telephone" type="string"/>
</messageheaders>
11.3.3.2 <messagebody>セクションの理解
このセクションでは、JavaScriptを使用して、EDQがインバウンド・インタフェースに使用できる属性にメッセージ・ペイロードを解析し、アウトバウンド・インタフェースのために逆操作(EDQ属性データをメッセージ・ペイロード・データに変換)を実行します。‘extract’という名前の関数は、XMLからインバウンド・インタフェースの属性データへのデータの抽出に使用され、‘build’という名前の関数は、属性データからXMLデータを構築するために使用されます。
詳細は、図を参照してください。この図では、ケース管理フィルタ・レポート・トリガーからJSONメッセージを受信できる完全なプロバイダ・バケットの例を示します。
11.4 図
次のXMLは、ケース管理フィルタ・レポート・トリガーからJSONメッセージを受信できる完全なプロバイダ・バケットの簡単な例です。送信者IDおよびメッセージ・グループIDも、2つのカスタム・メッセージ属性attr1およびattr2とともに戻されます。
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="sqs">
<attributes>
<attribute name="filter" type="string"/>
<attribute name="type" type="string"/>
<attribute name="xaxis" type="string"/>
<attribute name="yaxis" type="string"/>
<attribute name="server" type="string"/>
<attribute name="userid" type="number"/>
<attribute name="user" type="string"/>
<attribute name="userdisplay" type="string"/>
<attribute name="start" type="date" format="iso"/>
<attribute name="duration" type="number"/>
<attribute name="status" type="string"/>
<attribute name="sql" type="string"/>
<attribute name="arg.type" type="stringarray"/>
<attribute name="arg.value" type="stringarray"/>
<attribute name="senderid" type="string"/>
<attribute name="attr1" type="string"/>
<attribute name="attr2" type="number"/>
<attribute name="mgid" type="string"/>
</attributes>
<messengerconfig>
queue = https://sqs.eu-west-1.amazonaws.com/458503484332/queue1
credentials = aws1
deletemode = completion
</messengerconfig>
<incoming>
<messageheaders>
<header name="SenderId" attribute="senderid"/>
<header name="MessageGroupId" attribute="mgid"/>
<header name="messageattribute:attr1" attribute="attr1" type="string"/>
<header name="messageattribute:attr3" attribute="attr2" type="number"/>
</messageheaders>
<messagebody>
<script>
<![CDATA[
var simple = ["filter", "type", "xaxis", "yaxis", "server", "userid", "user", "userdisplay", "duration", "status", "sql"]
function extract(str) {
var obj = JSON.parse(str)
var rec = new Record()
for (let x of simple) {
rec[x] = obj[x]
}
rec.start = obj.start && new Date(obj.start)
if (obj.args) {
rec['arg.type'] = obj.args.map(a => a.type)
rec['arg.value'] = obj.args.map(a => a.value && a.value.toString())
}
return [rec];
}
]]>
</script>
</messagebody>
</incoming>
</realtimedata>