10 EDQでのApache Kafkaの使用
このドキュメントでは、Oracle Enterprise Data Quality (EDQ)でApache Kafkaストリーミング・プラットフォームの使用を開始する方法について説明します。このドキュメントの対象読者は、EDQアプリケーションのインストールおよび管理を担当するシステム管理者です。Kafkaの概念および構成について深く理解していることを前提としています。
ノート:
この機能が適用されるのは、EDQ 12.2.1.4.1のリリースのみです。この章の内容は次のとおりです。
10.1 KafkaおよびEDQの概要
Apache Kafkaは、高パフォーマンスの分散ストリーミング・プラットフォームです。
EDQでは、KafkaコンシューマAPIを使用して、1つ以上のトピックをサブスクライブし、それらのパブリッシュ時にレコードを処理できます。また、KafkaプロデューサAPIを使用して、レコードのストリームをトピックにパブリッシュできます。
Kafkaレコードには、値とオプションのキーが含まれます。EDQでは、レコードの値とキーのテキスト値のみがサポートされます。
10.2 Kafkaレコードの読取りおよび書込みを行うためのEDQの構成
EDQとのKafkaインタフェースは、次のものを定義するXMLインタフェース・ファイルを使用して構成します:
-
インタフェースによって生成または消費されるEDQ属性
-
KafkaコンシューマまたはプロデューサAPIの構成方法を定義するプロパティ
-
レコード値をEDQ属性にデコードする方法(メッセージ・プロバイダの場合、EDQがトピックからレコードを読み取る)、または属性をKafkaレコードの値に変換する方法(メッセージ・コンシューマの場合、EDQがトピックにメッセージを書き込む)。
XMLファイルは、次のパスのEDQローカル・ホーム・ディレクトリにあります:
-
buckets/realtime/providers (EDQに入力するインタフェース)
-
buckets/realtime/consumers (EDQから出力するインタフェース)
XMLファイルが構成されると、Webサービスの入力/出力およびJMSプロバイダ/コンシューマと同様に、メッセージ・プロバイダ・インタフェースがEDQのリーダー・プロセッサで使用可能になり、データ・インタフェースをプロセスへの「入力」としてマップできます。また、メッセージ・コンシューマ・インタフェースはライター・プロセッサで使用可能になり、データ・インタフェースからプロセスの「出力」としてマップできます。
10.3 インタフェース・ファイルの定義
EDQのインタフェース・ファイルは、メッセージ・フレームワークを定義するrealtimedata
要素で構成されます。Kafkaインタフェースでは、次を使用します:
<realtimedata messenger=”kafka”>
…
</realtimedata>
realtimedata
要素には、3つのサブセクションがあります:
-
<attributes>
セクション: EDQで認識できるインタフェースの形状を定義します -
<messengerconfig>
セクション: Kafka APIの構成方法を定義します -
メッセージ・フォーマット・セクション: EDQ属性に対するKafkaレコードのマップ方法を定義します。プロバイダ・インタフェースの場合、要素は
<incoming>
で、コンシューマ・インタフェースの場合、要素は<outgoing>
です。
10.3.1 <attributes>セクションの理解
<attributes>
セクションでは、インタフェースの形状を定義します。これは、リーダーまたはライターを構成するときにEDQで使用できる属性を構成します。たとえば、インタフェース・ファイルの先頭部分の次の抜粋では、EDQで使用できる文字列属性と数値属性が構成されます:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="kafka">
<attributes>
<attribute type="string" name="messageID"/>
<attribute type="string" name="name"/>
<attribute type="number" name="AccountNumber"/>
</attributes>
…
EDQでは、すべての標準属性タイプがサポートされます。これらは次のとおりです:
-
文字列
-
数値
-
日付
-
stringarray
-
numberarray
-
datearray
10.3.2 <messengerconfig>セクションの理解
インタフェース・ファイルの<messengerconfig>
セクションでは、Kafka APIを構成します。<messengerconfig>
内のテキストは、Javaプロパティのセットとして解析されます。
conf.
という接頭辞があるプロパティは、キーからconf.
接頭辞を削除した後にKafka APIに直接渡されます。
Kafkaの<messengerconfig>
セクションに配置できるその他のプロパティは、次のとおりです:
-
topic
: プロバイダ・インタフェースの場合、サブスクライブするKafkaトピックのカンマまたはスペース区切りのリストが必要です。コンシューマ・インタフェースの場合、単一のトピック名が必要です。 -
poll
: 新規レコードのポーリング間隔(ミリ秒)。これは、プロバイダ・インタフェースにのみ適用されます。デフォルト値は500です。 -
key.encoding
およびvalue.encoding
: レコードのキーと値の変換に使用される文字セット。文字セットは、java.nio.charset.Charset.forName
によって認識される必要があります。デフォルト値は実装固有です。
WebLogicインストールでは、構成プロパティに必要な認証情報をOPSS資格証明ストアに格納して、${username}および${password}置換を持つプロパティで使用できます。cred.key
およびcred.map
プロパティを使用して、資格証明ストアのキーおよびマップ名を定義します。マップ名を省略すると、デフォルトで"edq"に設定されます。
次に、資格証明ストアを使用した完全な構成の例を示します:
<messengerconfig>
cred.key = kafka1
topic = mytopic
conf.bootstrap.servers = kserver:9094
confs.acks = all
conf.max.block.ms = 1000
conf.security.protocol = SASL_SSL
conf.sasl.mechanism = PLAIN
conf.ssl.truststore.location = pathtokeystore.jks
conf.ssl.truststore.password = pw
conf.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required
username="${username}" password="${password}";
</messengerconfig>
jaas.config
プロパティのユーザー名とパスワードは、キー"kafka1"を使用して資格証明ストアから読み取られます。
デフォルト・プロパティは、EDQローカル・ホーム・ディレクトリのrealtime.properties
ファイルに定義できます。このファイル内のKafkaインタフェースのキーには、"kafka."という接頭辞が付きます。たとえば:
kafka.conf.security.protocol = SASL_SSL
10.3.3 <incoming>または<outgoing>セクションの理解
<incoming>
または<outgoing>
セクションでは、レコード・メタデータおよび値をEDQ属性との間で変換する方法を定義します。これは、次の2つのサブセクションで構成されます:
10.3.3.1 <messageheaders>セクションの理解
<messageheaders>
セクションはオプションです。これは、レコード値の外部にあるデータをEDQ属性との間で変換する方法を定義します。
このセクションの形式は次のとおりです:
<messageheaders>
<header name=”headername” attribute=”attributename”/>
…
</messageheaders>
Kafkaインタフェースは、2つのヘッダー名を次のように定義します:
-
key: プロバイダの場合、レコードのキー値が名前付きEDQ属性に格納されます。コンシューマの場合、EDQ属性の値がパブリッシュ時のレコード・キーとして使用されます。
-
topic: レコードを受信したKafkaトピックの名前がEDQ属性に格納されます。これは、コンシューマにのみ適用されます。これは、インタフェースが多数の異なるトピックをサブスクライブするように定義されている場合に役立ちます。
10.3.3.2 <messagebody>セクションの理解
<messagebody>
セクションでは、Kafkaメッセージのテキスト値をEDQ属性との間で変換する方法を定義します。要素の後には、変換メカニズムを定義するサブセクションが続きます。次の変換メカニズムがサポートされます:
JSON変換
レコード値はJSON形式である必要があります。ネストされた要素は、JSON属性とEDQ属性間のマッピングを定義します。
書式は次のとおりです。
<json [multirecord=”true or false”] [defaultmappings=”true or false”]>
<mapping attribute=”attributename” path=”jsonattributename”/>
…
</json>
defaultmappings
属性を省略するか、trueに設定すると、インタフェースのEDQ属性からJSON属性への自動マッピングが作成されます。
multirecord
属性がtrueに設定されている場合、コンシューマ・インタフェースは、JSON入力が配列であり、プロバイダ・インタフェースがJSON配列を生成すると想定します。
<attributes>
セクションの説明に示されている属性の場合、JSON変換の例は次のとおりです:
例1:
すべてが自動である単純なJSON変換の例は、次のとおりです:
<json/>
自動マッピングによる単純な変換では、次のような値が想定および生成されます:
{ "messageID": "x123",
"name": "John Smith",
"AccountNumber": 34567
}
例2:
マッピングを使用した複数レコードJSON変換の例は、次のとおりです:
<json multirecord=”true”>
<mapping attribute=”AccountNumber” path=”accno”/>
</json>
単一の属性名マッピングによる複数レコード変換では、次のような値が想定および生成されます:
[{ "messageID": "x123",
"name": "John Smith",
"accno": 34567
},
…
]
スクリプト変換
XML解析など、より複雑な変換をサポートするために、レコード値を処理するためのJavaScriptスクリプトを指定できます。
プロバイダ・インタフェースでは、文字列レコード値を引数として使用する"extract"という名前の関数をスクリプトで定義する必要があります。スクリプトは、EDQ属性に一致する属性名を持つRecordオブジェクトの配列を返す必要があります。
次に、Rhino JavaScriptのE4X XML処理APIを使用したXMLの解析の例を示します:
<script>
<![CDATA[
function extract(str) {
var r = new Record()
var x = new XML(XMLTransformer.purifyXML(str));
r.messageID = x.ID
r.name = x.Accname
r.accountNumber = parseInt(x.Accnumber)
return [r];
}
]]>
</script>
コンシューマ・インタフェースでは、Recordオブジェクトの配列を取得してテキスト値を返す"build"という名前の関数をスクリプトで定義する必要があります。
次に、XMLを生成する例を示します:
<script>
<![CDATA[
function build(recs, mtags) {
var rec = recs[0];
var xml =
<response xmlns="http://www.datanomic.com/ws">
<sum>{rec.sum}</sum>
</response>;
return xml.toXMLString();
}
]]>
</script>
複数レコードのレスポンスでは、デフォルトの動作は、各レコードのスクリプトをコールすることです。<script multirecord=”true”>
が使用されている場合、build関数は、メッセージ内のすべてのレコードで1回コールされます。
詳細は、デフォルトのJSON変換を使用するプロバイダ・インタフェース・ファイルの例を示した図を参照してください。
10.4 図
次のXMLは、プロバイダ・インタフェース・ファイルの簡単な例で、デフォルトのJSON変換を使用します:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="kafka">
<attributes>
<attribute type="string" name="messageID"/>
<attribute type="string" name="name"/>
<attribute type="string" name="AccountNumber"/>
<attribute type="string" name="AccountName"/>
<attribute type="string" name="Country"/>
</attributes>
<messengerconfig>
cred.key = kafka1
topic = mytopic
conf.bootstrap.servers = kserver:9094
confs.acks = all
conf.max.block.ms = 1000
conf.security.protocol = SASL_SSL
conf.sasl.mechanism = PLAIN
conf.ssl.truststore.location = mykeystore,jks
conf.ssl.truststore.password = pw
conf.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required
username="${username}" password="${password}";
</messengerconfig>
<incoming>
<messagebody>
<json/>
</messagebody>
</incoming>
</realtimedata>
</realtimedata>