Sun Java System Messaging Server 6 2005Q4 MTA Developer's Reference

A Complex Dequeuing Example

The program shown in Example 4–2 is a more complicated version of the simple example (see A Simple Dequeuing Example). In this example, more than one concurrent dequeue thread is permitted. Additionally, better use is made of the context support provided by mtaDequeueStart(), and a procedure to clean up and dispose of per-thread contexts is provided.

After the Messaging Server product is installed, these programs can be found in the following location:

msg_server_base/examples/mtasdk/

Some lines of code are immediately preceded by a comment of the format:

/* See explanatory comment N */

where N is a number. The numbers are links to some corresponding explanatory text in the section that follows this code, see Explanatory Text for Numbered Comments in the Complex Dequeue Example.

For the output generated by this code, see Output from the Complex Dequeue Example.


Example 4–2 Complex Dequeue Example


/*
 * dequeue_complex.c 
 *
 * Dequeuing with more than one thread used.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#if !defined(_WIN32)
#include <unistd.h>
#endif
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include "mtasdk.h"

/* See explanatory comment 1 */
typedef struct {
    int debug;    /* Debug flag                                 */
    int max_count;/* Maximum. number of messages per BSMTP file */
} my_global_context_t;

/* See explanatory comment 2 */
typedef struct { 
     int   id;         /* Dequeue threads id                    */
     FILE *fp;         /* Dequeue threads current output file   */
     int   count;      /* Messages output by this dequeue thread */
} my_thread_context_t;

static const char *NotifyToStr(int ret_type, char *buf);
static const char *UniqueName(char *buf, size_t maxbuf,
                              const char *suffix);
static mta_dq_process_done_t process_done;
static mta_dq_process_message_t process_message;

int main()
{
     my_global_context_t gctx;
     int ires;

     /*
      * Initialize the MTA SDK
      */
     if ((ires = mtaInit(0)))
     {
         mtaLog(mtaInit() returned %d; %s\n, ires, 
                mtaStrError(ires, 0));
         return(1);
     }

     /*
      *  The global context is shared by all dequeue threads
      *  calling process_message() as a result of a given call
      *  to mtaDequeueStart().  The global context in this
      *  example provides process_message() with the following:
      *    (1) How many messages to put into a BSMTP file before 
      *        closing it and starting a new one, and
      *    (2) Whether or not to produce diagnostic debug output.
      */
     /* See explanatory comment 3 */
     gctx.debug     = 1; 
     gctx.max_count = 5;

     /*  Start the dequeue loop */
     /* See explanatory comment 4 */
     ires = mtaDequeueStart((void *)&gctx, process_message,
                            process_done, 0);

     /* Check the return status */
     /* See explanatory comment 5 */
     if (!ires) 
         /* Success */
         return(0);

     /* Produce an error message */
     /* See explanatory comment 6 */
     mtaLog("mtaDequeueStart() returned %d; %s", ires,
            mtaStrError(ires, 0));
     /* And exit with an error */
     returnh(1);
}

/* process_done() -- Called by mtaDequeueStart() to clean up 
 * and destroy a per-thread context created by process_message().
 * See explanatory comment 7
 */
static void process_done(void *my_ctx_2, void *my_ctx_1)
{
     my_global_context_t *gctx = (my_global_context_t *)my_ctx_1;
     my_thread_context_t *tctx = (my_thread_context_t *)my_ctx_2;
     if (!tctx)
          return;

     /* Generate any requested diagnostic output requested? */
     /* See explanatory comment 8 */
     if (gctx && gctx->debug)
          mtaLog("Dequeue thread done: id=%d; context=%p; " 
                  "messages=%d", tctx->id, tctx, tctx->count);

     /* Now clean up and destroy the context */
     if (tctx->fp)
     {
          fprintf(tctx->fp, "QUIT\n");
          fclose(tctx->fp);
     }
     free(tctx);
}

/* 
 * process_message() -- Called by mtaDequeueStart() to process a
 *                       single message.
 * See explanatory comment 9
 */
