15 Using Oracle Cloud Infrastructure (OCI) Queue with EDQ

This document describes how to get started using the Oracle Cloud Infrastructure (OCI) Queue service with Oracle Enterprise Data Quality (EDQ). This documentation is intended for system administrators responsible for installing and maintaining EDQ applications.

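This chapter includes the following sections:

  • Introduction to OCI Queue and EDQ
  • Configuring EDQ to Read and Write OCI Queue Messages
  • Defining the Interface Files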

15.1 Introduction to OCI Queue and EDQ

EDQ realtime provider and consumer buckets can be configured to use the OCI Queue service to read and publish messages. OCI Queue is a better fit for realtime processing than OCI Streaming.

15.2 Configuring EDQ to Read and Write OCI Queue Messages

OCI Queue interfaces to and from EDQ are configured using XML interface files that define:

  • The path to the queue of messages
  • Properties that define how to work with the specific OCI Queue technology
  • How to decode the message payload into a format understood by an EDQ process (for Message Providers – where EDQ reads messages from a queue), or convert messages to a format expected by an external process (for Message Consumers – where EDQ writes messages to a queue).

The XML files are located in the EDQ Local Home directory (formerly known as the config directory), in the following paths:

  • buckets/realtime/providers (for interfaces ‘in’ to EDQ)
  • buckets/realtime/consumers (for interfaces ‘out’ from EDQ)

Once the XML files have been configured, Message Provider interfaces are available in Reader processors in EDQ and can be mapped to Data Interfaces as ‘inputs’ to a process. Message Consumer interfaces are available in Writer processors and can be mapped from Data Interfaces as ‘outputs’ from a process.

15.3 Defining the Interface Files

An interface file in EDQ consists of a realtimedata element which defines the message framework. For OCI Queue interfaces, use the following:

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="ociqueues">
    ...
</realtimedata>

The realtimedata element contains three subsections:

  • The <attributes> section, defining the shape of the interface as understood by EDQ
  • The <messengerconfig> section, defining how to connect to OCI Queue.
  • A message format section defining how to extract contents of a message (e.g. from XML) into attribute data readable by EDQ (for inbound interfaces), or how to convert attribute data from EDQ to message data (e.g. in XML). For provider interfaces, the element is <incoming>; for consumer interfaces, the element is <outgoing>.

15.3.1 Understanding the <attributes> section

The <attributes> section defines the shape of the interface: it lists the attributes that are available in EDQ when configuring a Reader or Writer. For example, the following excerpt from the beginning of an interface file configures string and number attributes that can be used in EDQ:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="ociqueues">
<attributes>  
   <attribute type="string" name="messageID"/> 
   <attribute type="string" name="name"/> 
   <attribute type="number" name="AccountNumber"/> 
</attributes> 

[file continues]...

EDQ supports all the standard attribute types:

  • string
  • number
  • date
  • stringarray
  • numberarray
  • datearray

15.3.2 Understanding the <messengerconfig> section

The following properties can be set in the <messengerconfig> section:

Property             Description
queue                The queue OCID (required).
credentials          Name of the stored credentials used to connect to the queue; if omitted, platform (instance) authentication is used.
interval             Interval in milliseconds between polls for message reception. The default is 0 (long polls are used).
proxy                Proxy server for HTTPS calls, in host:port format.
deletemode           Controls message deletion after reception. Valid values are:
                       • reception: Each message is deleted immediately on reception. This is the default.
                       • completion: Each message is deleted when it has completed traversing the processes in the job.
                       • off: Messages are not deleted automatically. They must be deleted manually, for example by using the message's receipt token in a web service call.
limit                Maximum number of messages to receive in one call. The default is 20.
visibilityInSeconds  The duration (in seconds) that the received messages are hidden from subsequent receive requests. The default is set in the queue definition.
timeoutInSeconds     Wait time in seconds for long poll receive requests. If omitted, the OCI default of 30 seconds is used.

Defaults for these properties can be set in realtime.properties using the prefix "ociqueues."; for example, ociqueues.limit = 10.
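
As an illustration of how these properties combine, the following sketch shows a <messengerconfig> section for a provider that deletes each message only after it has passed through the job. The queue OCID is a placeholder and the numeric values are arbitrary examples, not recommendations; the credentials name OCI 1 is borrowed from the example later in this chapter.

```xml
<!-- Illustrative values only: the queue OCID is a placeholder, not a real queue -->
<messengerconfig>
  queue               = ocid1.queue.oc1..exampleuniqueID
  credentials         = OCI 1
  deletemode          = completion
  limit               = 10
  visibilityInSeconds = 60
  timeoutInSeconds    = 20
</messengerconfig>
```

With deletemode set to completion, a visibilityInSeconds value longer than the expected processing time would presumably help prevent a message from being received a second time before it is deleted; that is a design consideration to check against the OCI Queue documentation, not a documented EDQ requirement.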

15.3.3 Understanding the <incoming> or <outgoing> section

The <incoming> or <outgoing> section defines how message metadata and values are converted to/from EDQ attributes. It consists of the following two subsections:

15.3.3.1 Understanding the <messageheaders> section

The <messageheaders> section allows attributes to be mapped to additional sending properties for transmission, and attributes to be set from message metadata on reception. Refer to the OCI Queue documentation for more details on each value. A non-empty message header value overrides a <messengerconfig> property with the same name.

The following standard headers are available:

Header name    Settable  Readable  Type    Description
deliveryCount  no        yes       number  The number of times the message has been delivered to a consumer.
expireAfter    no        yes       date    The time after which the message will be automatically deleted.
id             no        yes       string  The internal message ID.
receipt        no        yes       string  The receipt token that is required for manual deletion of messages.
visibleAfter   no        yes       date    The time after which the message will be visible to other consumers.
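
As a sketch of how these headers might be mapped, the following <messageheaders> section copies the message ID, receipt token, and delivery count into EDQ attributes. The attribute names msgid, msgreceipt, and deliveries are hypothetical and would need to be declared in the <attributes> section with types matching the table above (string, string, and number).

```xml
<!-- Hypothetical attribute names; each must also be declared in <attributes> -->
<messageheaders>
  <header name="id"            attribute="msgid"/>
  <header name="receipt"       attribute="msgreceipt"/>
  <header name="deliveryCount" attribute="deliveries"/>
</messageheaders>
```

Capturing the receipt is likely to matter most when deletemode is off, since the receipt token is required for manual deletion of messages.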

15.3.3.2 Understanding the <messagebody> section

This section uses JavaScript to parse message payloads into attributes that EDQ can use for inbound interfaces, and to perform the reverse operation (converting EDQ attribute data into message payload data) for outbound interfaces. A function named ‘extract’ is used to extract data from the message payload (for example, XML or JSON) into attribute data for inbound interfaces, and a function named ‘build’ is used to build the message payload from attribute data for outbound interfaces.

For more details, refer to Illustrations, which provides an example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger.

15.3.4 Illustrations

The following XML is a simple example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger. The internal message ID is also returned, in the mgid attribute.

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="ociqueues">
  <attributes>
    <attribute name="filter"      type="string"/>
    <attribute name="type"        type="string"/>
    <attribute name="xaxis"       type="string"/>
    <attribute name="yaxis"       type="string"/> 
    <attribute name="server"      type="string"/>
    <attribute name="userid"      type="number"/>
    <attribute name="user"        type="string"/>
    <attribute name="userdisplay" type="string"/>
    <attribute name="start"       type="date" format="iso"/>
    <attribute name="duration"    type="number"/>
    <attribute name="status"      type="string"/>
    <attribute name="sql"         type="string"/>
    <attribute name="arg.type"    type="stringarray"/>
    <attribute name="arg.value"   type="stringarray"/>
    <attribute name="mgid"        type="string"/>
  </attributes>
   
  <messengerconfig>
    queue       = ocid1.queue.oc1.phx.amaaaaaa7u6obfiaotsz7yuj2zcbc5uhc45evvv7wfwedgertw3543we
    credentials = OCI 1
  </messengerconfig>
 
  <incoming>
 
    <messageheaders>
      <header name="id" attribute="mgid"/>
    </messageheaders>
 
    <messagebody>
      <script>
         <![CDATA[
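           // Attributes copied straight from the JSON object without conversion
           var simple = ["filter", "type", "xaxis", "yaxis", "server", "userid", "user", "userdisplay", "duration", "status", "sql"]
 
           // Parse one JSON message payload into a single EDQ record
           function extract(str) {
             var obj = JSON.parse(str)
             var rec = new Record()
 
             for (let x of simple) {
               rec[x] = obj[x]
             }
 
             // Convert the start timestamp to a Date when it is present
             rec.start = obj.start && new Date(obj.start)
 
             // Split the optional args array into parallel type and value arrays
             if (obj.args) {
               rec['arg.type']  = obj.args.map(a => a.type)
               rec['arg.value'] = obj.args.map(a => a.value && a.value.toString())
             }
 
             // Return the record wrapped in an array
             return [rec];
           }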
         ]]>
      </script>
    </messagebody>
 
  </incoming>
 
</realtimedata>
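
For comparison, a consumer bucket might look like the following sketch, which publishes one JSON message per record. This is an assumption-laden illustration and not a tested configuration: the queue OCID is a placeholder, the attribute names are reused from the <attributes> excerpt earlier in this chapter, and the signature of the build function (receiving an array of records and returning the payload as a string) is assumed by analogy with extract and is not confirmed by this chapter.

```xml
<?xml version="1.0" encoding="UTF-8"?>

<!-- Hypothetical consumer bucket: the queue OCID is a placeholder -->
<realtimedata messenger="ociqueues">
  <attributes>
    <attribute name="messageID"     type="string"/>
    <attribute name="name"          type="string"/>
    <attribute name="AccountNumber" type="number"/>
  </attributes>

  <messengerconfig>
    queue       = ocid1.queue.oc1..exampleuniqueID
    credentials = OCI 1
  </messengerconfig>

  <outgoing>
    <messagebody>
      <script>
        <![CDATA[
          // Assumed signature: build receives an array of records
          // and returns the message payload as a string
          function build(recs) {
            var rec = recs[0]

            // Serialize the record's attributes as a JSON object
            return JSON.stringify({
              messageID:     rec.messageID,
              name:          rec.name,
              AccountNumber: rec.AccountNumber
            })
          }
        ]]>
      </script>
    </messagebody>
  </outgoing>
</realtimedata>
```

Under these assumptions, the file would be placed in buckets/realtime/consumers so that the interface becomes available to Writer processors.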