Adding Logic to Handle Map/Reduce Restarts

Sometimes a script can fail because of an application server restart, like during a NetSuite update, maintenance, or an unexpected environment failure.

Restarts can forcefully stop an application at any time, so your scripts should handle restarts and recover from unexpected interruptions.

In a map/reduce script, when a piece of the script is restarted, the framework automatically deletes any internal map/reduce data that this piece created (for example, the key-value pairs that drive the execution of an entire mapping task). However, you must write your own code to handle any parts of the script that modify additional data (for example, creating NetSuite records such as sales orders), because that data is never deleted automatically.
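To make the distinction concrete, here is a plain JavaScript model that runs outside NetSuite. It is a sketch under stated assumptions, not the SuiteScript API: `externalRecords` stands in for NetSuite records, `committedWrites` stands in for internal map/reduce data, and `runAttempt` is a hypothetical helper in which the framework's cleanup is modeled by discarding the buffered writes of an interrupted attempt. The interrupted attempt's writes vanish, but the record it created survives, so the restarted attempt produces a duplicate.

```javascript
// Hypothetical model: NOT the SuiteScript API. Runs in plain Node.js.

// Stand-in for data outside the framework (e.g. sales orders); never cleaned up automatically.
const externalRecords = [];

// Stand-in for internal map/reduce data (key-value pairs passed to the next stage).
const committedWrites = [];

// One attempt at processing a key. Writes are buffered and only committed
// if the attempt finishes; `crashAfterCreate` simulates a server restart.
function runAttempt(key, crashAfterCreate) {
  const pendingWrites = [];
  externalRecords.push({ sourceKey: key }); // side effect outside the framework
  if (crashAfterCreate) {
    return false; // interrupted: pendingWrites are discarded, the record is not
  }
  pendingWrites.push({ key: key, value: 'created' });
  committedWrites.push(...pendingWrites);
  return true;
}

// The first attempt is interrupted; the restarted attempt completes.
runAttempt('SO-1', true);
runAttempt('SO-1', false);

console.log(committedWrites.length); // 1: only the completed attempt's write survives
console.log(externalRecords.length); // 2: the interrupted attempt's record was never removed
```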

To learn more about how restarts affect map/reduce script execution, see the following topics:

Note:

A map or reduce function can also be restarted if interrupted by an uncaught error. For the script to restart in this situation, you must use the retryCount option. For additional details, see System Response After an Uncaught Error.

Example 3: Design of a Robust Map/Reduce Script Example

The following script is designed to detect restarts at specific stages in processing and includes logic to run if a restart happens.

Treat this example as a basic template. The comment // I might do something differently marks where you would implement stage-specific logic, either to ensure that the stage produces the same result when it is repeated or to run a recovery task, such as removing duplicate records.

Each entry point function checks the isRestarted property of its context (or summary) object. When isRestarted is true, execution reaches the placeholder comment, which is where you can add logic to protect against restarts and data errors.

For more information about an interrupted map/reduce stage, see Execution of Restarted Map/Reduce Stages.

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define([], function () {
    return {
        getInputData: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ... build the input for the map stage (for example, a search, array, or object) ...
            var inputForNextStage = []; // placeholder input
            return inputForNextStage;
        },
        map: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ... map stage logic ...
        },
        reduce: function (context)
        {
            if (context.isRestarted)
            {
                // I might do something differently
            }
            // ... reduce stage logic ...
        },
        summarize: function (summary)
        {
            if (summary.isRestarted)
            {
                // I might do something differently
            }
            // ... summarize stage logic ...
        }
    };
});
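One recovery task the template can run on restart is removing duplicate records. The following is a minimal plain JavaScript sketch, assuming each record the script creates carries a hypothetical `sourceKey` that identifies the input key that produced it; `removeDuplicates` is also hypothetical. In a real script, the list would come from a search and deletions would go through the record API.

```javascript
// Hypothetical recovery helper: keeps the first record for each sourceKey
// and returns the rest as duplicates to delete. Plain JavaScript, no NetSuite APIs.
function removeDuplicates(records) {
  const seen = new Set();
  const kept = [];
  const duplicates = [];
  for (const rec of records) {
    if (seen.has(rec.sourceKey)) {
      duplicates.push(rec); // created again by a restarted attempt
    } else {
      seen.add(rec.sourceKey);
      kept.push(rec);
    }
  }
  return { kept: kept, duplicates: duplicates };
}

// Example: SO-1 was created twice because its first attempt was interrupted.
const result = removeDuplicates([
  { id: 101, sourceKey: 'SO-1' },
  { id: 102, sourceKey: 'SO-2' },
  { id: 103, sourceKey: 'SO-1' }
]);
console.log(result.kept.map(r => r.id));       // [ 101, 102 ]
console.log(result.duplicates.map(r => r.id)); // [ 103 ]
```

Running a check like this only when isRestarted is true keeps the normal, uninterrupted path cheap.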

        

Example 4: A Problematic Map/Reduce Script Example

This script is meant to perform a search and process the results, but it isn't fully prepared for an unexpected restart. The script still needs logic to help avoid unrecoverable states and the creation of bad or duplicate data if it's re-executed.

In Example 4, if the script is forcefully interrupted during the map stage, some sales orders might be updated twice when the map function is re-executed. See Example 5: A Robust Map/Reduce Script Example for an improved example.

Note that the other stages in this script don't need changes to handle a restart. If the getInputData stage runs again, the map/reduce framework makes sure each result is sent to the map stage only once. In this script, the getInputData stage doesn't change any other data, so no special restart logic is needed. The reduce and summarize stages don't change any additional data either; they only process internal map/reduce data.
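Whether a second run of the map function actually corrupts data depends on what the // UPDATE so FIELDS step does. The following plain JavaScript sketch is a hypothetical model, not SuiteScript: it contrasts an update that sets a field to a fixed value, which is safe to repeat, with one that increments a field, which is not. The field names `status` and `touchCount` are illustrative assumptions.

```javascript
// Hypothetical model of a sales order and two kinds of update. Not SuiteScript.
function makeOrder() {
  return { status: 'Pending', touchCount: 0 };
}

// Idempotent: running it again leaves the order unchanged.
function setStatus(order) {
  order.status = 'Processed';
}

// Not idempotent: every run changes the order again.
function incrementTouchCount(order) {
  order.touchCount += 1;
}

// Simulate Example 4's map function running twice for the same key
// (the first run saved the record, then the server restarted).
const order = makeOrder();
for (let run = 0; run < 2; run++) {
  setStatus(order);
  incrementTouchCount(order);
}

console.log(order.status);     // Processed: same as after one run
console.log(order.touchCount); // 2: double-counted
```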

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define(['N/search', 'N/record'], 
    function(search, record){
        return {
            getInputData: function (context)
            {
                var filter1 = search.createFilter({
                    name: 'mainline', 
                    operator: search.Operator.IS, 
                    values: true
                });
                var column1 = search.createColumn({name: 'recordtype'});
                var srch = search.create({
                    type: search.Type.SALES_ORDER, 
                    filters: [filter1], 
                    columns: [column1]
                });
                return srch;
            },
            map: function (context)
            {
                var soEntry = JSON.parse(context.value);
                var so = record.load({
                    type: soEntry.values.recordtype,
                    id: context.key
                });
                // UPDATE so FIELDS
                so.save();
             
                context.write({
                    key: soEntry.values.recordtype, 
                    value: context.key 
                }); 

            },
            reduce: function (context)
            {              

                context.write({
                    key: context.key,  
                    value: context.values.length 
                }); 

            },
            summarize: function (summary)
            {
                var totalRecordsUpdated = 0;
                summary.output.iterator().each(function (key, value)
                {
                    log.audit({
                        title: key + ' records updated',
                        details: value
                    });
                    totalRecordsUpdated += parseInt(value, 10);
                    return true;
                });
                log.audit({
                    title: 'Total records updated',
                    details: totalRecordsUpdated
                });
            }
        }
    }); 

        

Example 5: A Robust Map/Reduce Script Example

Compared to Example 4, Example 5 adds a filter to the search in the getInputData stage that excludes sales orders that were already processed. This filter makes it possible to re-execute the whole map/reduce task repeatedly: on each re-execution, the input stage returns only unprocessed sales orders instead of all sales orders.
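The effect of the extra filter can be sketched in plain JavaScript. In this hypothetical model, `orders` stands in for sales order records, `getInputData` mimics the search filtered on custbody_processed_flag, and `runTask` stands in for one whole map/reduce run; none of these are NetSuite APIs. Running the task a second time finds nothing left to process.

```javascript
// Hypothetical in-memory stand-ins for sales orders; not NetSuite APIs.
const orders = [
  { id: 1, processed: false },
  { id: 2, processed: false },
  { id: 3, processed: true } // already handled by an earlier run
];

// Mirrors the added search filter: only unprocessed orders are returned.
function getInputData() {
  return orders.filter(o => !o.processed);
}

// One full run of the task: update each input order and set its flag.
function runTask() {
  const input = getInputData();
  for (const o of input) {
    o.processed = true; // corresponds to setting custbody_processed_flag
  }
  return input.map(o => o.id);
}

const firstRun = runTask();  // [1, 2]
const secondRun = runTask(); // []: nothing left to reprocess
console.log(firstRun, secondRun);
```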

There are also substantial improvements to the map function. In Example 5, when context.isRestarted is false, the script knows that this is the first execution of the map function for the current key-value pair. It doesn't need any additional checks and can go directly to updating the sales order record.

When it updates a sales order, the script also sets the custbody_processed_flag field to true, and it reads this flag only when needed: if context.isRestarted is true, the script checks the flag and updates the order only if it hasn't already been updated.

The script performs more checks and lookups than Example 4, but processing remains light. It reads the flag with search.lookupFields, which doesn't load the full record, and if the flag is already true, the record isn't loaded at all.
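The map function's restart check can be exercised outside NetSuite with stubs. In this sketch, `lookupProcessedFlag` plays the role of search.lookupFields (a cheap field read) and `loadRecord` plays the role of record.load (a full load); both stubs, the `db` object, and the `stats` counters are hypothetical and exist only to count calls.

```javascript
// Hypothetical stubs standing in for search.lookupFields and record.load.
const db = { 7: { processed: false } };
const stats = { lookups: 0, loads: 0 };

function lookupProcessedFlag(id) {
  stats.lookups += 1;
  return db[id].processed;
}

function loadRecord(id) {
  stats.loads += 1;
  const copy = { processed: db[id].processed };
  return {
    setProcessed: function () { copy.processed = true; },
    save: function () { db[id].processed = copy.processed; }
  };
}

// Same control flow as Example 5's map function.
function map(context) {
  let alreadyProcessed = false;
  if (context.isRestarted) {
    alreadyProcessed = lookupProcessedFlag(context.key);
  }
  if (!alreadyProcessed) {
    const so = loadRecord(context.key);
    // UPDATE so FIELDS
    so.setProcessed();
    so.save();
  }
}

map({ key: 7, isRestarted: false }); // first run: no lookup, one load and save
map({ key: 7, isRestarted: true });  // restart after the save: lookup only
console.log(stats); // { lookups: 1, loads: 1 }
```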

In the map function, note that the context.write(...) statement isn't inside the if-statement because when the map function for a pair is restarted, any writes from the previous run are deleted. There's no need to check which writes have or haven't been done.
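Why the unconditional context.write(...) is safe can be shown with a small model of attempt-scoped writes. This is a hypothetical simulation, not the framework's implementation: `runMapWithRestart` gives each attempt its own write buffer and keeps only the buffer of the attempt that completes, which matches the behavior described in the preceding paragraph.

```javascript
// Hypothetical model: each attempt gets its own write buffer; only the
// buffer of the attempt that completes is passed to the next stage.
function runMapWithRestart(mapFn, key, value, interruptFirstAttempt) {
  let attempt = 0;
  while (true) {
    const buffer = [];
    const context = {
      key: key,
      value: value,
      isRestarted: attempt > 0,
      write: function (pair) { buffer.push(pair); }
    };
    mapFn(context);
    if (interruptFirstAttempt && attempt === 0) {
      attempt += 1; // restart: this attempt's buffer is discarded
      continue;
    }
    return buffer;
  }
}

// Writes unconditionally, as Example 5's map function does.
function map(context) {
  context.write({ key: 'salesorder', value: context.key });
}

const survivingWrites = runMapWithRestart(map, '42', '{}', true);
console.log(survivingWrites); // [ { key: 'salesorder', value: '42' } ]: one write, not two
```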

The reduce function hasn't changed from Example 4. Since the reduce stage only handles map/reduce internal data, the framework makes sure that even if the reduce function restarts, only the most recent writes for each pair go on to the next stage.

The summarize function doesn't need changes either, but it's good practice to log any restarts. For example, if the summarize stage is restarted, the "Total records updated" entry can appear twice in the log for a single map/reduce run, and the restart entry explains the duplicate.

/**
 * @NApiVersion 2.x
 * @NScriptType MapReduceScript
 */
define(['N/search', 'N/record'], 
    function(search, record){
        return {
            getInputData: function (context)
            {
                var filter1 = search.createFilter({
                    name: 'mainline', 
                    operator: search.Operator.IS, 
                    values: true
                });
                var filter2 = search.createFilter({
                    name: 'custbody_processed_flag', 
                    operator: search.Operator.IS, 
                    values: false
                });
                var column1 = search.createColumn({name: 'recordtype'});
                var srch = search.create({
                    type: search.Type.SALES_ORDER, 
                    filters: [filter1, filter2], 
                    columns: [column1]
                });
                return srch;
            },
            map: function (context)
            {
                var soEntry = JSON.parse(context.value);
                var alreadyProcessed = false;
                if (context.isRestarted)
                {
                    var lookupResult = search.lookupFields({
                        type: soEntry.values.recordtype, 
                        id: context.key, 
                        columns: ['custbody_processed_flag']
                    });
                    alreadyProcessed = lookupResult.custbody_processed_flag;
                }
                if (!alreadyProcessed)
                {
                    var so = record.load({
                        type: soEntry.values.recordtype, 
                        id: context.key
                    });
                    // UPDATE so FIELDS
                    so.setValue({
                        fieldId: 'custbody_processed_flag', 
                        value: true
                    });
                    so.save();
                }
             
                context.write({
                    key: soEntry.values.recordtype,
                    value: context.key
                });
            },
            reduce: function (context)
            {   
                context.write({
                    key: context.key, 
                    value: context.values.length  
                }); 
                        
            },
            summarize: function (summary)
            {
                if (summary.isRestarted)
                {
                    log.audit({
                        title: 'Summarize',
                        details: 'Summary stage is being restarted!'
                    });
                }
                var totalRecordsUpdated = 0;
                summary.output.iterator().each(function (key, value)
                {
                    log.audit({
                        title: key + ' records updated',
                        details: value
                    });
                    totalRecordsUpdated += parseInt(value, 10);
                    return true;
                });
                log.audit({
                    title: 'Total records updated',
                    details: totalRecordsUpdated
                });
            }
        }
    }); 

        
