Sun Java System Messaging Server 6 2005Q4 MTA Developer's Reference

Chapter 4 Dequeuing Messages

Once enqueued to the MTA, messages are processed using the SDK dequeue routines. These routines provide channel programs and MTA utilities with programmatic access to queued messages. With these routines, a channel program can process its queue of messages, accessing the message’s envelope information and message content.

This chapter discusses the following dequeuing topics:

How Dequeuing Works

Channel programs wishing to dequeue messages from the MTA must associate themselves with a specific MTA channel or channels. Without this information, the MTA SDK does not know which channel queue to draw messages from. This information can be provided implicitly with the PMDF_CHANNEL environment variable, or explicitly by specifying the name of the MTA channel to process when calling mtaDequeueStart().

The dequeue process is initiated by calling the routine mtaDequeueStart(). A key piece of required information passed to mtaDequeueStart() is the address of a caller-supplied routine designed to process a single message. This routine will be repeatedly called by mtaDequeueStart() until there are no more queued messages in need of processing. One call is made per message to be processed.

Unless otherwise instructed, mtaDequeueStart() will use multiple threads of execution to process queued messages. Each thread of execution will repeatedly invoke the caller-supplied routine, once for each message to be processed. Thus, by default the caller-supplied routine is expected to be “thread-safe.” That is, it is expected to support being called simultaneously by more than one thread of execution. If the caller-supplied routine is not thread safe, then mtaDequeueStart() can be instructed to use a single thread of execution, as illustrated in A Complex Dequeuing Example.

Basic Dequeuing Steps

The following basic steps are necessary to dequeue messages:

  1. Initialize SDK resources and data structures with mtaInit().

  2. Call mtaDequeueStart(), passing it the address of the caller-supplied routine that is to be used to process each message.

    When mtaDequeueStart() is called, it does not return until all queued messages requiring processing have been processed, thus blocking the thread calling it until it is finished.

  3. For each queued message requiring processing, an execution thread created by mtaDequeueStart() calls the routine whose address was provided in Step 2.

    Threads created by mtaDequeueStart() each sequentially process multiple messages. That is, mtaDequeueStart() does not create a distinct thread for each and every queued message to be processed.

    For a list of the tasks the processing routine should do, see Caller-Supplied Processing Routine.


    Note –

    The mtaDequeueStart() routine will use one or more threads, with each thread calling the message processing routine. The maximum number of threads allowed can be set when calling mtaDequeueStart(). Consequently, a program that does not support threading should specify a maximum of one thread when it calls mtaDequeueStart.


    For a list of the tasks the processing routine should do, see Dequeue Message Processing Routine Tasks.

  4. After mtaDequeueStart() returns, deallocate SDK resources and data structures with a call to mtaDone().

Caller-Supplied Processing Routine

Channel programs typically perform some form of processing on each message they dequeue. For instance, virus scanning, MMS conversion, decryption, delivery to a proprietary messaging system, and so forth. When using the MTA SDK, channel programs must provide a routine which initiates this processing on a per message basis. That is, programs must supply a routine that to be called to process a single queued message. Throughout the rest of this text, this caller-supplied routine will be referred to as “the caller-supplied processing routine,” or, for short, “the processing routine.”

When called by one of the mtaDequeueStart() execution threads, the processing routine uses the SDK to access the message’s envelope, header, and any content. Upon completion of processing, the message is then either removed from the MTA queues, or, in the event of a temporary error, left in its queue for a later processing attempt.

Dequeue Message Processing Routine Tasks

The processing routine processes a single queued message per invocation. The specific steps that a processing routine takes are:

  1. Read the envelope recipient list with repeated calls to mtaDequeueRecipientNext().

    When mtaDequeueRecipient() returns the MTA_EOF status code, the list has been exhausted and all envelope recipient addresses have been provided. All queued messages are guaranteed by the MTA to always have at least one envelope recipient address.

  2. Read the message, both header and body, with repeated calls to mtaDequeueLineNext().

    When mtaDequeueLineNext() returns the MTA_EOF status code, the message has been exhausted; that is, there is no more message text to retrieve. The message will be an RFC 2822 conformant message. As such, the division between the message’s header and content will be demarked by a blank line (a line with a length of zero). A message may have no content; that is, a message may have just a header.

  3. Process the message.

    The processing in this step could be almost anything, including possibly enqueuing a new message or messages with the MTA SDK. The details of this step will depend upon the purpose of the program itself. Programs needing to do MIME parsing should consider using the mtaDecodeMessage() routine.

    For further information about message processing threads and caller-supplied message processing routines, see Processing the Message Queue.

  4. Report the disposition of each envelope recipient with per recipient calls to mtaDequeueRecipientDisposition(), or a single call to mtaDequeueMessageFinish() with the MTA_DISP item code.

    The following table lists the valid recipient dispositions:

    Symbolic Name  

    Description  

    MTA_DISP_DEFERRED

    Unable to process this recipient address. Processing has failed owing to a temporary problem, such as the network is down, a remote host is unreachable, or a mailbox is busy. Retry delivery for this recipient at a later time as determined by the configuration of the channel.

    MTA_DISP_DELIVERED

    Recipient address successfully delivered. Generate a delivery status notification if required.

    MTA_DISP_FAILED

    Unable to process this recipient address. Processing has failed owing to a permanent problem, such as an invalid recipient address, or recipient over quota. No further delivery attempts should be made for this recipient. Generate a non-delivery notification if required.

    MTA_DISP_RELAYED

    Recipient address forwarded to another address or sent into a non-RFC 1891 (NOTARY) mail system. The message’s NOTARY information was, however, preserved. There is no need to generate a “relayed” notification message.

    MTA_DISP_RELAYED_FOREIGN

    Recipient address forwarded to another address or gatewayed to a non-RFC 1891 (NOTARY) mail system; the messages NOTARY information was not preserved; generate a relayed notification message if required.

    MTA_DISP_RETURN

    For this recipient, return the message as undeliverable. Generate a non-delivery notification if required. This disposition is intended for use by queue management utilities. It is not intended for channel programs.

    MTA_DISP_TIMEDOUT

    Unable to process this recipient address. Processing failed due to timing out. This disposition is intended for use by the MTA Return Job. Channel programs should not use this disposition.

  5. Dequeue the message with mtaDequeueMessageFinish().

    The message is not actually removed from the channel queue until this final step. This helps ensure that mail is not lost should the channel program fail unexpectedly, or some other unexpected disaster occurs.

    When this routine is called, the resulting processing depends on the disposition of the envelope recipient addresses reported with mtaDequeueRecipientDisposition() (see Step 4 in this task list).

    If all recipients have a permanent disposition (all of the ones listed in the previous table, except MTA_DISP_DEFERRED), then any required non-delivery notifications are generated and the message is permanently removed from the MTA queue.

    If all recipients are to be deferred (MTA_DISP_DEFERRED), then no notifications are generated and the message is left in the queue for later delivery attempts.

    If, however, some recipients have a permanent disposition and others are deferred, then the following happens:

    1. Notifications are generated for those recipients with permanent dispositions that require notifications.

    2. A new message is enqueued for just the deferred recipients.

    3. The original message is removed from the queue.

      Deferred messages will not be processed by this routine more than once, unless another delivery attempt is made for the deferred message while the process is still running. How long a message is deferred is configured as part of a channel’s definition, using the backoff channel keyword.

  6. When finished, the processing routine should return with a status code of zero (0) to indicate a success, and an appropriate MTA_ status code in the event of an error.

    If the processing routine returns before calling mtaDequeueFinish(), then the message that was being handled is left in its queue for a subsequent processing attempt. It will be as if the MTA_DISP_DEFFERED disposition was set for all of the message’s recipients. This will be the case even if the processing routine returns a success status code of zero.

    In the event that the processing routine needs to abort processing of a single message, it should call mtaDequeueMessageFinish() with the MTA_ABORT flag set. If the processing routine returns with a status code of MTA_ABORT, then the execution thread that called the processing routine will perform an orderly exit. Consequently, the program can prematurely terminate itself in a graceful fashion by causing its processing routine to begin returning the MTA_ABORT status code each time it is called.

