Resequence Messages

The Resequence Messages technical accelerator provides a means for you to ensure that messages entering the system are processed in a specific order.

Note:

Oracle Integration comes with the Resequence Messages technical accelerator automatically installed. Therefore, this accelerator will already be listed as INSTALLED on the Accelerators and Recipes page when you sign in.

Description

You often need to ensure that messages are processed in a strict order. Let's look at a pattern that enforces an order using Oracle Integration and the Resequence Messages technical accelerator. The pattern also deals with the case where you must limit the concurrency of calls to an endpoint system.

The Sequencing Problem

The basic problem is that you have a stream of requests that must be executed in order, for example, create account, update account address, and update account contacts. The latter two activities can't occur until the first one has completed.

Before you can sequence messages, you need to know the order in which the messages should be processed. So, you need to use some sort of sequencing ID. This ID could be a timestamp or an actual sequence identifier. If you're using timestamps, then the closer to the message origin that the timestamp is applied the better. For example, if you take the timestamp from when the message arrives in Oracle Integration, then a network delay may have already caused our messages to be out of order.

Typically, you don't want all messages to be in the same ordered sequence. In our account example, only messages for a given account need to be ordered. Messages for different accounts can execute in parallel. So, now you also need some sort of group ID to identify different sequence streams within your message stream.

After you have the messages and know their order, you can process them. Inherent in a resequencing solution is some sort of delay to allow messages to arrive out of order and then be sorted into order. The size of the delay specifies how much time you can accept a message to be delayed before you go ahead without it.

The Resequencing Solution

The Resequence Messages technical accelerator includes a set of integrations, connections, and scripts that use standard Oracle Integration features. The integrations that handle the message resequencing are generic. You can use and reuse the technical accelerator to resequence different types of business integrations without modification to the integrations provided in the technical accelerator package.

The Resequence Messages technical accelerator:

  • Processes the input message based on the desired sequence ID rather than on the order in which the messages arrive.

  • Parks each message in storage for a certain period of time (parking time) so that any out-of-sequence messages have a chance to be processed in the desired order.

  • Lets you configure the maximum number of message groups being processed in parallel in order to throttle the outgoing calls.

  • Takes care of all error handling, including system errors, network errors, and bad requests.

Prerequisite

The solution uses a database to store the input messages. You can create the required database tables by using a SQL script.

To create the required database tables:

  • Search and download the DDL SQL script provided in this blog.
  • Run the script to create the database tables.

Key Parameters

The Resequence Messages technical accelerator uses the following key parameters to reorder messages.

Field Description

gtype - Group Type

The type of stream. Different message types can be sequenced in parallel, for example, account updates and personnel updates are different group types.

gid - Message Group

A field in the request that identifies a specific stream of messages to be sequenced.

id - Message Identifier

A unique identifier for this message.

sequenceId - Message Sequence

A field in the request or a timestamp that is used to determine how to sequence the messages in a stream.

Parking Time

The amount of time that messages may be delayed in order to ensure messages are processed in the desired order.

Message Concurrency

The maximum number of message groups to be processed in parallel.

Connections

The Resequence Messages technical accelerator uses the following connections. After you install the accelerator, you need to configure each connection.

Connection Type Description

RSQ DB

Invoke

ATP database used by the resequencer

RSQMessageConsumer

Trigger and invoke

Used to cause load distribution of calls to message consumer

RSQManager

Trigger

Used to invoke manager interface

RSQProducer

Trigger

Used to invoke producer interface

RSQDispatcher

Trigger

Used to invoke dispatcher interface

TestService

Invoke

Used to invoke a sample test service

Architecture

Let's look at the architecture for the Resequence Messages technical accelerator.

Description of reseq-msgs-architecture.png follows
Description of the illustration reseq-msgs-architecture.png

Business Front-end Integrations

An integration in this tier serves as the front end to the resequencer and the real business integration so that the message can go through the resequencer integrations to be reordered. The front-end integrations are specific to the use case and act as a means of converting typed requests into a common format, including group and sequencing fields.

