OIG Stash
This page describes how to use an OIG Stash in Dynamic Logic.
What is an OIG Stash?
An OIG Stash is a temporary storage area, like a digital checklist, used during long-running processes. It helps the system keep track of its progress. If a job is interrupted, for example due to a system failure, it can use the stash to pick up where it left off rather than starting over from the beginning. This prevents duplicate work (such as sending the same email twice) and makes processes more efficient and reliable.
OIG Stash can be used to support recoverability, partial progress tracking, and progress monitoring for long-running or large-volume processing.
| Do not create stash items and mark those same items as processed in the same exchange sub-step. Always separate insertion and processing into different OIG sub-steps. |
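As a minimal sketch of the required separation, using the stash API shown in the use cases on this page ('notify' is an illustrative stash name; in practice the two halves live in two different OIG sub-steps):

```groovy
// Sub-step A: only create stash items
def myStash = createStash('notify')
sourcedata.each { row ->
    myStash.store(key: row.memberId, value: row)
}

// Sub-step B -- in a SEPARATE OIG sub-step, never the same one:
// only read the stash and mark items
def stash = openStash('notify')
stash.readUnprocessed().each { stashItem ->
    // ... perform the work for this item ...
    stashItem.markSuccess()
}
```

Keeping the insertion in its own sub-step means the insert transaction can roll back cleanly on failure, without any items having been marked as processed.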
Use Case 1: Recoverability
Retrieve a list of email addresses and send a message to each address using an API. If the job terminates halfway, the process must restart without sending duplicate emails.
Step 1: Store the Data
The first step stores the data in the stash. If this step fails, all stash inserts are rolled back, because the step runs as a single transaction.
// Loop through the source data and store it in the stash
def myStash = createStash('email')
sourcedata.each { row ->
    myStash.store(key: row.memberId, value: row)
}
Step 2: Process the Data
The second step sends the emails and marks each item as processed when the result is successful, or as errored on failure.
When the exchange is recovered, previously processed items are skipped and errored items are retried.
// Loop through the existing stash
def myStash = openStash('email')
myStash.readUnprocessed().each { stashItem ->
    // send email
    if (resultOK) {
        stashItem.markSuccess()
    } else {
        stashItem.markError('Email delivery failed')
    }
}
Use Case 2: Data Merge
Receive a file of group members (employees) and terminate the policies of members who are not listed in the file.
Step 1: Load Existing Members
Retrieve existing members from Policies, for example by using the Data Transfer interface.
// Loop through the source data and store it in a stash
def memberStash = createStash('Members')
sourcedata.each { row ->
    memberStash.store(key: row.memberId, value: row)
}
Step 2: Load New Members
Load the new group member file into a second stash.
// Loop through the group file
def empStash = createStash('Employees')
sourcedata.each { row ->
    empStash.store(key: row.memberId, value: row)
}
Step 3: Compare and Process
This step compares the two stashes and terminates the members that do not appear in the employee file. There is no need to mark items as processed, because terminating a member is idempotent and can safely be repeated after recovery.
// Open the stashes created in the previous steps
def memberStash = openStash('Members')
def empStash = openStash('Employees')
memberStash.readUnprocessed().each { stashItem ->
    String key = stashItem.data.memberId
    if (empStash.existsByKey(key)) {
        // Key is found in the employee file: no action
    } else {
        // Invoke REST service to terminate member
    }
}
Use Case 3: Process a Set of Items
Assume a provider file with multiple records per provider. The records can be stored in the stash as shown below.
| Key | Unique Key | Value |
|---|---|---|
| PROV-123 | 1234 | {"code":"PROV-123","location":"Rochester"} |
| PROV-123 | 5678 | {"code":"PROV-123","location":"Cleveland"} |
| PROV-125 | 9871 | {"code":"PROV-125","location":"Boston"} |
| Stash values can be primitive types, maps, custom objects, CSV records, or XML elements. |
The records are collected in groups of 50, and a single REST call is made for each group.
// Loop through the stash
def myStash = openStash('Providers')
while (true) {
    def provider = myStash.getFirstUnprocessed()
    if (!provider) {
        break // No rows: processing is done
    }
    def providerKey = provider.key
    def stashList = []
    myStash.readUnprocessedByKey(providerKey).each { stashItem ->
        stashList << stashItem
        if (stashList.size() >= 50) {
            // Process this list, for example by building one REST call
            if (result == Error) {
                // Terminate this step with an error
            }
            // Mark these 50 items as done
            stashList.each { item ->
                item.markSuccess()
            }
            stashList = []
        }
    }
    // Process remaining items
    if (stashList.size() > 0) {
        // Process this list
        if (result == Error) {
            // Terminate this step with an error
        }
        // Mark these items as done
        stashList.each { item ->
            item.markSuccess()
        }
    }
}
Use Case 4: Progress Monitoring
Users can monitor the progress of each step by retrieving summary data for all stashes.
| Stash | Records | Processed | Processed in last minute |
|---|---|---|---|
| myStashName | 10,000 | 2,000 (20%) | 1,000 (10%) |
This progress information can be accessed through the Stash Progress Summary IP. For more details, see Stash Integration Point.
Use Case 5: Error Reporting
The exchange creates a report of stash processing results by listing all errored items and their associated error messages.
def errors = [:]
def myStash = openStash('Providers')
myStash.readByStatus(errored).each { stashItem ->
    errors[stashItem.key] = stashItem.extraInfo
}
if (errors) {
    // Send HTTP request to workflow system containing errors
}
| readByStatus() can be used for items with status processed or errored. If unprocessed items must be reported, use readUnprocessed(). |
Use Case 6: Transform File
The system processes items and creates a data file at the end. If an error occurs, the process restarts from the beginning without generating the data file. To ensure all items are included, stash items should not be marked as processed while writing the file. Otherwise, recovery could skip items that still need to be included in the regenerated output file.
Step 1: Store Data in Stash
This step stores the external data in the stash and discards duplicates.
// Loop through the source data and store it, discarding duplicate keys
def myStash = createStash('Provider')
sourcedata.each { row ->
    myStash.store(uniqueKey: row.providerId, value: row, onDuplicate: discard)
}
Step 2: Transform Data and Write to File
import org.apache.commons.csv.*

// Create a CSV writer for the output data file
def csvPrinter = outputWriters.getDataFileOutputWriter().csvPrinter(
    CSVFormat.DEFAULT.withHeader('providerId', 'providerName')
)

// Write every stash item to the file; items are deliberately
// not marked as processed, so a restart regenerates the full file
def myStash = openStash('Provider')
myStash.readUnprocessed().each { item ->
    csvPrinter.printRecord(item.data.providerId, item.data.providerName)
}