The process_message() Routine

This caller-supplied routine is invoked by the processing threads to do the actual processing of the messages.

The following code example shows the required syntax for a process_message() routine:


int process_message(void **ctx2, void *ctx1, mta_dq_t *dq_ctx,
                    const char *env_from, int env_from_len);

      

The following table lists the required arguments for a process_message routine, and gives a description of each.

Arguments  

Description  

ctx2

A writable pointer that the process_message() routine can use to store a pointer to a per-thread context. See the description that follows for further details.

ctx1

The caller-supplied private context passed as ctx1 to mtaDequeueStart().

dq_ctx

A dequeue context created by mtaDequeueStart() and representing the message to be processed by this invocation of the process_message() routine.

env_from

A pointer to the envelope From: address for the message to be processed. Since Internet messages are allowed to have zero length envelope From: addresses, this address can have zero length. The address will be NULL terminated.

env_from_len

The length in bytes of the envelope From: string. This length does not include any NULL terminator.

When a processing thread first begins running, it sets the value referenced by ctx2 to NULL. This assignment is made only once per thread and is done before the first call to the process_message() routine. Consequently, on the first call to the process_message routine by a given execution thread, the following test is true:

*ctx2 == NULL

That test will remain true until such time that the process_message() routine itself changes the value by making an assignment to *ctx2. If the process_message() routine needs to maintain state across all calls to itself by the same processing thread, it can allocate memory for a structure to store that state in, and then save a pointer to that memory with ctx2. The following code snippet demonstrates this:


