Sun Java System Messaging Server 6 2005Q4 MTA Developer's Reference

Multiple Calls to mtaDequeueStart()

A channel program can call mtaDequeueStart() multiple times: either sequentially or in parallel. In the latter case, the program would need to create threads so as to effect multiple, simultaneous calls to mtaDequeueStart(). However, just because this can be done does not mean that it is appropriate to do so. In the former case of multiple sequential calls, there’s no need to be making repeated calls. When mtaDequeueStart() returns, the channel no longer needs immediate processing and has been in that state for

MTA_JBC_ATTEMPTS_MAX * MTA_JBC_RETRY_INTERVAL

seconds. Instead, the channel program should exit thereby freeing up system resources. The Job Controller will start a new channel program running when there are more messages to process. In the latter case of multiple parallel calls, there is again no need to do so. If there is an advantage to running more threads than a single call generates, then the channel’s threaddepth channel keyword setting should be increased so that a single call does generate more threads. The only exception to either of these cases might be if the multiple calls are each for a different channel. Even then, however, the advantage of so doing is dubious as the same effect can be achieved through the use of multiple processes, one for each channel.

Message Processing

When mtaDequeueStart() is called, a communication path with the MTA Job Controller is established. The Job Controller is then asked if there are messages to be processed for the channel. Typically there will be messages to process since it is the Job Controller that normally starts channel programs, and it does so when there are queued messages in need of processing. Based upon information obtained from the Job Controller, mtaDequeueStart() will then begin to create non-joinable processing threads. Each processing thread immediately begins processing the queued messages.

Message Processing Procedure

To process queued messages, a processing thread takes the following steps:

  1. The thread sets ctx2 to have the value NULL:

    ctx2 = NULL;

    For information on the process_message arguments, see process_message() Routine

  2. The thread communicates with the Job Controller to obtain a message file to process. If there are no more message files to process, then go to Message Processing Procedure.

  3. For the message file, the thread creates a dequeue context that maintains the dequeue processing state for that message file.

  4. The thread then invokes the caller-supplied process_message routine, passing to it the dequeue context created in Message Processing Procedure, for example:

    istat = process_message(&ctx2, ctx1, &dq_ctx, env_from, env_from_len);

    For a description of the process_message routine, see process_message() Routine

  5. The process_message routine then attempts to process the message, ultimately removing it from the channel’s queues or leaving the message file for a later processing attempt.

  6. If mtaDequeueMessageFinish() was not called before the process_message routine returned, then the queued message is deferred. That is, its underlying message file is left in the channel’s queue and a later processing attempt is scheduled.

  7. The dequeue context is destroyed.

  8. If the process_message routine did not return the MTA_ABORT status code, then repeat this cycle starting at Message Processing Procedure.

  9. The caller-supplied process_done routine is called, for example:

    process_done(&ctx2, ctx1);

    For a description of the process_done routine, see process_done() Routine

  10. The thread exits.

process_message() Routine

This caller-supplied routine is invoked by the processing threads to do the actual processing of the messages.

The following code fragment shows the required syntax for a process_message routine.


int process_message(void      **ctx2,
                    void       *ctx1,
                    mta_dq_t   *dq_ctx,
                    const char *env_from,
                    int         env_from_len);

The following table lists the required arguments for a process_message routine, and gives a description of each.

Arguments  

Description  

ctx2

A writable pointer that the process_message routine can use to store a pointer to a per-thread context. See the description that follows for further details.

ctx1

The caller-supplied private context passed as ctx1 to mtaDequeueStart().

dq_ctx

A dequeue context created by mtaDequeueStart() and representing the message to be processed by this invocation of the process_message routine.

env_from

A pointer to the envelope From: address for the message to be processed. Since Internet messages are allowed to have zero length envelope From: addresses, this address can have zero length. The address will be NULL terminated.

env_from_len

The length in bytes of the envelope From: string. This length does not include any NULL terminator.

