Adding Logic to Handle Map/Reduce Restarts

Occasionally, a script failure may occur due to an application server restart. This could be due to a NetSuite update, NetSuite maintenance, or an unexpected failure of the execution environment.

Restarts can terminate an application forcefully at any moment. Therefore, robust scripts must account for restarts and be able to recover from an unexpected interruption.

In a map/reduce script, each restarted piece of the script automatically deletes any internal map/reduce data that the piece created (for example, the key-value pairs that drive the execution of an entire mapping task). However, you must write your own code to handle any parts of the script that modify additional data (for example, the creation of NetSuite records such as sales orders), which is never automatically deleted.

See the following topics to learn more about how restarts affect map/reduce script execution:

Note:

A map or reduce function can also be restarted if interrupted by an uncaught error. For the script to restart in this situation, you must use the retryCount option. For additional details, see System Response After an Uncaught Error.
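The retryCount option is part of an optional config section that a map/reduce script can return alongside its entry point functions. The sketch below shows the shape of such a module object in plain JavaScript; in a deployed script file this object would be returned from the define([...]) module wrapper, and the specific values shown here are only illustrative.

```javascript
// Sketch of a map/reduce module object with an optional config section.
// In a deployed SuiteScript file this object would be returned from
// define([], function () { return { ... }; }); it is shown as a plain
// object here so the shape is easy to see.
var mapReduceModule = {
    config: {
        retryCount: 3,     // retry an interrupted map/reduce function up to 3 times
        exitOnError: false // keep processing other key-value pairs after an error
    },
    getInputData: function (context) { /* build and return the input */ },
    map: function (context) { /* process one key-value pair */ },
    reduce: function (context) { /* aggregate values for one key */ },
    summarize: function (summary) { /* report results */ }
};
```

Without a config section, an uncaught error in a map or reduce function is not retried, so the retryCount option is what enables the restart behavior this note describes.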

Example 3: Design of a Robust Map/Reduce Script Example

The following script is designed to detect restarts at particular stages in processing, and to provide a place for logic that runs in the event of a restart.

Consider this example a basic template, where the comment // I might do something differently marks where you would implement a stage-specific function, either to ensure that the stage can repeat itself with the same result, or to run a recovery task, such as removing duplicate records.

The script includes a check on the isRestarted property in each entry point function. If the value of isRestarted is true, the example script invokes a placeholder function. This placeholder marks where logic that protects against restarts and data errors could be inserted.

For more information about an interrupted map/reduce stage, see Execution of Restarted Map/Reduce Stages.

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define([], function(){
    return {
        getInputData: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ...
            return inputForNextStage;
        },
        map: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ...
        },
        reduce: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ...
        },
        summarize: function (summary)
        {
            if (summary.isRestarted)
            {
                // I might do something differently
            }
            // ...
        }
    };
});

        

Example 4: A Problematic Map/Reduce Script Example

This script performs a search and processes the results. However, it is not adequately prepared for an unexpected restart: it lacks logic to prevent an unrecoverable state and to prevent the creation of erroneous or duplicate data during re-execution.

In Example 4, if the script is forcefully interrupted during the map stage, some sales orders might be updated twice when the map function is re-executed. See Example 5: A Robust Map/Reduce Script Example for an improved example.

Note that the other stages in this script do not require improvement to handle a restart. If the getInputData stage is re-executed, the map/reduce framework ensures that each search result is passed to the map stage only once. Because the getInputData stage does not change any additional data, no special restart logic is needed there. Likewise, the reduce and summarize stages do not change any additional data; they process only internal map/reduce data.
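The double-update hazard in the map stage can be seen with a small plain-JavaScript simulation (hypothetical names, no NetSuite APIs): a map body that blindly applies an update produces a different result when the framework re-runs it after a restart, while a version guarded by a processed flag does not.

```javascript
// Simulated record: one "sales order" with a numeric field and a
// processed flag (hypothetical stand-ins for real NetSuite records).
function makeRecord() {
    return { amount: 100, processed: false };
}

// Non-idempotent map body: always applies the update.
function naiveMap(record) {
    record.amount += 10; // "UPDATE so FIELDS"
}

// Idempotent map body: skips the update if it already ran.
function guardedMap(record) {
    if (record.processed) return;
    record.amount += 10;
    record.processed = true;
}

// A restart re-executes the map function for the same key-value pair.
var a = makeRecord();
naiveMap(a); naiveMap(a);     // restarted: update applied twice
var b = makeRecord();
guardedMap(b); guardedMap(b); // restarted: second call is a no-op

console.log(a.amount); // 120 - duplicated update
console.log(b.amount); // 110 - correct
```

Example 5 applies the same idea with a real record field (custbody_processed_flag) playing the role of the guard.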

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define(['N/search', 'N/record'], 
    function(search, record){
        return {
            getInputData: function (context)
            {
                var filter1 = search.createFilter({
                    name: 'mainline', 
                    operator: search.Operator.IS, 
                    values: true
                });
                var column1 = search.createColumn({name: 'recordtype'});
                var srch = search.create({
                    type: search.Type.SALES_ORDER, 
                    filters: [filter1], 
                    columns: [column1]
                });
                return srch;
            },
            map: function (context)
            {
                var soEntry = JSON.parse(context.value);
                var so = record.load({
                    type: soEntry.values.recordtype,
                    id: context.key
                });
                // UPDATE so FIELDS
                so.save();
             
                context.write({
                    key: soEntry.values.recordtype, 
                    value: context.key 
                }); 

            },
            reduce: function (context)
            {              

                context.write({
                    key: context.key,  
                    value: context.values.length 
                }); 

            },
            summarize: function (summary)
            {
                var totalRecordsUpdated = 0;
                summary.output.iterator().each(function (key, value)
                    {
                        log.audit({
                            title: key + ' records updated', 
                            details: value
                        });
                        totalRecordsUpdated += parseInt(value, 10);
                        return true;
                    });
                log.audit({
                    title: 'Total records updated',
                    details: totalRecordsUpdated
                });
            }
        }
    }); 

        

Example 5: A Robust Map/Reduce Script Example

Comparing Example 5 to Example 4, a filter was added to the search in the getInputData stage to exclude sales orders that have already been processed. This filter makes it safe to re-execute the whole map/reduce task repeatedly: when the task is re-executed, only unprocessed sales orders are returned from the input stage, not all sales orders.

There are also substantial improvements to the map function. In Example 5, if the (context.isRestarted === false) condition is met, the script knows this is the first execution of the map function for the current key-value pair. It does not need to perform any additional checks and can go directly to the sales order record update.

During the sales order update, the script sets the custbody_processed_flag field. The script checks this flag only when necessary: if (context.isRestarted === true), the script looks up the flag's value and updates the sales order only if it was not already updated.

Although the script includes more checks and lookups than Example 4, the processing demand is light. To perform the check, the script uses a lookup method that does not load the full record. If the processed flag value is true, the record is not loaded again.

In the map function, note that the context.write(...) statement is not in the if-statement body. This is because when the map function for a particular key-value pair is restarted, all writes done in the previous execution of that map function are discarded, so there is no need to check which writes have or have not been done.
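This behavior of context.write can be pictured with a small simulation (hypothetical plain JavaScript, not the real framework): the writes recorded by an interrupted attempt for a key-value pair are dropped before the retry runs, so an unconditional context.write never produces duplicates.

```javascript
// Minimal model of how the framework isolates writes per invocation:
// only the writes from the last attempt for a key-value pair survive.
function runMapWithRestarts(mapFn, key, value, attempts) {
    var committedWrites = [];
    for (var i = 0; i < attempts; i++) {
        var attemptWrites = [];                 // fresh buffer per attempt
        mapFn({
            key: key,
            value: value,
            isRestarted: i > 0,
            write: function (pair) { attemptWrites.push(pair); }
        });
        committedWrites = attemptWrites;        // earlier attempts discarded
    }
    return committedWrites;
}

var writes = runMapWithRestarts(function (context) {
    // Unconditional write, as in Example 5's map function.
    context.write({ key: 'salesorder', value: context.key });
}, '42', '{}', 3); // first run plus two restarts

console.log(writes.length); // 1 - no duplicate writes despite restarts
```

Record updates such as so.save() get no such automatic cleanup, which is why they need the processed-flag guard while context.write does not.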

The reduce function is not changed from Example 4. This reduce stage handles only the map/reduce internal data, and so the map/reduce framework ensures that even when the reduce function is restarted for a particular key-value pair, only the writes from its last execution for the key-value pair are passed to the next stage.

The summarize function also did not require improvement. However, it is good practice to log any restarts, for example to explain why the "Total records updated" entry might appear twice in the execution log for a single map/reduce task execution.

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define(['N/search', 'N/record'], 
    function(search, record){
        return {
            getInputData: function (context)
            {
                var filter1 = search.createFilter({
                    name: 'mainline', 
                    operator: search.Operator.IS, 
                    values: true
                });
                var filter2 = search.createFilter({
                    name: 'custbody_processed_flag', 
                    operator: search.Operator.IS, 
                    values: false
                });
                var column1 = search.createColumn({name: 'recordtype'});
                var srch = search.create({
                    type: search.Type.SALES_ORDER, 
                    filters: [filter1, filter2], 
                    columns: [column1]
                });
                return srch;
            },
            map: function (context)
            {
                var soEntry = JSON.parse(context.value);
                var alreadyProcessed = false;
                if (context.isRestarted)
                {
                    var lookupResult = search.lookupFields({
                        type: soEntry.values.recordtype, 
                        id: context.key, 
                        columns: ['custbody_processed_flag']
                    });
                    alreadyProcessed = lookupResult.custbody_processed_flag;
                }
                if (!alreadyProcessed)
                {
                    var so = record.load({
                        type: soEntry.values.recordtype, 
                        id: context.key
                    });
                    // UPDATE so FIELDS
                    so.setValue({
                        fieldId: 'custbody_processed_flag', 
                        value: true
                    });
                    so.save();
                }
             
                context.write({
                    key: soEntry.values.recordtype, 
                    value: context.key
                });
            },
            reduce: function (context)
            {   
                context.write({
                    key: context.key, 
                    value: context.values.length  
                }); 
                        
            },
            summarize: function (summary)
            {
                if (summary.isRestarted)
                {
                    log.audit({details: 'Summary stage is being restarted!'});
                }
                var totalRecordsUpdated = 0;
                summary.output.iterator().each(function (key, value)
                    {
                        log.audit({
                            title: key + ' records updated', 
                            details: value
                        });
                        totalRecordsUpdated += parseInt(value, 10);
                        return true;
                    });
                log.audit({
                    title: 'Total records updated', 
                    details: totalRecordsUpdated
                });
            }
        }
    }); 

        

Related Topics

Map/Reduce Script Error Handling
System Response After a Map/Reduce Interruption
Configuration Options for Handling Map/Reduce Interruptions
Logging Errors
Execution of Restarted Map/Reduce Stages

General Notices