13 Using Apache Kafka with EDQ

This document describes how to get started using the Apache Kafka streaming platform with Oracle Enterprise Data Quality (EDQ). This documentation is intended for system administrators responsible for installing and maintaining EDQ applications. An in-depth understanding of Kafka concepts and configuration is assumed.

Note:

This feature is applicable only to the EDQ 12.2.1.4.1 release.

This chapter includes the following sections:

  • Introduction to Kafka and EDQ

  • Configuring EDQ to Read and Write Kafka Records

  • Defining the Interface Files

  • Illustrations

13.1 Introduction to Kafka and EDQ

Apache Kafka is a high-performance distributed streaming platform.

EDQ can use the Kafka Consumer API to subscribe to one or more topics and process records as they are published, and can use the Kafka Producer API to publish a stream of records to a topic.

Kafka records contain a value and an optional key. EDQ supports only text values for record values and keys.

13.2 Configuring EDQ to Read and Write Kafka Records

Kafka interfaces to and from EDQ are configured using XML interface files that define:

  • The EDQ attributes produced or consumed by the interface

  • Properties that define how to configure the Kafka consumer or producer API

  • How to decode the record value into EDQ attributes (for Message Providers – where EDQ reads records from a topic), or convert attributes to a value for a Kafka record (for Message Consumers – where EDQ writes messages to a topic).

The XML files are located in the EDQ Local Home directory, in the following paths:

  • buckets/realtime/providers (for interfaces 'in' to EDQ)

  • buckets/realtime/consumers (for interfaces 'out' from EDQ)

Once the XML files have been configured, Message Provider interfaces are available in Reader processors in EDQ, and can be mapped to Data Interfaces as 'inputs' to a process; Message Consumer interfaces are available in Writer processors, and can be mapped from Data Interfaces as 'outputs' from a process. This works in the same way as Web Service inputs/outputs and JMS providers/consumers.
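
For example, a Kafka provider interface file named kafkaaccounts.xml (the file name here is illustrative) would be placed at:

buckets/realtime/providers/kafkaaccounts.xml

and would then be offered as a Message Provider when configuring a Reader.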

13.3 Defining the Interface Files

An interface file in EDQ consists of a realtimedata element which defines the message framework. For Kafka interfaces, use the following:

<realtimedata messenger="kafka">
  …
</realtimedata>

The realtimedata element contains three subsections:

  • The <attributes> section, defining the shape of the interface as understood by EDQ

  • The <messengerconfig> section, defining how to configure the Kafka API

  • A message format section defining how a Kafka record is mapped from or to EDQ attributes. For provider interfaces, the element is <incoming>; for consumer interfaces, the element is <outgoing>.
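
Putting these together, a provider interface file has the following overall shape; element contents are elided here, and a consumer interface uses <outgoing> in place of <incoming>:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="kafka">
  <attributes>
    …
  </attributes>
  <messengerconfig>
    …
  </messengerconfig>
  <incoming>
    <messageheaders>   <!-- optional -->
      …
    </messageheaders>
    <messagebody>
      …
    </messagebody>
  </incoming>
</realtimedata>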

13.3.1 Understanding the <attributes> section

The <attributes> section defines the shape of the interface. It configures the attributes that are available in EDQ when configuring a Reader or Writer. For example, the following excerpt from the beginning of an interface file configures string and number attributes that can be used in EDQ:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="kafka">
  <attributes> 
    <attribute type="string" name="messageID"/> 
    <attribute type="string" name="name"/> 
    <attribute type="number" name="AccountNumber"/>
  </attributes> 
…

EDQ supports all the standard attribute types. These are:

  • string

  • number

  • date

  • stringarray

  • numberarray

  • datearray
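
For example, date and array attributes are declared in the same way as string and number attributes; the attribute names below are illustrative:

<attributes>
  <attribute type="date" name="DateOfBirth"/>
  <attribute type="stringarray" name="Aliases"/>
</attributes>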

13.3.2 Understanding the <messengerconfig> section

The <messengerconfig> section of the interface file configures the Kafka API. The text in <messengerconfig> is parsed as a set of Java properties.

Properties prefixed with conf. are passed directly to the Kafka API after removing the conf. prefix from the key.
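
For example, a property such as conf.bootstrap.servers = kserver:9092 (the server name here is illustrative) is passed to the Kafka API as bootstrap.servers.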

Other properties which may be placed in the <messengerconfig> section for Kafka are:

  • topic: For provider interfaces, a comma- or space-separated list of Kafka topics to subscribe to is required. For consumer interfaces, a single topic name is required.

  • poll: The interval between polls for new records in milliseconds. This is only applicable for provider interfaces. The default value is 500.

  • key.encoding and value.encoding: The character sets used for converting record keys and values. The character sets must be recognized by java.nio.charset.Charset.forName. The default values are implementation-specific.
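
For example, a provider interface that subscribes to two topics, polls once a second, and decodes record values as UTF-8 might use the following properties; the topic and server names are illustrative:

<messengerconfig>
  topic                  = accounts updates
  poll                   = 1000
  value.encoding         = UTF-8

  conf.bootstrap.servers = kserver:9092
</messengerconfig>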

On a WebLogic installation, authentication information required in configuration properties may be stored in the OPSS credentials store, and used in properties with ${username} and ${password} substitution. Use the cred.key and cred.map properties to define the credentials store key and map names. The map name defaults to “edq” if omitted.

The following is an example of a complete configuration with credentials store use:

<messengerconfig>
  cred.key                     = kafka1
  topic                        = mytopic

  conf.bootstrap.servers       = kserver:9094
  conf.acks                    = all
  conf.max.block.ms            = 1000
  conf.security.protocol       = SASL_SSL
  conf.sasl.mechanism          = PLAIN
  conf.ssl.truststore.location = pathtokeystore.jks
  conf.ssl.truststore.password = pw
  conf.sasl.jaas.config        = org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${username}" password="${password}";
</messengerconfig>

The user name and password for the jaas.config property are read from the credentials store with key "kafka1".

Default properties may be defined in the realtime.properties file in the EDQ local home directory. Keys for Kafka interfaces in this file are prefixed with "kafka.". For example:

kafka.conf.security.protocol = SASL_SSL

13.3.3 Understanding the <incoming> or <outgoing> section

The <incoming> or <outgoing> section defines how record metadata and values are converted to/from EDQ attributes. It consists of the following two subsections:

13.3.3.1 Understanding the <messageheaders> section

The <messageheaders> section is optional. It defines how data outside the record value is converted to/from EDQ attributes.

The format of this section is as follows:

<messageheaders>
   <header name="headername" attribute="attributename"/>
  …
</messageheaders>

The Kafka interface defines two header names as follows:

  • key: For providers, the key value from a record is stored in the named EDQ attribute. For consumers, the value of the EDQ attribute is used as the record key on publish.

  • topic: The name of the Kafka topic on which a record was received is stored in the EDQ attribute. It is applicable only for providers, and is useful if the interface is defined to subscribe to a number of different topics.
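
For example, the following section stores the record key in the messageID attribute and the source topic in an attribute named sourceTopic; the sourceTopic attribute is illustrative and would also need to be declared in the <attributes> section:

<messageheaders>
   <header name="key" attribute="messageID"/>
   <header name="topic" attribute="sourceTopic"/>
</messageheaders>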

13.3.3.2 Understanding the <messagebody> section

The <messagebody> section defines how the text value in a Kafka record is converted from/to EDQ attributes. It contains a nested element that defines the conversion mechanism. The following conversion mechanisms are supported:

JSON Conversion

The record value is expected to be in JSON format. The nested elements define the mapping between JSON attributes and EDQ attributes.

The format is as follows:

<json [multirecord="true or false"] [defaultmappings="true or false"]>
  <mapping attribute="attributename" path="jsonattributename"/>
…
</json>

If the defaultmappings attribute is omitted or set to true, automatic mappings are created from the interface EDQ attributes to JSON attributes.

If the multirecord attribute is set to true, provider interfaces expect the record value to be a JSON array, and consumer interfaces generate a JSON array.

Assuming the attributes shown in the <attributes> section description, here are some JSON conversion examples:

Example 1:

An example of a simple JSON conversion in which everything is automatic is as follows:

<json/>

A simple conversion with automatic mappings expects and generates values like:

{ "messageID": "x123",
  "name": "John Smith",
  "AccountNumber": 34567
}

Example 2:

An example of a multirecord JSON conversion with a mapping is as follows:

<json multirecord="true">
  <mapping attribute="AccountNumber" path="accno"/>
</json>

A multirecord conversion with a single attribute name mapping expects and generates values like:

[{ "messageID": "x123",
  "name": "John Smith",
  "accno": 34567
},
…
]

Script Conversion

To support more complex conversions, for example XML parsing, a JavaScript script can be provided to process the record value.

In provider interfaces, the script must define a function named "extract" which takes the string record value as an argument. The script should return an array of Record objects with attribute names matching EDQ attributes.

The following is an example parsing some XML, using the E4X XML processing API in Rhino JavaScript:

<script>
<![CDATA[
  function extract(str) {
    var r = new Record()
    var x = new XML(XMLTransformer.purifyXML(str));
  
    r.messageID     = x.ID
    r.name          = x.Accname
    r.AccountNumber = parseInt(x.Accnumber)
    return [r];
  }
 ]]>
</script>

In consumer interfaces, the script must define a function named "build" which takes an array of Record objects and returns the text value.

The following is an example generating some XML:

<script>
  <![CDATA[
  function build(recs, mtags) {
    var rec = recs[0];

    var xml = 
      <response xmlns="http://www.datanomic.com/ws">
       <sum>{rec.sum}</sum>
      </response>;
           
    return xml.toXMLString();
  }
  ]]>
</script>

For a multi-record message, the default behaviour is to call the script once for each record. If <script multirecord="true"> is used, the build function is called once with all the records in the message.
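
The following is a minimal sketch of a multirecord build function that assembles a single JSON array value from all the records in a message. It assumes that the script environment provides the standard JSON object, and that record attributes can be read as properties of each Record, as in the examples above:

<script multirecord="true">
  <![CDATA[
  // Called once per message with all of its records; returns one text value.
  function build(recs) {
    var out = [];
    for (var i = 0; i < recs.length; i++) {
      // Coerce attribute values to JavaScript strings before serializing.
      out.push({ messageID: String(recs[i].messageID), name: String(recs[i].name) });
    }
    return JSON.stringify(out);
  }
  ]]>
</script>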

For more details, refer to Illustrations, which provides an example of a provider interface file using default JSON conversion.

13.4 Illustrations

The following XML is a simple example of a provider interface file, using default JSON conversion:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="kafka">

  <attributes> 
    <attribute type="string" name="messageID"/> 
    <attribute type="string" name="name"/> 
    <attribute type="string" name="AccountNumber"/> 
    <attribute type="string" name="AccountName"/>
    <attribute type="string" name="Country"/>
  </attributes> 

  <messengerconfig>
    cred.key                     = kafka1
    topic                        = mytopic

    conf.bootstrap.servers       = kserver:9094
    conf.acks                    = all
    conf.max.block.ms            = 1000
    conf.security.protocol       = SASL_SSL
    conf.sasl.mechanism          = PLAIN
    conf.ssl.truststore.location = mykeystore.jks
    conf.ssl.truststore.password = pw
    conf.sasl.jaas.config        = org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="${username}" password="${password}";
  </messengerconfig>

  <incoming> 
    <messagebody> 
      <json/>
    </messagebody> 
  </incoming> 
</realtimedata>