int process_message(void **ctx2, void *ctx1, const char *env_from,
                   size_t env_from_len)
{
    struct our_state_t *state;

    state = (our_state_t *)(*ctx2);
    if (!state)
    {
        /*
         * First call for this thread.
         * Allocate a structure in which to store the state
         * information
         */
        state = (our_state_t *)calloc(1, sizeof(our_state_t));
        if (!state) return(MTA_ABORT);
        *ctx2 = (void *)state;

        /*
         * Set any appropriate initial values for the state
         * structure
         */
       ...
    }
...

      

For a sample process_message() routine, see the example code in the section that follows.

A Simple Dequeuing Example

The program shown in Example 4–1 constitutes a simplified batch-SMTP channel that reads messages from a message queue, converting each message to batch SMTP format, and writes the result to stdout. If the conversion is successful, then the message is dequeued, otherwise it is deferred.

Some lines of code are immediately preceded by a comment of the format:

/* See explanatory comment N */

where N is a number.

The numbers are links to some corresponding explanatory text in the section that follows this code, see Explanatory Text for Numbered Comments in the Simple Dequeue Example. Find the sample output in Output from the Simple Dequeue Example.


Example 4–1 Simple Dequeue Example


/*  dequeue_simple.c -- A simple dequeue example: write BSMTP to stdout
 */
#include <stdio.h>
#include <stdlib.h>
#include "mtasdk.h"

static mta_dq_process_message_t process_message;

int main()
{
     int ires;

     /*
      * Initialize the MTA SDK
      */
     if ((ires = mtaInit(0)))
     {
         mtaLog(mtaInit() returned %d; %s\n, ires, 
                mtaStrError(ires, 0));
         return(1);
     }

     /*
      *  Start the dequeue loop.  Since this example uses stdout 
      *  for output, we indicate that we only support a single
      *  thread:
      *  (MTA_THREAD_MAX_THREADS = 1).
      */
     /*  See explanatory comment 1  */
     ires = mtaDequeueStart(NULL, process_message, NULL,
                            MTA_THREAD_MAX_THREADS, 1, 0);

     /*  
      *  Check the return status 
      */
     /*  See explanatory comment 2  */
     if (!ires) 
          /* Success */
          return(0);

     /* 
      *  Print an error message to stderr
      */
     /*  See explanatory comment 3  */
     mtaLog("mtaDequeueStart() returned %d; %s\n", ires, 
             ires, mtaStrError(ires, 0));

     /* And exit with an error */
     return(1);
}

/*  See explanatory comment 4  */
static int process_message(void **my_ctx_2, void *my_ctx_1, 
                           mta_dq_t *dq, const char *env_from,
                           size_t env_from_len)
{
     int ires;
     const char *to, *line;
     size_t len;

     /*  See explanatory comment 5  */
     if (!(*my_ctx_2))
     {
          *my_ctx_2 = (void *)1;
          printf("HELO\n");
     }
     else
          printf("RSET\n");

     /*  Output the command:
      *     MAIL FROM: <from-adr> 
      */
     printf("MAIL FROM:<%s>\n", env_from);

     /* 
      *  Output the command:
      *     RCPT TO: <to-adr> 
      *  for each recipient address 
      */
     /*  See explanatory comment 6  */
     while (!(ires = mtaDequeueRecipientNext(dq, &to, 
                                             &len, 0)))
     {
          printf("RCPT TO:<%s>\n", to);
          /*  See explanatory comment 7  */
          mtaDequeueRecipientDisposition(dq, to, len,
                                         MTA_DISP_DELIVERED, 0);
     }

     /* 
      *  If ires == MTA_EOF, then we exited the loop normally; 
      *  otherwise, theres been an error of some sort. 
      */
     if (ires != MTA_EOF)
          /*  See explanatory comment 8  */
          return(ires);
     /* 
      *  Now output the message itself
      */
     printf("DATA\n");
     /*  See explanatory comment 9  */
     while (!(ires = mtaDequeueLineNext(dq, &line, &len)))
         /*  See explanatory comment 10  */
         printf("%.*s\n", len, line);

     /* 
      *  If ires == MTA_EOF, then we exited normally;
      *  otherwise, theres been an error of some sort. 
      */
     if (ires != MTA_EOF)
         /*  See explanatory comment 8  */
         return(ires); 

     /* 
      *  Output the . command to terminate this message
      */
     printf(".\n");

     /* 
      *  And dequeue the message
      */
     /*  See explanatory comment 11  */
     ires = mtaDequeueMessageFinish(dq, 0);

     /* 
      *  All done; return ires as our result 
      */
     /*  See explanatory comment 12  */
     return(ires);
}

Explanatory Text for Numbered Comments in the Simple Dequeue Example

The numbered explanatory text that follows corresponds to the numbered comments in Example 4–1:

  1. To start the dequeue processing, mtaDequeueStart() is called, and it calls process_message(), which processes each queued message. Since process_message() uses stdout for its output, only one message can be processed at a time. To effect that behavior, mtaDequeueStart() is called with the MTA_THREAD_MAX_THREADS set to one.

  2. If the call to mtaDequeueStart() succeeds, the program exits normally.

  3. If the call to mtaDequeueStart() fails, a diagnostic error message is displayed and the program exits with an error status.

  4. process_message() is called by mtaDequeueStart() for each queued message.

  5. The private context in process_message() tracks whether or not this is the first time the routine has been called. On the first call, the memory pointed at by my_ctx_2 is guaranteed to be NULL.

  6. The routine obtains each envelope recipient address, one at a time, using calls to mtaDequeueRecipientNext().

  7. Each recipient is marked as delivered using mtaDequeueRecipientDispostion(). An actual channel program would typically not make this call until after processing the message further.

  8. If process_message() returns without first dequeuing the message, mtaDequeueStart() defers the message for a later delivery attempt.

  9. The routine calls mtaDequeueLineNext() to read the message header and body, one line at a time. When there are no more lines to read, mtaDequeueLineNext() returns a status of MTA_EOF. When a line is read successfully, mtaDequeueLineNext() returns a status of MTA_OK.

  10. The lines returned by mtaDequeueLineNext() might not be NULL terminated because the returned line pointer might reference a line in a read-only, memory-mapped file.

  11. Once the message has been processed and all the disposition of all recipients set, mtaDequeueMessageFinish() is called. This actually dequeues the message.

  12. When all message processing is complete, process_message() exits. It is called again for each additional message to be processed.

Output from the Simple Dequeue Example


HELO
MAIL FROM:<sue@siroe.com\>
RCPT TO:<dan@siroe.com\>
DATA
Received:from siroe.com by siroe.com (SunONE Messaging Server 6.0)id
 <01GP37SOPRW0A9KZFV@siroe.com\>; Fri, 21 Mar 2003 09:07:32 -0800(PST)
Date: Fri, 21 Mar 2003 09:07:41 -0800 (PST)
From: postmaster@siroe.com
To: root@siroe.com
Subject: mtasdk_example1.c
Message-id: <01GP37SOPRW2A9KZFV@siroe.com\>
Content-type: TEXT/PLAIN; CHARSET=US-ASCII
Content-transfer-encoding: 7BIT

Hello
  world!
.
QUIT

            

Processing the Message Queue

This section describes the steps undertaken by each execution thread created by mtaDequeueStart(). Each execution thread processes a subset of the channel’s queued messages by repeatedly calling the caller-supplied processing routine, process_message().

To process queued messages, a processing thread takes the following steps:

  1. The thread sets ctx2 to have the value NULL:

    ctx2 = NULL;

    For information on the process_message() arguments, see The process_message() Routine.

  2. The execution thread communicates with the Job Controller to obtain a message file to process. If there are no more message files to process, then go to Step 9.

  3. For the message file, the execution thread creates a dequeue context that maintains the dequeue processing state for that message file.

  4. The execution thread then invokes the caller-supplied process_message() routine, passing to it the dequeue context created in Processing the Message Queue, as shown in the example that follows:

    istat = process_message(&ctx2, ctx1, &dq_ctx, env_from, env_from_len);

    For information on the call arguments for process_message(), see The process_message() Routine.

  5. The process_message() routine then attempts to process the message, ultimately removing it from the channel’s queue, or leaving the message file for a later processing attempt.

  6. If mtaDequeueMessageFinish() was not called before process_message() returned, then the queued message is deferred. That is, its underlying message file is left in the channel’s queue and a later processing attempt is scheduled.

  7. The dequeue context is destroyed.

  8. If the process_message() routine did not return the MTA_ABORT status code, then repeat this cycle starting at Step 2.

  9. If a caller-supplied process_done() routine was passed to mtaDequeueStart(), it is called now, for example:

  10. process_done(&ctx2, ctx1);

    Through the process_done() routine, the program can perform any cleanup necessary for the execution thread. For example, freeing up any private context and associated resources stored in the ctx2 call argument.

    For a description of the process_done() routine, see The process_done() Routine, as well as process_done() Routine.

  11. The thread exits.

    For an example of how state (context) may be preserved within an execution thread and across calls to process_message(), A Complex Dequeuing Example.

The process_done() Routine

To assist in cleaning up state information for a thread, callers can provide a routine pointed to by the process_done call argument of mtaDequeueStart().

The following code example shows the required syntax for a process_done() routine.


void process_done(void *ctx2, void *ctx1)

The following table lists the arguments required for a process_done() routine, and gives a description of each.

Required Arguments  

Description  

ctx2

The value of the last pointer stored by process_message() in the ctx2 call argument for this thread.

ctx1

The caller-supplied private context passed as ctx1 to mtaDequeueStart().

The following code example demonstrates the type of actions taken by a process_done routine.


void process_done(ctx2, ctx1)
{
    struct our_state_t *state = (struct our_state_t *)ctx2;
    if (!state)
         return;
    /*
     * Take steps to undo the state
     * (for example, close any sockets or files)
     */
    ...

    /*
     * Free the memory allocated by process_message()
     * to store the state
     */
    free(state)
}

A Complex Dequeuing Example

The program shown in Example 4–2 is a more complicated version of the simple example (see A Simple Dequeuing Example). In this example, more than one concurrent dequeue thread is permitted. Additionally, better use is made of the context support provided by mtaDequeueStart(), and a procedure to clean up and dispose of per-thread contexts is provided.

After the Messaging Server product is installed, these programs can be found in the following location:

msg_server_base/examples/mtasdk/

Some lines of code are immediately preceded by a comment of the format:

/* See explanatory comment N */

where N is a number. The numbers are links to some corresponding explanatory text in the section that follows this code, see Explanatory Text for Numbered Comments in the Complex Dequeue Example.

For the output generated by this code, see Output from the Complex Dequeue Example.


Example 4–2 Complex Dequeue Example


/*
 * dequeue_complex.c 
 *
 * Dequeuing with more than one thread used.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#if !defined(_WIN32)
#include <unistd.h>
#endif
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include "mtasdk.h"

/* See explanatory comment 1 */
typedef struct {
    int debug;    /* Debug flag                                 */
    int max_count;/* Maximum. number of messages per BSMTP file */
} my_global_context_t;

/* See explanatory comment 2 */
typedef struct { 
     int   id;         /* Dequeue threads id                    */
     FILE *fp;         /* Dequeue threads current output file   */
     int   count;      /* Messages output by this dequeue thread */
} my_thread_context_t;

static const char *NotifyToStr(int ret_type, char *buf);
static const char *UniqueName(char *buf, size_t maxbuf,
                              const char *suffix);
static mta_dq_process_done_t process_done;
static mta_dq_process_message_t process_message;

int main()
{
     my_global_context_t gctx;
     int ires;

     /*
      * Initialize the MTA SDK
      */
     if ((ires = mtaInit(0)))
     {
         mtaLog(mtaInit() returned %d; %s\n, ires, 
                mtaStrError(ires, 0));
         return(1);
     }

     /*
      *  The global context is shared by all dequeue threads
      *  calling process_message() as a result of a given call
      *  to mtaDequeueStart().  The global context in this
      *  example provides process_message() with the following:
      *    (1) How many messages to put into a BSMTP file before 
      *        closing it and starting a new one, and
      *    (2) Whether or not to produce diagnostic debug output.
      */
     /* See explanatory comment 3 */
     gctx.debug     = 1; 
     gctx.max_count = 5;

     /*  Start the dequeue loop */
     /* See explanatory comment 4 */
     ires = mtaDequeueStart((void *)&gctx, process_message,
                            process_done, 0);

     /* Check the return status */
     /* See explanatory comment 5 */
     if (!ires) 
         /* Success */
         return(0);

     /* Produce an error message */
     /* See explanatory comment 6 */
     mtaLog("mtaDequeueStart() returned %d; %s", ires,
            mtaStrError(ires, 0));
     /* And exit with an error */
     returnh(1);
}

/* process_done() -- Called by mtaDequeueStart() to clean up 
 * and destroy a per-thread context created by process_message().
 * See explanatory comment 7
 */
static void process_done(void *my_ctx_2, void *my_ctx_1)
{
     my_global_context_t *gctx = (my_global_context_t *)my_ctx_1;
     my_thread_context_t *tctx = (my_thread_context_t *)my_ctx_2;
     if (!tctx)
          return;

     /* Generate any requested diagnostic output requested? */
     /* See explanatory comment 8 */
     if (gctx && gctx->debug)
          mtaLog("Dequeue thread done: id=%d; context=%p; " 
                  "messages=%d", tctx->id, tctx, tctx->count);

     /* Now clean up and destroy the context */
     if (tctx->fp)
     {
          fprintf(tctx->fp, "QUIT\n");
          fclose(tctx->fp);
     }
     free(tctx);
}

/* 
 * process_message() -- Called by mtaDequeueStart() to process a
 *                       single message.
 * See explanatory comment 9
 */
static int process_message(void **my_ctx_2, void *my_ctx_1, 
                           mta_dq_t *dq, const char *env_from, 
                           size_t env_from_len)
{
     my_global_context_t *gctx;
     my_thread_context_t *tctx;
     int ires, ret_type;
     const char *to, *env_id, *line;
     size_t len;
     char notify_buf[100];

     /* This should never happen, but just to be safe we check */
     if (!my_ctx_1 || !my_ctx_2)
          return(MTA_ABORT);

     /* The pointer to our global context was passed as my_ctx_1 */
     /* See explanatory comment 10 */
     gctx = (my_global_context_t *)my_ctx_1;

     /* 
      *  In this example, we just use the per-thread context to:
      *  (1) Track the output file for this dequeue thread across
      *      repeated calls, and
      *  (2) to count how many messages have been output by this
      *      dequeue thread.
      * See explanatory comment 11
      */
     if (!(*my_ctx_2))
     {
         /* First call to process_message() by this dequeue thread.
          * Store a pointer to our context.
          */
         tctx = (my_thread_context_t *)
                 calloc(1, sizeof(my_thread_context_t));
         if (!tctx)
              /* Insufficient virtual memory; give up now */
              return(MTA_ABORT);
         *my_ctx_2 = (void *)tctx;

         /* Debug output? */
         if (gctx->debug)
         {
              tctx->id = mtaDequeueThreadId(dq);
              mtaLog("Dequeue thread starting: id=%d; context=%p",
                      tctx->id, tctx);
         }
     }
     else
         /*
          *  This dequeue thread has already called 
          *  process_message() previously. 
          */
         tctx = (my_thread_context_t *)(*my_ctx_2);

     /* Send a HELO or a RSET? */
     if (0 == (tctx->count % gctx->max_count))
     {
          char buf[1024];
          int fd;

          /* Need to send a HELO */

          /* Send a QUIT if weve already sent a HELO previously */
          if (tctx->count > 0 && tctx->fp)
          {
             fprintf(tctx->fp, "QUIT\n");
             fclose(tctx->fp);
             tctx->fp = NULL;
          }

          /* Now open a file */
          fd = open(UniqueName(buf, sizeof(buf), ".bsmtp"),
                    O_WRONLY | O_CREAT | O_EXCL, 0770);

          if (fd < 0 || !(tctx->fp = fdopen(fd, "w")))
              return(MTA_ABORT);

          /* Now send the HELO */
          fprintf(tctx->fp, "HELO %s\n", mtaChannelToHost(NULL,
                  NULL, MTA_DQ_CONTEXT, dq, 0));
      }
      else
      {
          /*
           *  Weve already sent a HELO. Send a RSET to start a new 
           *  message.
           */
          fprintf(tctx->fp, "RSET\n");
      }
      tctx->count++;

      /*
       *  Output the command
       *     MAIL FROM: <from-adr> RET=return-type ENVID=id
       */
      env_id = NULL;
      /* See explanatory comment 12 */
      ret_type = MTA_NOTIFY_DEFAULT;
      mtaDequeueInfo(dq, MTA_ENV_ID, &env_id, NULL, 
                     MTA_NOTIFY_FLAGS, &ret_type, 0);
      fprintf(tctx->fp, "MAIL FROM:<%s> RET=%s%s%s\n", env_from,
              NotifyToStr(ret_type, NULL), 
               (env_id ? " ENVID=" : ""),(env_id ? env_id : ""));
       /*  Output the command
        *    RCPT TO: <to-adr> NOTIFY=notify-type
        *  for each recipient address
        *  See explanatory comment 13
        */
      while (!(ires = 
                 mtaDequeueRecipientNext(dq, &to, &len,
                                         MTA_NOTIFY_FLAGS, &ret_type, 0)))
      {
           fprintf(tctx->fp, "RCPT TO:<%s> NOTIFY=%s\n", to, 
                   NotifyToStr(ret_type, notify_buf));

           /* Indicate that delivery to this recipient succeeded */
           /*  See explanatory comment 14 */
           mtaDequeueRecipientDisposition(dq, to, len,
                                          MTA_DISP_DELIVERED, 0);
     }
     /*
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  otherwise, theres been an error of some sort.
      *  See explanatory comment 15
      */
     if (ires != MTA_EOF) 
          return(ires);

     /* Now output the message itself */
     fprintf(tctx->fp, "DATA\n");
     /*  See explanatory comment 16 */
     while (!(ires = mtaDequeueLineNext(dq, &line, &len)))
     {
          /* Check to see if we need to dot-stuff the link */
          if (len == 1 && line[0] == .)
          fprintf(tctx->fp, ".");

          /* Now output the line */
          /*  See explanatory comment 17 */
          fprintf(tctx->fp, "%.*s\n", len, line); 
     }

     /*
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  otherwise, theres been an error of some sort.
      */
     if (ires != MTA_EOF)
          return(ires);

     /* Output the "." command to terminate this message */
     fprintf(tctx->fp, ".\n");

     /* And dequeue the message */
     /*  See explanatory comment 18 */
     ires = mtaDequeueMessageFinish(dq, 0);

     /* All done; might as well return ires as our result */
     return(ires);
}

/* 
 *  Convert a bitmask of MTA_NOTIFY_ flags to a readable string
 */
/*  See explanatory comment 19 */
static const char *
NotifyToStr(int ret_type, char *buf)
{
     if (!buf)
    /* Doing a RET= parameter to a MAIL FROM command */
         return((ret_type & MTA_NOTIFY_CONTENT_FULL) ? 
                 "FULL" : "HDRS");
     buf[0] = \0;

     if (ret_type & MTA_NOTIFY_SUCCESS)
          strcat(buf, "SUCCESS");

    if (ret_type & MTA_NOTIFY_FAILURE)
    {
         if (buf[0])
              strcat(buf, ",");
         strcat(buf, "FAILURE");
    }
    if (ret_type & MTA_NOTIFY_DELAY)
    {
        if (buf[0])
             strcat(buf, ",");
        strcat(buf, "DELAY");
     }

     if (!buf[0])
          strcat(buf, "NEVER");
     return(buf);
}
/* Generate a unique string suitable for use as a file name */
/*  See explanatory comment 20 */
static const char *
UniqueName(char *buf, size_t maxbuf, const char *suffix)
{
     strcpy(buf, "/tmp");
     mtaUniqueString(buf+5, NULL, maxbuf-5);
     strcat(buf, suffix);
     return(buf);

}

Explanatory Text for Numbered Comments in the Complex Dequeue Example

The numbered list that follows has explanatory text that corresponds to the numbered comments in Example 4–2:

  1. The global context data structure for this example. This is passed to mtaDequeueStart(), as the ctx1 call argument.

  2. Per-thread data structure used by dequeue threads. While mtaDequeueStart() creates each dequeue thread, it is up to the process_message() routine to actually create any per-thread context it might need.

  3. Initialize the global context before calling mtaDequeueStart().

  4. Initiate dequeue processing by calling mtaDequeueStart(). The first call argument is a pointer to the global context. Each time mtaDequeueStart() calls process_message(), it passes in the global context pointer as the second argument. In this example, mtaDequeueStart() is not told to limit the number of dequeue threads it uses.

  5. If the call to mtaDequeueStart() succeeds, the program exits normally.

  6. If the call to mtaDequeueStart() fails, then a diagnostic error message is displayed and the program exits with an error status.

  7. Each dequeue thread calls process_done() as it exits. This program cleans up and destroys any per-thread contexts created by the process_message() routine.

  8. The program generates optional diagnostic output. Calling mtaLog() directs the output to the appropriate location: stdout if the program is run manually, and the channel log file if the program is run by the Job Controller.

  9. mtaDequeueStart() calls process_message() once for each queued message to be processed. On the first call, the memory pointed at by my_ctx_2 is guaranteed to be NULL. The value of the first call argument passed to mtaDequeueStart() is passed to process_message() as the my_ctx_1 call argument.

  10. The global context contains information pertinent to all the dequeue threads generated by the call mtaDequeueStart().

  11. process_message() uses a per-thread context to save data across all calls to itself by a single dequeue thread.

  12. mtaDequeueInfo() is used to obtain the envelope ID and RFC 1891 notification flags, if any, associated with the message being processed.

  13. mtaDequeueRecipientNext() is used to obtain each envelope recipient address, one address per call. When there are no more recipient addresses to obtain, the routine returns the status MTA_EOF.

  14. Each recipient is marked as delivered with a call to mtaDequeueRecipientDisposition(). An actual channel program would typically not make this call until after processing the message further.

  15. If process_message() returns without dequeuing the message, mtaDequeueStart() defers the message for a later delivery attempt.

  16. The message header and body are read one line at a time with mtaDequeueLineNext(). When there are no more lines to read, it returns a status of MTA_EOF.

  17. Lines returned by mtaDequeueLineNext() might not be NULL terminated because the returned line pointer might point to a line in a read-only, memory-mapped file.

  18. mtaDequeueMessageFinish() is called once the message had been fully processed and the disposition of all its recipients set with mtaDequeueRecipientDisposition(). The message is not truly dequeued until this happens.

  19. The routine NotifyToStr() converts a bitmap encoded set of RFC 1891 notification flags to an ASCII text string.

  20. The UniqueName() routine generates a unique string suitable for the use as a file name. This is used to generate the unique portion of the file name. This routine can be called concurrently by multiple threads and always generates a string unique amongst all processes and threads on the system.

For information on how to run this sample program, see Running Your Enqueue and Dequeue Programs.

Output from the Complex Dequeue Example

The output that follows shows the result of 100 queued messages processed with the program in Example 4–2.


11:01:16.82: Dequeue thread starting: id=10; context=32360
11:01:16.87: Dequeue thread starting: id=1; context=32390
11:01:16.93: Dequeue thread starting: id=2; context=325e8
11:01:17.00: Dequeue thread starting: id=3; context=32600
11:01:17.04: Dequeue thread starting: id=4; context=32618
11:01:17.09: Dequeue thread starting: id=5; context=32630
11:01:17.14: Dequeue thread starting: id=6; context=78e50
11:01:17.19: Dequeue thread starting: id=7; context=88a18
11:01:17.23: Dequeue thread starting: id=9; context=8ab78
11:01:17.51: Dequeue thread starting: id=8; context=8ab60
11:01:19.96: Dequeue thread done: id=2; context=325e8; messages=12
11:01:19.96: Dequeue thread done: id=5; context=32630; messages=22
11:01:19.97: Dequeue thread done: id=6; context=78e50; messages=11
11:01:19.97: Dequeue thread done: id=4; context=32618; messages=5
11:01:19.98: Dequeue thread done: id=8; context=8ab60; messages=16
11:01:20.00: Dequeue thread done: id=9; context=8ab78; messages=5
11:01:20.00: Dequeue thread done: id=3; context=32600; messages=12
11:01:20.01: Dequeue thread done: id=1; context=32390; messages=7
11:01:20.02: Dequeue thread done: id=10; context=32360; messages=6
11:01:20.03: Dequeue thread done: id=7; context=88a18; messages=4

Intermediate processing channels

Special attention is warranted for intermediate processing channels. Intermediate processing channels are channels which re-enqueue back to the MTA the mail they dequeue from it. For example, a virus scanner or a conversion channel, which, after scanning or converting a message, re-enqueues it back to the MTA for further routing or delivery. Such channels should do the following:

The sample code, Intermediate Channel Example, illustrates the SDK usage required to effect the first two preceding points.

Preserve Envelope Information

All queued messages have envelope fields which are unique to the message. For instance, a message will have the RFC 1891 envelope ID that was either assigned by the MTA when the message was first enqueued, or was specified by a remote MTA and transmitted over SMTP. The same applies to the RFC 1891 original recipient address fields that specify the original form of each of the message’s envelope recipient addresses. Furthermore, there may be other envelope fields which have non-default settings such as notification handling flags. Whenever possible, this information should be preserved as the message flows from MTA channel to MTA channel. In order to preserve this information, it must be copied from the message being dequeued to the new message being enqueued. This copying process is best done using the MTA_DQ_CONTEXT item code in conjunction with the mtaEnqueueStart() and mtaEnqueueTo() routines. When used with the former, it causes per-message envelope information to be automatically copied from the message being dequeued to the new message being enqueued. When used with the latter, it causes per-recipient information to be automatically copied.

Channel programs should not attempt to explicitly copy envelope information other than the envelope From: and envelope recipient addresses. The MTA_DQ_CONTEXT item code should always be used to implicitly perform the copy. The reason for this is straightforward: if a program attempts to do the copy explicitly by querying the fields one by one from the message being dequeued, and then setting them one by one in the message being enqueued, then any new envelope fields introduced in later versions of Messaging Server will be lost unless the program is updated to explicitly know about those new fields too.

Use MTA_ENV_TO

Intermediate processing channels should use the MTA_ENV_TO item code with mtaEnqueueTo() rather than the MTA_TO, MTA_CC, and MTA_BCC item codes. This tells the MTA that the recipient address being specified should be added to only the message’s envelope and not also to a Resent-To:, Resent-Cc:, or Resent-Bcc: header line. Example 4–3, and Example 5–2 illustrate the use of the MTA_ENV_TO item code. Both of those examples represent intermediate processing channels which are handling a previously constructed message. As such, they do not need to alter the existing message header.

Use Rewrite Rules to Prevent Message Loops

Finally, intermediate processing channels often require special rewrite rules in order to prevent message loops. Specifically, loops in which mail re-enqueued by the intermediate processing channel is queued back to the intermediate processing channel. See Preventing Mail Loops when Re-enqueuing Mail for further information on this topic.

Intermediate Channel Example

The sample program in this section, in Example 4–3, converts the body of each queued message and then re-enqueues the converted messages back to the MTA. The conversion process involves applying the “rot 13” encoding used by some news readers to encode potentially offensive message content.

To configure the MTA to run this channel, see Running Your Enqueue and Dequeue Programs. Also refer to Preventing Mail Loops when Re-enqueuing Mail, which discusses configuring special rewrite rules for programs re-enqueuing dequeued email.

Some lines of code in this example are immediately preceded by a comment of the format:

/* See explanatory comment N */

where N is a number.

The numbers are links to some corresponding explanatory text found in Explanatory Text for Numbered Comments in the Intermediate Channel Example.


Example 4–3 Intermediate Channel Example


/* intermediate_channel.c
 * A channel program that re-enqueues queued messages after first
 * transforming their content with the "rot13" transformation.
 */
#include <stdio.h>
#include <stdlib.h>
#include "mtasdk.h"

typedef struct {
     size_t  maxlen;
     char   *buf;
} rot13_buf_t;

static mta_dq_process_done_t process_done;
static mta_dq_process_message_t process_message;
static char rot13(char c);
static const char *rot13str(rot13_buf_t **dst, const char *src,
                            size_t srclen);

int main()
{
     int ires;

     /*
      * Initialize the MTA SDK
      */
     if ((ires = mtaInit(0)))
     {
         mtaLog(mtaInit() returned %d; %s\n, ires, 
                mtaStrError(ires, 0));
         return(1);
     }

     /*
      *  Start the dequeue loop
      *  See explanatory comment 1
      */
     ires = mtaDequeueStart(NULL, process_message, 
                            process_done, 0); 

      /*
       *  Check the return status
       * See explanatory comment 2
       */
      if (!ires)
           /*
            *  Success
            */
           return(0);
      /*
       *  Produce an error message
       * See explanatory comment 3 */
       */
      mtaLog("mtaDequeueStart() returned %d; %s", ires,
              mtaStrError(ires, 0));

      /*
       *  And exit with an error
       */
      return(1);
}

/* 
 *  process_done -- Clean up the private context my_ctx_2 used by
 *                  process_message.
 * See explanatory comment 4
 */
static void process_done(void *my_ctx_2, void *my_ctx_1)
{
     rot13_buf_t *rbuf;

     if (!my_ctx_2)
          return;
     rbuf = (rot13_buf_t *)my_ctx_2;
     if (rbuf->buf)
          free(rbuf->buf);
     free(rbuf);
}


/* 
 *  process_message -- Process a single message by re-enqueuing but 
 *                     with its message body converted to the rot13 
 *                     set. The private my_ctx_1 context is not 
 *                     used. The private my_ctx_2 context is used 
 *                     for a rot13 translation context.
 * See explanatory comment 5
 */

static int process_message(void **my_ctx_2, void *my_ctx_1,
                           mta_dq_t *dq,
{
     size_t len;
     const char *line, *to;
     int in_header;
     mta_nq_t *nq;

     /*
      *  Start a message enqueue
      */
     nq = NULL;
     /* See explanatory comment 6 */
     if (mtaEnqueueStart(&nq, env_from, env_from_len, 
          MTA_DQ_CONTEXT, dq, 0)) 
             goto(defer);

     /*
      *  Process the envelope recipient list
      *  See explanatory comment 7 */
      */
     while (!mtaDequeueRecipientNext(dq, &to, &len, 0))
          /* See explanatory comment 7 */
          if (mtaEnqueueTo(nq, to, len, MTA_DQ_CONTEXT, dq, 
              MTA_ENV_TO, 0) ||
              /* See explanatory comment 8 */
              mtaDequeueRecipientDisposition(dq, to, len, 
                                            MTA_DISP_DELIVERED,0))
               /* See explanatory comment 9 */
               goto defer; 
      if (mta_errno != MTA_EOF)
           goto defer;

      /* 
       *  First, get the messages header and write it
       *  unchanged to the new message being enqueued.
       * See explanatory comment 10
       */
      in_header = 1;
      while (in_header && !mtaDequeueLineNext(dq, &line, &len))
      {
           if (mtaEnqueueWriteLine(nq, line, len, 0))
                goto defer;
           if (!len)
                in_header = 0;
      }

      /*
       *  Determine why we exited the while loop
       */
      if (in_header)
      {
          /*
           *  We exited before seeing the body of the message
           * See explanatory comment 12
           */
          if (mta_errno == MTA_EOF) 
               /*
                *  Message read completely: it must have no body
                */
               goto done;
          else
               /*
                *  Error condition of some sort
                */
               goto defer;
       }

       /* 
        *  Now rot13 the body of the message
        * See explanatory comment 13
        */
       while (!mtaDequeueLineNext(dq, &line, &len))
             if (mtaEnqueueWriteLine(nq,
                             rot13str((rot13_buf_t **)my_ctx_2, 
                             line, len), len, 0))
                  goto defer;

       /*
        * If mta_errno == MTA_EOF, then we exited the loop 
        * normally; otherwise, theres been an error of some sort
        */
       if (mta_errno != MTA_EOF)
            goto defer;

       /*
        *  All done, enqueue the new message
        * See explanatory comment 14
        */
  done:
       if (!mtaEnqueueFinish(nq, 0) && 
           !mtaDequeueMessageFinish(dq, 0))
            return(0);
       /*
        *  Fall through to defer the message
        */
       nq = NULL;

       /*
        *  A processing error of some sort has occurred: defer the
        *  message for a later delivery attempt
        * See explanatory comment 15
        */
  defer: 
       mtaDequeueMessageFinish(dq, MTA_ABORT, 0);
       if (nq)
            mtaEnqueueFinish(nq, MTA_ABORT, 0);
       return(MTA_NO);
}

/* 
 *  rot13 -- an implmentation of the rotate-by-13 translation
 * See explanatory comment 16
 */
static char rot13(char c)
{
     if (A <= c && c <= Z) 
        return (((c - A + 13) % 26) + A);
     else if (a <= c && c <= z)
        return (((c - a + 13) % 26) + a);
     else return (c);
}

/* 
 *  rot13str -- Perform a rot13 translation on a string of text
 *  See explanatory comment 17
 */
static const char *rot13str(rot13_buf_t **dst, const char *src, 
                            size_t srclen)
{
     size_t i;
     char *ptr;
     rot13_buf_t *rbuf = *dst;

     /*
      *  First call?  If so, then allocate a rot13_buf_t structure
      */
     if (!rbuf)
     {
          rbuf = calloc(1, sizeof(rot13_buf_t));
          if (!rbuf)
               return(NULL);
          *dst = rbuf;
     }

     /*
      *  Need a larger buffer? 
      *  If so, then increase the length of rbuf->buf
      */
     if (rbuf->maxlen < srclen || !rbuf->buf)
     {
          size_t l;
          char *tmp;
          /* Round size up to the nearest 2k */
          l = 2048 * (int)((srclen + 2047) / 2048);
          tmp = (char *)malloc(l);
          if (!tmp)
               return(NULL);
          if (rbuf->buf)
               free(rbuf->buf);
          rbuf->buf    = tmp;
          rbuf->maxlen = l;
      }
      /*
       *  Now rot13 our input
       */
      ptr = rbuf->buf;
      for (i = 0; i < srclen; i++)
           *ptr++ = rot13(*src++);

      /*
       *  All done
       */
      return(rbuf->buf);
}

Explanatory Text for Numbered Comments in the Intermediate Channel Example

  1. The dequeue processing is initiated by calling mtaDequeueStart(). In this example, no global context is used; hence, the first call argument to mtaDequeueStart() is NULL.

  2. If the call to mtaDequeueStart() succeeds, then the program exits normally.

  3. If the call to mtaDequeueStart() fails, a diagnostic error message is displayed and the program exits with an error status.

  4. Each dequeue thread calls process_done() as it exits. The intent is to allow the program to clean up and destroy any per-thread contexts created by the process_message() routine. In this case, the buffer used by rot13str() is deallocated.

  5. The mtaDequeueStart() routine calls process_message() once for each queued message to be processed. On the first call by a dequeue thread, the memory pointed at by my_ctx_2 is NULL.

  6. A message enqueue starts. The dequeue context, dq, is provided so that per-message envelope fields can be carried over to the new message from the message being dequeued.

  7. Each envelope recipient address is obtained, one at a time, with mtaDequeueRecipientNext(). When there are no more recipient addresses to obtain, mtaDequeueRecipientNext() returns the status MTA_EOF.

  8. Each envelope recipient address is added to the recipient list for the new message being enqueued. The MTA_ENV_TO option for mtaEnqueueTo() is specified so that the address is to be added to the new message’s envelope only. It should not also be added to the message’s RFC 822 header. The new message’s header will be a copy of the header of the message being dequeued. This copy is performed at the code location marked by comment 12.

  9. Each recipient is marked as delivered with mtaDequeueRecipientDisposition().

  10. In the event of an error returned from either mtaEnqueueTo() or mtaDequeueRecipientDisposition(), or an unexpected error return from mtaDequeueRecipientNext(), the ongoing enqueue is cancelled and the processing of the current message is deferred.

  11. Each line of the current message is read and then copied to the new message being enqueued. This copying continues until a blank line is read from the current message. (A blank line signifies the end of the RFC 822 message header and the start of the RFC 822 message content.)

  12. The code here needs to determine why it exited the read loop: because of an error, or because the transition from the message’s header to body was detected.

  13. The remainder of the current message is read line by line and copied to the new message being enqueued. However, the line enqueued is first transformed using the “rot13” transformation. The per-thread context my_ctx_2 is used to hold an output buffer used by the rot13str() routine.

  14. The enqueue of the new message is finished. If that step succeeds, then the message being dequeued is removed from the MTA queues.

  15. In the event of an error, the new message enqueue is cancelled and the current message left in the queues for later processing.

  16. The rot13 character transformation.

  17. A routine that applies the rot13 transformation to a character string.

Sample Input Message for the Intermediate Channel Example

The example that follows is a sample input message from the queue to be processed by the program found in Example 4–3.


Received: from frodo.west.siroe.com by frodo.west.siroe.com
 (Sun Java System Messaging Server 6 2004Q2(built Mar 24 2004))id
<0HCH00301E6GO700@frodo.west.siroe.com\> for sue@sesta.com; Fri,
 28 Mar 2003 14:51:52 -0800 (PST)
Date: Fri, 28 Mar 2003 14:51:52 -0800 (PST)
From: root@frodo.west.siroe.com
Subject: Testing
To: sue@sesta.com
Message-id: <0HCH00303E6GO700@frodo.west.siroe.com\>
MIME-version: 1.0

This is a test message.

Output from the Intermediate Channel Example

This example shows the output generated by the dequeue and re-enqueue program (Example 4–3).


Received: from sesta.com by frodo.west.siroe.com
 (Sun Java System Messaging Server 6 2004Q2 (built Mar 24 2003))id
<0HCH00301E7DOH00@frodo.west.wiroe.com\> for sue@sesta.com; Fri,
 28 Mar 2003 14:51:58 -0800 (PST)
Received: from frodo.west.siroe.com by frodo.west.siroe.com
 (Sun Java System Messaging Server 6 2004Q2 (built Mar 24 2003))id
<0HCH00301E7DOH00@frodo.west.wiroe.com\> for sue@sesta.com; Fri,
 28 Mar 2003 14:51:52 -0800 (PST)
Date: Fri, 28 Mar 2003 14:51:52 -0800 (PST)
From: root@frodo.west.siroe.com
Subject: Testing
To: sue@sesta.com
Message-id: <0HCH00303E6GO700@frodo.west.siroe.com\>
MIME-version: 1.0

Guvf vf n grfg zrffntr.

Thread Creation Loop in mtaDequeueStart()

After mtaDequeueStart() performs any necessary initialization steps, it then starts a loop whereby it communicates with the MTA Job Controller. Based upon information from the Job Controller, it then creates zero or more execution threads to process queued messages.

While any execution threads are running, the thread that invoked mtaDequeueStart()(the primal thread) executes a loop containing a brief pause (that is, a sleep request). Each time the primal thread awakens, it communicates with the Job Controller to see if it should create more execution threads. In addition, the Job Controller itself has logic to determine if more threads are needed in the currently running channel program, or if it should create additional processes to run the same channel program.

To demonstrate, the following code example shows pseudo-code of the mtaDequeueStart() loop.


threads_running = 0
threads_max = MTA_THREAD_MAX_THREADS
attemtps    = MTA_JBC_MAX_ATTEMPTS

LOOP:
    while (threads_running < threads_max)
    {

      Go to DONE if a shut down has been requested

      pending_messages = Ask the Job Controller how many
                         messsages there are to be processed

      // If there are no pending messages
      // then consider what to do next
      if (pending_messages = 0)
      {
          // Continue to wait?
          if (attempts <= 0)
              go to DONE

          // Decrement attempts and wait
          attempts = attempts - 1;
          go to SLEEP
      }
      // Reset the attempts counter
      attempts = MTA_JBC_MAX_ATTEMPTS

      threads_needed = Ask the Job Controller how many
                       processing threads are needed

      // Cannot run more then threads_max threads per process
      if (threads_needed \> threads_max)
          threads_needed = threads_max

      // Create additional threads if needed
      if (threads_needed \> threads_running)
      {
         Create (threads_needed - threads_running) more threads
         threads_running = threads_needed
      }
    }

SLEEP:
    Sleep for MTA_JBC_RETRY_INTERVAL seconds
     -- a shut down request will cancel the sleep
    go to LOOP

DONE:
    Wait up to MTA_THREAD_WAIT_TIMEOUT seconds
    for all processing threads to exit

    Return to the caller of mtaDequeueStart()

Multiple Calls to mtaDequeueStart()

A channel program can call mtaDequeueStart() multiple times, either sequentially or in parallel. In the latter case, the program would need to create threads so as to effect multiple, simultaneous calls to mtaDequeueStart(). However, just because this can be done does not mean that it is appropriate to do so. In the former case of multiple sequential calls, there is no need to be making repeated calls. When mtaDequeueStart() returns, the channel no longer needs immediate processing and has been in that state for the number of seconds represented by the following formula:

MTA_JBC_ATTEMPTS_MAX * MTA_JBC_RETRY_INTERVAL

Instead, the channel program should exit thereby freeing up system resources. The Job Controller will start a new channel program running when there are more messages to process.

In the latter case of multiple parallel calls, there is again no need to do so. If there is an advantage to running more threads than a single call generates, then the channel’s threaddepth channel keyword setting should be increased so that a single call does generate more threads.

The only exception to either of these cases might be if the multiple calls are each for a different channel. Even then, however, the advantage of so doing is dubious as the same effect can be achieved through the use of multiple processes, one for each channel.

Calling Order Dependencies

When you are constructing programs, there is a calling order for the MTA SDK routines that must be observed; some routines must be called before others.

Figure 4–1 visually depicts the calling order dependency of the message dequeue routines. To the right of each routine name appears a horizontal line segment, possibly broken across a column, for example, mtaDequeueRecipientNext(). Routines for which two horizontal line segments, one atop the other, appear are required routines; that is, routines that must be called in order to successfully enqueue a message. The required routines are mtaInit(), mtaDequeueStart(), mtaDequeueRecipientNext(), and mtaDqueueMessageFinish().

To determine at which point a routine may be called, start in the leftmost column and work towards the rightmost column. Any routine whose line segment lies in the first (leftmost) column may be called first. Any routine whose line segment falls in the second column may next be called, after which any routine whose line segment falls in the third column may be called, and so forth. When more than one routine appears in the same column, any or all of those routines may be called in any order. Progression from left to right across the columns is mandated by the need to call the required routines.

After calling mtaDequeueRewind(), the read point into the underlying queued message file is reset to the start of the message’s outermost header; that is, you’re back in the third column.

Figure 4–1 Calling Order Dependency for Message Dequeue Routines

Calling order dependency for message dequeue routines. mtaInit,
mtaDequeueStart, mtaDequeueRecipientNext, and mtaDequeueMessageFinish are required.