Predefined Methods
This section describes the predefined methods available per object. These are available along with the methods described in other chapters for Event Store and Transformation.
Generic Methods
This section describes generic methods that are available in any Dynamic Logic.
getSequenceNumber
Retrieves a unique number from a user-defined sequence. Pass the code of the user-defined sequence as input. Note that the sequence must have been configured via the "/usersequences" resource before it can be used.
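A minimal sketch of a call, assuming the method takes the sequence code as its single argument and returns a number. The closure is a local stand-in so that the snippet runs outside the product, and the sequence code 'INVOICE_SEQ' is hypothetical; inside dynamic logic only the last two lines would apply.

```groovy
// Stand-in for the predefined method (assumption): one counter per sequence code.
def counters = [:].withDefault { 0L }
def getSequenceNumber = { String sequenceCode ->
    counters[sequenceCode] = counters[sequenceCode] + 1
    counters[sequenceCode]
}

// 'INVOICE_SEQ' is a hypothetical code, assumed to be configured via "/usersequences".
def first = getSequenceNumber('INVOICE_SEQ')
def second = getSequenceNumber('INVOICE_SEQ')
```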
getSystemProperty
Returns the value of the property whose key is passed as input.
Availability
This method is available in dynamic logic (Groovy script).
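A minimal sketch of reading a property and using its value, assuming the method takes the key as a string and returns the value as a string. The closure is a local stand-in, and both the key 'example.base.url' and the '/claims' path are hypothetical.

```groovy
// Stand-in for the predefined method (assumption): looks the key up in a fixed map.
def configuredProperties = ['example.base.url': 'https://example.invalid/api']
def getSystemProperty = { String key -> configuredProperties[key] }

// 'example.base.url' is a hypothetical key, not a property documented by the product.
def baseUrl = getSystemProperty('example.base.url')
def target = "${baseUrl}/claims".toString()
```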
Event Consumption Methods
Whenever the system invokes dynamic logic, that logic can obtain an eventReader.
reader = eventReader('topic-1')
The above code snippet shows how you can obtain an event reader to read events stored under event type 'topic-1'.
The dynamic logic infrastructure manages the resources that are used while reading events of this event type. As the number of events can be quite large, an event reader exposes the results in a scrollable fashion to prevent memory exhaustion. Every event that is read through this event reader is automatically flagged as having been read (consumed). Once the dynamic logic is complete, the resource supporting the reader is closed automatically. Typically you operate on the scrollable event stream using standard dynamic logic idioms, as shown in this example.
reader = eventReader('topic-1')
reader.readEvents().each { it ->
    exchangeStep.addLogEntry(it.payload)
}
Note that this flagging happens in the same transaction as the reading. This means that, if an unhandled exception occurs, none of the previously read events are flagged as having been read. This can affect how you structure the exchange, especially when you call out to other systems, for instance using REST. Such operations should typically be idempotent, so that repeating the operation with the same information yields the same result.
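The idempotency requirement can be sketched as follows. The remote system is simulated by a local class (an assumption for illustration; in real dynamic logic it would be a REST call), and the write is keyed by event id so that a retry after a rollback does not create duplicates.

```groovy
// Stand-in for a remote system that stores one record per event id.
class RemoteSystemStub {
    final Map<Long, String> processed = [:]
    int newRecords = 0

    // PUT-like semantics: repeating the call with the same id changes nothing.
    void upsert(Long eventId, String payload) {
        if (!processed.containsKey(eventId)) {
            newRecords++
        }
        processed[eventId] = payload
    }
}

def remote = new RemoteSystemStub()
def events = [[eventId: 1L, payload: 'a'], [eventId: 2L, payload: 'b']]

// First attempt: imagine an unhandled exception afterwards rolls back the read flags.
events.each { remote.upsert(it.eventId, it.payload) }
// Retry: the same events are read again because they were never flagged as read.
events.each { remote.upsert(it.eventId, it.payload) }
```

After both passes the stub still holds exactly two records, which is the property the call-out needs to have.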
Data structure of objects returned by eventReader
The eventReader can be operated on using standard dynamic logic idioms. The scrollable event stream returns objects of type IntegrationEvent that have the following shape.
Field | Description
eventId | the primary key of the event
type | the type of the event (aka topic)
integrationId | the identification of the integration which was in context of the read (aka consumer)
payload | the payload of the event
timestamp | the timestamp of the event
correlation | the correlation of the event
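A sketch of working with objects of this shape. The class is a hypothetical stand-in that mirrors the documented field names; the field types and the sample values are assumptions, and a plain list takes the place of the scrollable stream.

```groovy
import groovy.transform.Canonical
import java.sql.Timestamp

// Hypothetical stand-in mirroring the documented fields; the types are assumptions.
@Canonical
class IntegrationEventSketch {
    Long eventId
    String type
    String integrationId
    String payload
    Timestamp timestamp
    String correlation
}

def at = Timestamp.valueOf('2024-01-01 08:00:00')
def events = [
    new IntegrationEventSketch(1L, 'topic-1', 'consumer-a', 'first', at, 'corr-1'),
    new IntegrationEventSketch(2L, 'topic-1', 'consumer-a', 'second', at, 'corr-1'),
    new IntegrationEventSketch(3L, 'topic-1', 'consumer-a', 'third', at, 'corr-2')
]

// Collect the payloads per correlation value.
def payloadsByCorrelation = events.groupBy { it.correlation }.collectEntries { key, group -> [(key): group*.payload] }
```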
The methods supported by the eventReader are:
readEvents()
Reads all the events for the event type in streaming fashion.
Parameters
In / Out | Type | Description
Out | EventScrollableCursorAdapter (subclass of org.eclipse.persistence.queries.ScrollableCursor) | A stream of events for the passed in event type.
Example
reader = eventReader('topic-1')
reader.readEvents().each { it ->
    exchangeStep.addLogEntry(it.payload)
}
readEvents(startDateTime, endDateTime)
Reads all the events for the event type in streaming fashion, limited to events created between startDateTime and endDateTime.
Parameters
In / Out | Type | Description
In | Timestamp (Optional) | If present, the creation date of the event should not be before this timestamp
In | Timestamp (Optional) | If present, the creation date of the event should not be after this timestamp
Out | EventScrollableCursorAdapter (subclass of org.eclipse.persistence.queries.ScrollableCursor) | A stream of events for the passed in event type and falling in the range of the timestamps.
Example
import java.time.LocalDate
import java.sql.Timestamp

LocalDate now = LocalDate.now()
LocalDate tomorrow = now.plusDays(1)
// Window: from the start of today until tomorrow at 20:15
timeStampStartOfDay = Timestamp.valueOf(now.atStartOfDay())
tomorrowAtEight15PM = Timestamp.valueOf(tomorrow.atTime(20,15))
reader = eventReader('junit')
reader.readEvents(timeStampStartOfDay, tomorrowAtEight15PM).each { it ->
    exchangeStep.addLogEntry(it.payload)
}
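The parameter descriptions ("not before", "not after") can be read as an inclusive window. The sketch below mirrors that reading with a local helper, which is not a product method; treating a null bound as "not present" is an assumption about how an optional timestamp is omitted.

```groovy
import java.sql.Timestamp

// Local helper: true when 'created' is not before 'start' and not after 'end'.
// A null bound is treated as absent (assumption).
def inWindow = { Timestamp created, Timestamp start, Timestamp end ->
    (start == null || !created.before(start)) && (end == null || !created.after(end))
}

def start = Timestamp.valueOf('2024-01-01 00:00:00')
def end = Timestamp.valueOf('2024-01-02 20:15:00')

def onLowerBound = inWindow(start, start, end)
def afterUpperBound = inWindow(Timestamp.valueOf('2024-01-02 20:15:01'), start, end)
def openEnded = inWindow(Timestamp.valueOf('2030-01-01 00:00:00'), start, null)
```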
readUnhandledEvents()
This method uses the consumption tracking entity, ExchangeEvents, to check for the last exchange that read events of this type for the same integration. All events that are not in ExchangeEvents for the same type and integration are retrieved by this method.
Availability
This method is available generically in all dynamic logic.
Parameters
In / Out | Type | Description
Out | EventScrollableCursorAdapter (subclass of org.eclipse.persistence.queries.ScrollableCursor) | A stream of unhandled events for the passed in event type
Example
reader = eventReader('topic-1')
reader.readUnhandledEvents().each { it ->
    exchangeStep.addLogEntry(it.payload)
}
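The selection rule described for readUnhandledEvents can be pictured as a set difference. The sketch below models events and ExchangeEvents entries as plain maps with hypothetical sample data; it illustrates the rule only and is not how the product implements the query.

```groovy
// Sample data (hypothetical): all stored events, and the consumption tracking entries.
def allEvents = [
    [eventId: 1, type: 'topic-1'],
    [eventId: 2, type: 'topic-1'],
    [eventId: 3, type: 'topic-1'],
    [eventId: 4, type: 'topic-2']
]
def exchangeEvents = [
    [eventId: 1, type: 'topic-1', integrationId: 'int-a'],
    [eventId: 2, type: 'topic-1', integrationId: 'int-b']
]

// Events of the type that have no tracking entry for the same type and integration.
def unhandledIds = { String type, String integrationId ->
    def handled = exchangeEvents.findAll { it.type == type && it.integrationId == integrationId }
    def handledIds = handled.collect { it.eventId } as Set
    allEvents.findAll { it.type == type && !handledIds.contains(it.eventId) }.collect { it.eventId }
}

def forIntA = unhandledIds('topic-1', 'int-a')
def forIntB = unhandledIds('topic-1', 'int-b')
```

Each integration tracks its own consumption, so the same event can be unhandled for one integration and handled for another.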
Object Methods
In this section, the methods are described that are only available on specific objects. For each method, the following information is given:
- The purpose of the method.
- On which object(s) the method can be called.
- Description of the parameters.
- An example of the usage.
addLogEntry
The addLogEntry method can be used to add a log statement for an exchange step. Logs entered through this method are marked with log type "User".
addResponseHeader
The addResponseHeader method can be used to add response headers, which are then automatically included when calling an external service or when returning the response for an Exchange that is executed in a synchronous (blocking) fashion.
responseHeaders
This accessor (the property form of the getResponseHeaders method) can be used to obtain the complete set of response headers that have been recorded on the Exchange up to this point.
removeResponseHeader
This method can be used to selectively remove one or more response headers that have been recorded on the Exchange up to this point.
Example
Removing all response headers recorded up to this point that are in a specific set of well-known headers:
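import java.util.function.*

// Names of the headers to remove
setOfHeaders = ['header-one', 'header-two']
// Each recorded header is tested as an entry; match on its key (the header name)
Predicate isHeaderToRemove = { a ->
    setOfHeaders.contains(a.key)
}
exchangeStep.removeResponseHeader(isHeaderToRemove)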
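How the three header members fit together can be sketched with a local stand-in. The class is hypothetical: the two-argument addResponseHeader(name, value) signature and the map-based storage are assumptions, and only the predicate-on-entry style is taken from the example above.

```groovy
import java.util.function.Predicate

// Hypothetical stand-in for the exchange step; the signatures are assumptions.
class ExchangeStepSketch {
    private final Map<String, String> headers = [:]

    void addResponseHeader(String name, String value) { headers[name] = value }

    // Backs the 'responseHeaders' accessor; returns a read-only view.
    Map<String, String> getResponseHeaders() { headers.asImmutable() }

    void removeResponseHeader(Predicate<Map.Entry<String, String>> shouldRemove) {
        headers.entrySet().removeIf(shouldRemove)
    }
}

def step = new ExchangeStepSketch()
step.addResponseHeader('header-one', '1')
step.addResponseHeader('x-trace-id', 'abc')

// Remove a single header by name, then inspect what is left.
Predicate<Map.Entry<String, String>> isHeaderOne = { entry -> entry.key == 'header-one' }
step.removeResponseHeader(isHeaderOne)
def remainingNames = step.responseHeaders.keySet().toList()
```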
markForRecovery
The markForRecovery method can be used to mark the exchange as Failed so that its current exchange step can be recovered when the exchange is recovered. When invoked, this method sets the exchange’s current exchange step (if any) for recovery, along with all the exchange steps that belong to the same integration step as the current exchange step. This only happens if the exchange properties contain a recovery link for the current exchange step’s integration step to recover from the failed step. If there is no recovery link or no current exchange step, the method returns false, meaning the exchange was not marked for recovery.
This method is useful when the process is not marked for autoRecovery and the decision to recover is instead left to the discretion of the messages attached to the process notification.
Availability
This method is available on the Exchange object.
Parameters
In / Out | Type | Description
Out | Boolean | True when the exchange is successfully marked for recovery, false otherwise.
Example
An activity step is invoked and the step is not marked for autoRecovery. The step’s notification contains links for messages as well as for recovery. The following dynamic logic snippet checks the messages and marks the exchange and its steps as failed only if the messages contain a message with code 'OHI-FATAL-001'. The signature of this dynamic logic is 'Step Post Process', and the messages are logged as logs on the previous step.
if (exchangeStep.previousExchangeStep != null) {
    // Find log lines on the previous step that mention the fatal message code
    def logs = new SearchBuilder(ExchangeLog.class)
        .by('exchangeStep').eq(exchangeStep.previousExchangeStep)
        .and()
        .by('logLine').like('%OHI-FATAL-001%').execute()
    if (logs.size() > 0) {
        exchange.markForRecovery()
        // can also add code here to check if the exchange was actually marked for recovery
    }
}
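The check mentioned in the comment can be sketched by acting on the Boolean that markForRecovery returns. The class below is a hypothetical stand-in that mirrors only the documented return contract; the 'In Progress' status value and the log texts are placeholders.

```groovy
// Hypothetical stand-in for the Exchange object.
class ExchangeSketch {
    boolean hasRecoveryLink
    boolean hasCurrentStep
    String status = 'In Progress'   // placeholder value

    // Returns false when there is no recovery link or no current exchange step.
    boolean markForRecovery() {
        if (!hasRecoveryLink || !hasCurrentStep) {
            return false
        }
        status = 'Failed'
        return true
    }
}

def logLines = []
def markAndReport = { exchange ->
    if (exchange.markForRecovery()) {
        logLines << 'Exchange marked for recovery'
    } else {
        logLines << 'Exchange not marked: no recovery link or no current exchange step'
    }
}

markAndReport(new ExchangeSketch(hasRecoveryLink: true, hasCurrentStep: true))
markAndReport(new ExchangeSketch(hasRecoveryLink: false, hasCurrentStep: true))
```

In real dynamic logic the outcome would more likely be written with exchangeStep.addLogEntry than collected in a list.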