14 Using Amazon Simple Queue Service (Amazon SQS) with EDQ

This chapter describes how to get started using the Amazon Simple Queue Service (Amazon SQS) technology with Oracle Enterprise Data Quality (EDQ). It is intended for system administrators responsible for installing and maintaining EDQ applications.

This chapter includes the following sections:

14.1 Introduction to Amazon SQS and EDQ

EDQ realtime provider and consumer buckets can be configured to use Amazon SQS queues for reading and publishing.

14.2 Configuring EDQ to Read and Write Amazon SQS Messages

Amazon SQS interfaces to and from EDQ are configured using XML interface files that define:

  • The path to the queue of messages
  • Properties that define how to work with the specific Amazon SQS technology
  • How to decode the message payload into a format understood by an EDQ process (for Message Providers – where EDQ reads messages from a queue), or convert messages to a format expected by an external process (for Message Consumers – where EDQ writes messages to a queue).

The XML files are located in the EDQ Local Home directory (formerly known as the config directory), in the following paths:

  • buckets/realtime/providers (for interfaces ‘in’ to EDQ)
  • buckets/realtime/consumers (for interfaces ‘out’ from EDQ)

Once the XML files have been configured, Message Provider interfaces are available in Reader processors in EDQ and can be mapped to Data Interfaces as ‘inputs’ to a process. Message Consumer interfaces are available in Writer processors and can be mapped from Data Interfaces as ‘outputs’ from a process.

14.3 Defining the Interface Files

An interface file in EDQ consists of a realtimedata element which defines the message framework. For Amazon SQS interfaces, use the following:

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="sqs">
    ...
</realtimedata>

The realtimedata element contains three subsections:

  • The <attributes> section, defining the shape of the interface as understood by EDQ.
  • The <messengerconfig> section, defining how to connect to the Amazon SQS queue.
  • A message format section defining how to extract contents of a message (e.g. from XML) into attribute data readable by EDQ (for inbound interfaces), or how to convert attribute data from EDQ to message data (e.g. in XML). For provider interfaces, the element is <incoming>; for consumer interfaces, the element is <outgoing>.
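Putting the three subsections together, a provider interface file has roughly the following shape. This is a structural sketch only: the attribute name and the queue URL are placeholders invented for illustration, and the script simply copies the whole payload into one string attribute, following the Record pattern used in the example in Illustrations.

```xml
<?xml version="1.0" encoding="UTF-8"?>

<realtimedata messenger="sqs">

  <!-- 1. Shape of the interface as seen by EDQ (placeholder attribute name) -->
  <attributes>
    <attribute name="payload" type="string"/>
  </attributes>

  <!-- 2. Connection to the Amazon SQS queue (placeholder URL) -->
  <messengerconfig>
    queue = https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue
  </messengerconfig>

  <!-- 3. Message format: incoming for a provider, outgoing for a consumer -->
  <incoming>
    <messagebody>
      <script>
        <![CDATA[
          // Store the raw message payload in a single record
          function extract(str) {
            var rec = new Record()
            rec.payload = str
            return [rec]
          }
        ]]>
      </script>
    </messagebody>
  </incoming>

</realtimedata>
```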

14.3.1 Understanding the <attributes> section

The <attributes> section defines the shape of the interface. It lists the attributes that are available in EDQ when configuring a Reader or Writer. For example, the following excerpt from the beginning of an interface file configures string and number attributes that can be used in EDQ:

<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="sqs">
  <attributes>
    <attribute type="string" name="messageID"/>
    <attribute type="string" name="name"/>
    <attribute type="number" name="AccountNumber"/>
  </attributes>

[file continues]...

EDQ supports the following standard attribute types:

  • string
  • number
  • date
  • stringarray
  • numberarray
  • datearray
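As a sketch, an <attributes> section that declares one attribute of each type might look like the following. The attribute names are invented for illustration. The format="iso" setting on the date attribute is taken from the example in Illustrations; whether array types accept a format is not stated here, so none is shown.

```xml
<!-- Placeholder attribute names, one per supported type -->
<attributes>
  <attribute name="customerName" type="string"/>
  <attribute name="balance"      type="number"/>
  <attribute name="opened"       type="date" format="iso"/>
  <attribute name="aliases"      type="stringarray"/>
  <attribute name="scores"       type="numberarray"/>
  <attribute name="reviewDates"  type="datearray"/>
</attributes>
```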

14.3.2 Understanding the <messengerconfig> section

The following properties can be set in the <messengerconfig> section:

Property                Description

queue                   SQS queue URL (required).

credentials             Name of the stored credentials used to connect to the queue. If omitted, platform (instance) authentication is used.

interval                Interval in milliseconds between polls for message reception. The default is 0 (long polls are used).

proxy                   Proxy server for HTTPS calls, in host:port form.

deletemode              Controls message deletion after reception. Valid values are:
                          • reception: Each message is deleted immediately on reception. This is the default.
                          • completion: Each message is deleted when it has completed traversing the processes in the job.
                          • off: Messages are not deleted automatically. They must be deleted manually, for example by passing the receipt handle in a web service call.

MessageDeduplicationId  Deduplication ID for FIFO queues. May be overridden by a header attribute. Two special values are supported:
                          • $hash: The deduplication ID is computed as the SHA-256 hash of the message body.
                          • $uuid: A random UUID is used as the deduplication ID.

MessageGroupId          Group ID for FIFO queues. May be overridden by a header attribute.

DelaySeconds            Sending delay in seconds. May be overridden by a header attribute.

MaxNumberOfMessages     Maximum number of messages to receive in one call. The default is 10.

VisibilityTimeout       Duration in seconds for which received messages are hidden from subsequent receive requests. The default is set in the queue definition.

WaitTimeSeconds         Wait time in seconds for long poll receive requests. The default is 20.

Defaults for these properties can be set in realtime.properties by prefixing the property name with "sqs.".
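For example, the two fragments below sketch how these properties might be combined, one for a consumer that publishes to a FIFO queue and one for a provider that reads with long polls. Each fragment would live in its own interface file. The queue URLs, the stored credentials name aws1, the group ID, and the numeric values are placeholders chosen for illustration; the property = value layout follows the example in Illustrations.

```xml
<!-- Consumer: publish to a FIFO queue, deduplicating on a hash of the body -->
<messengerconfig>
  queue                  = https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue.fifo
  credentials            = aws1
  MessageGroupId         = edq-results
  MessageDeduplicationId = $hash
</messengerconfig>

<!-- Provider: long polls, messages deleted once the job has processed them -->
<messengerconfig>
  queue               = https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue
  credentials         = aws1
  deletemode          = completion
  MaxNumberOfMessages = 5
  WaitTimeSeconds     = 10
  VisibilityTimeout   = 60
</messengerconfig>
```

With deletemode set to completion, it seems sensible to choose a VisibilityTimeout longer than the time a message takes to traverse the job, so that the message is not redelivered while it is still being processed.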

14.3.3 Understanding the <incoming> or <outgoing> section

The <incoming> or <outgoing> section defines how message metadata and values are converted to/from EDQ attributes. It consists of the following two subsections:

14.3.3.1 Understanding the <messageheaders> section

The <messageheaders> section allows attributes to be mapped to additional sending properties for transmission, and attributes to be set from message metadata on reception. Refer to the Amazon SQS documentation for more details on each value. A non-empty message header value overrides a <messengerconfig> property with the same name.

The following standard headers are available:

Header name                       Settable  Readable  Type    Description

DelaySeconds                      yes       no        number  Sending delay in seconds
MessageDeduplicationId            yes       yes       string  Deduplication ID for FIFO queues
MessageGroupId                    yes       yes       string  Group ID for FIFO queues
ApproximateFirstReceiveTimestamp  no        yes       number  Timestamp for first receive from queue
ApproximateReceiveCount           no        yes       number  Number of times message has been received
AWSTraceHeader                    no        yes       string  X-Ray trace header
SenderId                          no        yes       string  IAM user or role ID
SentTimestamp                     no        yes       number  Sending timestamp
SequenceNumber                    no        yes       number  Sequence number from SQS
MessageId                         no        yes       string  Internal message ID
ReceiptHandle                     no        yes       string  Receipt handle, required for manual deletion of messages
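To illustrate the readable headers, the fragment below sketches a provider that leaves deletion to an external caller: deletemode is set to off, and the receipt handle and receive count are copied into EDQ attributes so that a later step can delete the message. The attribute names and the queue URL are placeholders, and the enclosing realtimedata element and the message body section are omitted.

```xml
<!-- Placeholder attribute names that receive the header values -->
<attributes>
  <attribute name="receipthandle" type="string"/>
  <attribute name="receivecount"  type="number"/>
</attributes>

<!-- Placeholder queue URL; messages stay on the queue until deleted manually -->
<messengerconfig>
  queue      = https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue
  deletemode = off
</messengerconfig>

<incoming>
  <messageheaders>
    <header name="ReceiptHandle"           attribute="receipthandle"/>
    <header name="ApproximateReceiveCount" attribute="receivecount"/>
  </messageheaders>
  <!-- messagebody section omitted -->
</incoming>
```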

Custom message attributes

In addition to the standard headers defined above, the <messageheaders> section can also define custom attributes. Custom attribute names are prefixed with "messageattribute:" to distinguish them from standard headers.

Example custom attribute

<messageheaders>
  <header name="messageattribute:tel" attribute="telephone" type="string"/>
</messageheaders>

14.3.3.2 Understanding the <messagebody> section

This section uses JavaScript to parse message payloads into attributes that EDQ can use (for inbound interfaces), and to perform the reverse operation, converting EDQ attribute data into message payload data (for outbound interfaces). A function named ‘extract’ converts the message payload (for example, XML or JSON) into attribute data for inbound interfaces, and a function named ‘build’ builds the message payload from attribute data for outbound interfaces.

For more details, refer to Illustrations, which provides an example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger.
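The example in Illustrations covers only the inbound direction. For the outbound direction, a minimal consumer bucket might look like the sketch below. It assumes that build receives an array of records and returns the payload as a string, mirroring the way extract returns an array of records; this signature is an assumption and should be checked against your EDQ version. The attribute names, queue URL, credentials name, and group ID are placeholders.

```xml
<?xml version="1.0" encoding="UTF-8"?>

<realtimedata messenger="sqs">

  <!-- Placeholder attributes written by an EDQ Writer -->
  <attributes>
    <attribute name="caseid" type="string"/>
    <attribute name="status" type="string"/>
    <attribute name="group"  type="string"/>
  </attributes>

  <!-- Placeholder FIFO queue URL and credentials name -->
  <messengerconfig>
    queue                  = https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue.fifo
    credentials            = aws1
    MessageGroupId         = default-group
    MessageDeduplicationId = $uuid
  </messengerconfig>

  <outgoing>

    <messageheaders>
      <!-- A non-empty value in the group attribute overrides MessageGroupId above -->
      <header name="MessageGroupId" attribute="group"/>
    </messageheaders>

    <messagebody>
      <script>
        <![CDATA[
          // Assumption: recs is an array of records; the first one becomes a JSON payload
          function build(recs) {
            var rec = recs[0]
            return JSON.stringify({ caseid: rec.caseid, status: rec.status })
          }
        ]]>
      </script>
    </messagebody>

  </outgoing>

</realtimedata>
```

Setting MessageGroupId in both places shows the override rule: the value in <messengerconfig> acts as a fallback for records whose group attribute is empty.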

14.4 Illustrations

The following XML is a simple example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger. The sender ID and message group ID are also returned, along with two custom message attributes, attr1 and attr2.

<?xml version="1.0" encoding="UTF-8"?>
 
<realtimedata messenger="sqs">
  <attributes>
    <attribute name="filter"      type="string"/>
    <attribute name="type"        type="string"/>
    <attribute name="xaxis"       type="string"/>
    <attribute name="yaxis"       type="string"/> 
    <attribute name="server"      type="string"/>
    <attribute name="userid"      type="number"/>
    <attribute name="user"        type="string"/>
    <attribute name="userdisplay" type="string"/>
    <attribute name="start"       type="date" format="iso"/>
    <attribute name="duration"    type="number"/>
    <attribute name="status"      type="string"/>
    <attribute name="sql"         type="string"/>
    <attribute name="arg.type"    type="stringarray"/>
    <attribute name="arg.value"   type="stringarray"/>
    <attribute name="senderid"    type="string"/>
    <attribute name="attr1"       type="string"/>
    <attribute name="attr2"       type="number"/>
    <attribute name="mgid"        type="string"/>
  </attributes>
   
  <messengerconfig>
    queue       = https://sqs.eu-west-1.amazonaws.com/458503484332/queue1
    credentials = aws1
    deletemode  = completion
  </messengerconfig>
 
  <incoming>
 
    <messageheaders>
      <header name="SenderId"               attribute="senderid"/>
      <header name="MessageGroupId"         attribute="mgid"/>
      <header name="messageattribute:attr1" attribute="attr1" type="string"/>
      <header name="messageattribute:attr2" attribute="attr2" type="number"/>
    </messageheaders>
 
    <messagebody>
      <script>
         <![CDATA[
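           // Attributes copied directly from the JSON object by name
           var simple = ["filter", "type", "xaxis", "yaxis", "server", "userid", "user", "userdisplay", "duration", "status", "sql"]
 
           // Parse one JSON message payload into a single EDQ record
           function extract(str) {
             var obj = JSON.parse(str)
             var rec = new Record()
 
             for (let x of simple) {
               rec[x] = obj[x]
             }
 
             // Convert the start timestamp to a Date when it is present
             rec.start = obj.start && new Date(obj.start)
 
             // Split the optional args array into parallel type and value arrays
             if (obj.args) {
               rec['arg.type']  = obj.args.map(a => a.type)
               rec['arg.value'] = obj.args.map(a => a.value && a.value.toString())
             }
 
             return [rec]
           }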
         ]]>
      </script>
    </messagebody>
 
  </incoming>
 
</realtimedata>