When a processing thread first begins running, it sets the value referenced by ctx2 to NULL. This assignment is made only once per thread and is done before the first call to the process_message routine. Consequently, on the first call to the process_message routine, the following test is true:

*ctx2 == NULL

That test will remain true until such time that the process_message routine itself changes the value by making an assignment to *ctx2. As demonstrated in the following code fragment, if the process_message routine needs to maintain state across calls to itself by the same processing thread, it can allocate memory for a structure to store that state in, and then save a pointer to that memory with ctx2.


int process_message(void **ctx2, void *ctx1,
                    const char *env_from, size_t env_from_len)
{
    struct our_state_t *state;

    state = (our_state_t *)(*ctx2);
    if (!state)
    {
        /*
         * First call for this thread.
         * Allocate a structure in which to store the state
         * information
         */
        state = (our_state_t *)calloc(1, sizeof(our_state_t));
        if (!state) return(MTA_ABORT);
        *ctx2 = (void *)state;

        /*
         * Set any appropriate initial values for the state
         * structure
         */
       ...
    }
...

For a sample process_message routine, see Example 5–2.

process_done() Routine

To assist in cleaning up state information for a thread, callers can provide a routine pointed to by the process_done argument.

The following code fragment shows the required syntax for a process_done() routine.


void process_done(void *ctx2,
                  void *ctx1);

The following table lists the arguments required for a process_done routine, and gives a description of each.

Required Arguments  

Description  

ctx2

The value of the last pointer stored by process_message in the ctx2 call argument for this thread.

ctx1

The caller-supplied private context passed as ctx1 to mtaDequeueStart().

The following code fragment demonstrates the type of actions taken by a process_done routine.


void process_done(ctx2, ctx1)
{
    struct our_state_t *state = (our_state_t *)ctx2;
    if (!state)
         return;
    /*
     * Take steps to undo the state
     * (for example, close any sockets or files)
     */
    ...

    /*
     * Free the memory allocated by process_message()
     * to store the state
     */
    free(state)
}

Thread Creation Loop

While the processing threads are running, the thread that invoked mtaDequeueStart() executes a loop containing a brief pause (that is, a sleep request). Each time the mtaDequeueStart() thread awakens, it communicates with the Job Controller to see if it should create more processing threads. In addition, the Job Controller itself has logic to determine if more threads are needed in the currently running channel program, or if it should create additional processes to run the same channel program.

To demonstrate, the following code fragment shows pseudo code of the mtaDequeueStart() loop.


threads_running = 0
threads_max = MTA_THREAD_MAX_THREADS
attemtps    = MTA_JBC_MAX_ATTEMPTS

LOOP:
    while (threads_running < threads_max)
    {

      Go to DONE if a shut down has been requested

      pending_messages = Ask the Job Controller how many
                         messsages there are to be processed

      // If there are no pending messages
      // then consider what to do next
      if (pending_messages = 0)
      {
          // Continue to wait?
          if (attempts <= 0)
              go to DONE

          // Decrement attempts and wait
          attempts = attempts - 1;
          go to SLEEP
      }
      // Reset the attempts counter
      attempts = MTA_JBC_MAX_ATTEMPTS

      threads_needed = Ask the Job Controller how many
                       processing threads are needed

      // Cannot run more then threads_max threads per process
      if (threads_needed \> threads_max)
          threads_needed = threads_max

      // Create additional threads if needed
      if (threads_needed \> threads_running)
      {
         Create (threads_needed - threads_running) more threads
         threads_running = threads_needed
      }
    }

SLEEP:
    Sleep for MTA_JBC_RETRY_INTERVAL seconds
     -- a shut down request will cancel the sleep
    go to LOOP

DONE:
    Wait up to MTA_THREAD_WAIT_TIMEOUT seconds
    for all processing threads to exit

    Return to the caller of mtaDequeueStart()