static int process_message(void **my_ctx_2, void *my_ctx_1, 
                           mta_dq_t *dq, const char *env_from, 
                           size_t env_from_len)
{
     my_global_context_t *gctx;
     my_thread_context_t *tctx;
     int ires, ret_type;
     const char *to, *env_id, *line;
     size_t len;
     char notify_buf[100];

     /* This should never happen, but just to be safe we check */
     if (!my_ctx_1 || !my_ctx_2)
          return(MTA_ABORT);

     /* The pointer to our global context was passed as my_ctx_1 */
     /* See explanatory comment 10 */
     gctx = (my_global_context_t *)my_ctx_1;

     /* 
      *  In this example, we just use the per-thread context to:
      *  (1) Track the output file for this dequeue thread across
      *      repeated calls, and
      *  (2) to count how many messages have been output by this
      *      dequeue thread.
      * See explanatory comment 11
      */
     if (!(*my_ctx_2))
     {
         /* First call to process_message() by this dequeue thread.
          * Store a pointer to our context.
          */
         tctx = (my_thread_context_t *)
                 calloc(1, sizeof(my_thread_context_t));
         if (!tctx)
              /* Insufficient virtual memory; give up now */
              return(MTA_ABORT);
         *my_ctx_2 = (void *)tctx;

         /* Debug output? */
         if (gctx->debug)
         {
              tctx->id = mtaDequeueThreadId(dq);
              mtaLog("Dequeue thread starting: id=%d; context=%p",
                      tctx->id, tctx);
         }
     }
     else
         /*
          *  This dequeue thread has already called 
          *  process_message() previously. 
          */
         tctx = (my_thread_context_t *)(*my_ctx_2);

     /* Send a HELO or a RSET? */
     if (0 == (tctx->count % gctx->max_count))
     {
          char buf[1024];
          int fd;

          /* Need to send a HELO */

          /* Send a QUIT if weve already sent a HELO previously */
          if (tctx->count > 0 && tctx->fp)
          {
             fprintf(tctx->fp, "QUIT\n");
             fclose(tctx->fp);
             tctx->fp = NULL;
          }

          /* Now open a file */
          fd = open(UniqueName(buf, sizeof(buf), ".bsmtp"),
                    O_WRONLY | O_CREAT | O_EXCL, 0770);

          if (fd < 0 || !(tctx->fp = fdopen(fd, "w")))
              return(MTA_ABORT);

          /* Now send the HELO */
          fprintf(tctx->fp, "HELO %s\n", mtaChannelToHost(NULL,
                  NULL, MTA_DQ_CONTEXT, dq, 0));
      }
      else
      {
          /*
           *  Weve already sent a HELO. Send a RSET to start a new 
           *  message.
           */
          fprintf(tctx->fp, "RSET\n");
      }
      tctx->count++;

      /*
       *  Output the command
       *     MAIL FROM: <from-adr> RET=return-type ENVID=id
       */
      env_id = NULL;
      /* See explanatory comment 12 */
      ret_type = MTA_NOTIFY_DEFAULT;
      mtaDequeueInfo(dq, MTA_ENV_ID, &env_id, NULL, 
                     MTA_NOTIFY_FLAGS, &ret_type, 0);
      fprintf(tctx->fp, "MAIL FROM:<%s> RET=%s%s%s\n", env_from,
              NotifyToStr(ret_type, NULL), 
               (env_id ? " ENVID=" : ""),(env_id ? env_id : ""));
       /*  Output the command
        *    RCPT TO: <to-adr> NOTIFY=notify-type
        *  for each recipient address
        *  See explanatory comment 13
        */
      while (!(ires = 
                 mtaDequeueRecipientNext(dq, &to, &len,
                                         MTA_NOTIFY_FLAGS, &ret_type, 0)))
      {
           fprintf(tctx->fp, "RCPT TO:<%s> NOTIFY=%s\n", to, 
                   NotifyToStr(ret_type, notify_buf));

           /* Indicate that delivery to this recipient succeeded */
           /*  See explanatory comment 14 */
           mtaDequeueRecipientDisposition(dq, to, len,
                                          MTA_DISP_DELIVERED, 0);
     }
     /*
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  otherwise, theres been an error of some sort.
      *  See explanatory comment 15
      */
     if (ires != MTA_EOF) 
          return(ires);

     /* Now output the message itself */
     fprintf(tctx->fp, "DATA\n");
     /*  See explanatory comment 16 */
     while (!(ires = mtaDequeueLineNext(dq, &line, &len)))
     {
          /* Check to see if we need to dot-stuff the link */
          if (len == 1 && line[0] == .)
          fprintf(tctx->fp, ".");

          /* Now output the line */
          /*  See explanatory comment 17 */
          fprintf(tctx->fp, "%.*s\n", len, line); 
     }

     /*
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  If ires == MTA_EOF, then we exited the loop normally;
      *  otherwise, theres been an error of some sort.
      */
     if (ires != MTA_EOF)
          return(ires);

     /* Output the "." command to terminate this message */
     fprintf(tctx->fp, ".\n");

     /* And dequeue the message */
     /*  See explanatory comment 18 */
     ires = mtaDequeueMessageFinish(dq, 0);

     /* All done; might as well return ires as our result */
     return(ires);
}

/* 
 *  Convert a bitmask of MTA_NOTIFY_ flags to a readable string
 */
/*  See explanatory comment 19 */
static const char *
NotifyToStr(int ret_type, char *buf)
{
     if (!buf)
    /* Doing a RET= parameter to a MAIL FROM command */
         return((ret_type & MTA_NOTIFY_CONTENT_FULL) ? 
                 "FULL" : "HDRS");
     buf[0] = \0;

     if (ret_type & MTA_NOTIFY_SUCCESS)
          strcat(buf, "SUCCESS");

    if (ret_type & MTA_NOTIFY_FAILURE)
    {
         if (buf[0])
              strcat(buf, ",");
         strcat(buf, "FAILURE");
    }
    if (ret_type & MTA_NOTIFY_DELAY)
    {
        if (buf[0])
             strcat(buf, ",");
        strcat(buf, "DELAY");
     }

     if (!buf[0])
          strcat(buf, "NEVER");
     return(buf);
}
/* Generate a unique string suitable for use as a file name */
/*  See explanatory comment 20 */
static const char *
UniqueName(char *buf, size_t maxbuf, const char *suffix)
{
     strcpy(buf, "/tmp");
     mtaUniqueString(buf+5, NULL, maxbuf-5);
     strcat(buf, suffix);
     return(buf);

}