Also, the front-end integrations receive the typed business payload, extract ID, and group ID from the message before calling the standardized producer message.

Resequencer Integrations

The Resequence Messages technical accelerator includes a set of reseqencer integrations. These integrations handle the message resequencing and are generic. You can use them to resequence different types of business integrations. No additional modifications are necessary.

Resequencer Integrations Description

Producer

Serves as the entry point of the resequencer. The producer integration receives the resequencing message, creates a new row in the group table if it's not already there, and sets the status of the group to N. It then creates a message in the message table.

Sample message payload:

{ "gid": "gid", "gtype": "order", "id": "mid",
      "sequenceId": 123, "payload": "string repesentation of the payload"
    }

Group Consumer

Detects the active groups and invokes the message consumer integration. The integration is scheduled to run every minute. When scheduling, use this expression:

FREQ=MINUTELY;INTERVAL=1;

The group consumer integration finds active groups, limiting parallelism to throttle outgoing calls to prevent overloading the target system. For each active group, the integration invokes a message consumer.

Message Consumer

Processes active messages of the given group. It receives the group ID and type from the group consumer integration. It loads active messages of the group ordered by sequenceID. The messages have to be at least as old as the parking time. This ensures that there's a window for the message to arrive out of order but be processed in order.

The integration loops through active messages, marks the message status as P, and invokes the dispatcher. Note that exceptions can occur here. After the dispatcher returns for a given message it is delete and the group status may be updated to mark the group status to be C if there are no active messages, or N if there are new active messages.

Sample message payload:

{ "id": "Engineering", "type": "employee"
    }

Manager

The manager integration supervises the resequencer. It supports three operations.

Operation Path and Method Description

Get configs

Path: /configs

Method: GET

Returns the config of all the types. Example of invocation:

$ curl https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs -v -u username:password

Update config

Path: /configs/{type}

Method: PUT

Update the config for the given type. Example of invocation:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs/employee -v -u username:password -H "Content-Type: application/json" -d@config.json

config.json example:

{ "maxConcurrent": 5, "timeWindow": 11 }

Recover Group

Path: /types/{type}/groups/{group}/recover

Method: PUT

Deletes stuck messages in the message table and reactivates the group by setting its status to 'N'. Example of invocation:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"

Dispatcher

The dispatcher is a request/response integration that reconstructs the original payload and sends it to the real backend integration. Unlike the resequencer integrations, the dispatcher isn't generic because it needs to invoke specific business integrations.

The dispatcher receives the message and converts the payload to the original typed business payload. It uses the group ID to find the business end point and invoke it synchronously. Exceptions can happen here.

Here is the dispatcher interface:

{ "id": "Engineering", 
"gid" : "Zebra", "gtype" : "order", "sequenceId" : 123,
"payload" : "original payload"
}

Business Integrations

A business integration is the real integration that processes the business messages. It has its own typed interface. For each business front-end integration, there should be a corresponding business integration.

Error Handling

Exceptions can occur when the dispatcher integration invokes the business integration.

Exceptions bubble up to the message consumer integration and cause the message consumer instance to fail. When this happens, the group status stays at P in the database.

Description of reseq-msgs-error1.png follows
Description of the illustration reseq-msgs-error1.png

On the Monitoring Integration page, you can see the failed dispatcher instance and the message consumer instance.

Description of reseq-msgs-monitor-error2.png follows
Description of the illustration reseq-msgs-monitor-error2.png

Recover System Error

If the problem is caused by a system error, such as a networking issue, then you can recover by resubmitting the failed message consumer instance after the system error is resolved.

Description of reseq-msgs-resubmit-error3.png follows
Description of the illustration reseq-msgs-resubmit-error3.png

Recover Bad Request

If the error is caused by a bad request, then resubmitting the request will not help. In this case, you need to skip the bad request and move on. To do this, invoke the resequencer manager integration to remove the stuck message and reactivate the group:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"