Incremental Data Transfer

The following dynamic logic implements incremental data transfer by adjusting the whereClause so that only records within a LAST_UPDATED_DATE window are selected. It pages through earlier exchanges of the same integration, newest first. When it finds a completed exchange, it uses that exchange's creationDate as the low watermark and the current exchange's creationDate as the high watermark, so only records updated in between are fetched. Pagination stops once a completed exchange is found or no more exchanges exist; in the latter case only the default filter applies. If an earlier exchange is still in process, the logic throws an exception.

In this example the dynamic logic has the signature 'Step Invocation Check' and is configured as part of the data transfer step. It stores an exchange property called 'whereClause', which the 'where' attribute of the data transfer step configuration refers to.

In reality, a failed previous exchange does not necessarily mean that the data transfer step and the associated data delivery failed; the exchange may have failed after delivery. If ordered delivery of incremental data needs to be guaranteed, this dynamic logic can be expanded, for instance by checking whether the delivery step of the previous exchange completed.
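As a rough illustration of such an extension, the sketch below decides whether a previous exchange can serve as the low watermark by also looking at its delivery step. It runs on stand-in maps rather than the real Exchange model: the `steps` list, the `type` and `status` keys, the 'DELIVERY' value and the `deliveryCompleted` helper are all hypothetical names chosen for this example, and the actual way to read the steps of an exchange may differ.

```groovy
// Hypothetical stand-in model: an exchange is a map with an exchangeStatus and a list of steps.
// Returns true when the exchange completed, or when its delivery step completed
// even though the exchange as a whole did not.
def deliveryCompleted(Map exchange) {
    if (exchange.exchangeStatus == 'C') {
        return true
    }
    return (exchange.steps ?: []).any { step ->
        step.type == 'DELIVERY' && step.status == 'C'
    }
}

// Exchange that failed only after the data had been delivered
def failedAfterDelivery = [
    exchangeStatus: 'F',
    steps: [[type: 'TRANSFER', status: 'C'], [type: 'DELIVERY', status: 'C'], [type: 'NOTIFY', status: 'F']]
]
// Exchange that failed before anything was delivered
def failedBeforeDelivery = [
    exchangeStatus: 'F',
    steps: [[type: 'TRANSFER', status: 'F']]
]

assert deliveryCompleted(failedAfterDelivery)
assert !deliveryCompleted(failedBeforeDelivery)
```

With a check like this, a failed exchange whose delivery step completed would set the low watermark in the same way a completed exchange does, instead of being skipped.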

Example Dynamic Logic

import java.text.SimpleDateFormat

// Matches the Oracle format mask 'YYYY-MM-DD HH24:MI:SS.FF' used in the where clause
def dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

// Default filter, used as-is when no completed previous exchange exists
whereClause = "GROC.STATUS='APPROVED'"

// Page through previous exchanges for the same integration, newest first
page = 1
paginate = true
while (paginate) {
    List previousExchanges = getPreviousExchange(page)
    if (previousExchanges) {
        for (previousExchange in previousExchanges) {
            exchangeStatus = previousExchange.exchangeStatus
            exchangeStep.addLogEntry("previousExchangeID: ${previousExchange.id}; status: ${exchangeStatus}")
            if (exchangeStatus == 'C') {
                // Completed exchange found: its creationDate is the low watermark (inclusive),
                // the current exchange's creationDate is the high watermark (exclusive)
                whereClause = whereClause + " AND GROC.LAST_UPDATED_DATE >= TO_TIMESTAMP('${dateFormat.format(previousExchange.creationDate)}', 'YYYY-MM-DD HH24:MI:SS.FF') AND GROC.LAST_UPDATED_DATE < TO_TIMESTAMP('${dateFormat.format(exchangeStep.exchange.creationDate)}', 'YYYY-MM-DD HH24:MI:SS.FF')"
                paginate = false
                break
            } else if (!['F', 'T'].contains(exchangeStatus)) {
                // Any status other than 'C', 'F' or 'T' is treated as still in process
                throw new Exception("Previous exchange with id ${previousExchange.id} that is performing the same data transfer is still in process")
            }
            // Status 'F' or 'T': skip this exchange and keep looking for an older completed one
        }
        page++
    } else {
        paginate = false // no more exchanges, stop paginating
    }
}

// Store the clause as an exchange property for the data transfer step's 'where' attribute
properties.putAll([
    "whereClause": whereClause
])
return true

// Returns one page (5 per page) of other exchanges for the same integration,
// newest first, excluding the current exchange
def getPreviousExchange(page) {
    return new SearchBuilder(Exchange.class)
            .by("id").neq(exchangeStep.exchange.id).and()
            .join("integration").by("code").eq(exchangeStep.exchange.integration.code)
            .orderBy("creationDate").descending()
            .setPageSize(5).setPage(page)
            .execute()
}
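
To inspect the generated filter outside the platform, the window construction can be pulled into a small standalone function. This is only a sketch: `buildWhereClause` is a hypothetical helper that is not part of the dynamic logic above, and the dates are made-up sample values. The low watermark is inclusive and the high watermark is exclusive, as in the example.

```groovy
import java.text.SimpleDateFormat

// Hypothetical helper: builds the incremental filter from a base clause and two watermark dates.
// A null lowWatermark means no completed previous exchange was found, so only the base clause applies.
String buildWhereClause(String baseClause, Date lowWatermark, Date highWatermark) {
    if (lowWatermark == null) {
        return baseClause
    }
    def dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    def mask = "YYYY-MM-DD HH24:MI:SS.FF"
    return baseClause +
        " AND GROC.LAST_UPDATED_DATE >= TO_TIMESTAMP('${dateFormat.format(lowWatermark)}', '${mask}')" +
        " AND GROC.LAST_UPDATED_DATE < TO_TIMESTAMP('${dateFormat.format(highWatermark)}', '${mask}')"
}

// Sample dates, for illustration only
def parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
def low = parser.parse("2024-01-01 08:00:00.000")
def high = parser.parse("2024-01-02 08:00:00.000")

println buildWhereClause("GROC.STATUS='APPROVED'", low, high)
println buildWhereClause("GROC.STATUS='APPROVED'", null, high)
```

Because the upper bound is exclusive, a record updated exactly at the creationDate of the current exchange is left for the next run, where the same timestamp becomes the inclusive lower bound.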