Explanatory Text for Numbered Comments in the Complex Dequeue Example

The numbered list that follows has explanatory text that corresponds to the numbered comments in Example 4–2:

  1. The global context data structure for this example. This is passed to mtaDequeueStart(), as the ctx1 call argument.

  2. Per-thread data structure used by dequeue threads. While mtaDequeueStart() creates each dequeue thread, it is up to the process_message() routine to actually create any per-thread context it might need.

  3. Initialize the global context before calling mtaDequeueStart().

  4. Initiate dequeue processing by calling mtaDequeueStart(). The first call argument is a pointer to the global context. Each time mtaDequeueStart() calls process_message(), it passes in the global context pointer as the second argument. In this example, mtaDequeueStart() is not told to limit the number of dequeue threads it uses.

  5. If the call to mtaDequeueStart() succeeds, the program exits normally.

  6. If the call to mtaDequeueStart() fails, then a diagnostic error message is displayed and the program exits with an error status.

  7. Each dequeue thread calls process_done() as it exits. This program cleans up and destroys any per-thread contexts created by the process_message() routine.

  8. The program generates optional diagnostic output. Calling mtaLog() directs the output to the appropriate location: stdout if the program is run manually, and the channel log file if the program is run by the Job Controller.

  9. mtaDequeueStart() calls process_message() once for each queued message to be processed. On the first call, the memory pointed at by my_ctx_2 is guaranteed to be NULL. The value of the first call argument passed to mtaDequeueStart() is passed to process_message() as the my_ctx_1 call argument.

  10. The global context contains information pertinent to all the dequeue threads generated by the call mtaDequeueStart().

  11. process_message() uses a per-thread context to save data across all calls to itself by a single dequeue thread.

  12. mtaDequeueInfo() is used to obtain the envelope ID and RFC 1891 notification flags, if any, associated with the message being processed.

  13. mtaDequeueRecipientNext() is used to obtain each envelope recipient address, one address per call. When there are no more recipient addresses to obtain, the routine returns the status MTA_EOF.

  14. Each recipient is marked as delivered with a call to mtaDequeueRecipientDisposition(). An actual channel program would typically not make this call until after processing the message further.

  15. If process_message() returns without dequeuing the message, mtaDequeueStart() defers the message for a later delivery attempt.

  16. The message header and body are read one line at a time with mtaDequeueLineNext(). When there are no more lines to read, it returns a status of MTA_EOF.

  17. Lines returned by mtaDequeueLineNext() might not be NULL terminated because the returned line pointer might point to a line in a read-only, memory-mapped file.

  18. mtaDequeueMessageFinish() is called once the message had been fully processed and the disposition of all its recipients set with mtaDequeueRecipientDisposition(). The message is not truly dequeued until this happens.

  19. The routine NotifyToStr() converts a bitmap encoded set of RFC 1891 notification flags to an ASCII text string.

  20. The UniqueName() routine generates a unique string suitable for the use as a file name. This is used to generate the unique portion of the file name. This routine can be called concurrently by multiple threads and always generates a string unique amongst all processes and threads on the system.

For information on how to run this sample program, see Running Your Enqueue and Dequeue Programs.

Output from the Complex Dequeue Example

The output that follows shows the result of 100 queued messages processed with the program in Example 4–2.


11:01:16.82: Dequeue thread starting: id=10; context=32360
11:01:16.87: Dequeue thread starting: id=1; context=32390
11:01:16.93: Dequeue thread starting: id=2; context=325e8
11:01:17.00: Dequeue thread starting: id=3; context=32600
11:01:17.04: Dequeue thread starting: id=4; context=32618
11:01:17.09: Dequeue thread starting: id=5; context=32630
11:01:17.14: Dequeue thread starting: id=6; context=78e50
11:01:17.19: Dequeue thread starting: id=7; context=88a18
11:01:17.23: Dequeue thread starting: id=9; context=8ab78
11:01:17.51: Dequeue thread starting: id=8; context=8ab60
11:01:19.96: Dequeue thread done: id=2; context=325e8; messages=12
11:01:19.96: Dequeue thread done: id=5; context=32630; messages=22
11:01:19.97: Dequeue thread done: id=6; context=78e50; messages=11
11:01:19.97: Dequeue thread done: id=4; context=32618; messages=5
11:01:19.98: Dequeue thread done: id=8; context=8ab60; messages=16
11:01:20.00: Dequeue thread done: id=9; context=8ab78; messages=5
11:01:20.00: Dequeue thread done: id=3; context=32600; messages=12
11:01:20.01: Dequeue thread done: id=1; context=32390; messages=7
11:01:20.02: Dequeue thread done: id=10; context=32360; messages=6
11:01:20.03: Dequeue thread done: id=7; context=88a18; messages=4