ChorusOS 4.0 Introduction

Message Queues

This feature is designed to allow an application composed of one or more actors to create a shared communication environment, often referred to as a message space, within which these actors can exchange messages efficiently. In particular, supervisor and user actors of the same application can use this feature to exchange messages. Furthermore, messages may be allocated and sent by interrupt handlers, to be processed later by threads.

The feature is designed around the concept of a message space, which encapsulates within a single entity a set of message pools and a set of message queues shared by the actors of the application.

A message space is a temporary resource which must be explicitly created by one actor within the application. Once created, a message space may be opened by other actors within the application. Actors which have opened the same message space are said to share this message space. A message space is automatically deleted when its creating actor and all actors which opened it have exited.

A message pool is defined by two parameters (message size and number of messages) provided by the application when it creates the message space. The configuration of the set of message pools defined within a message space depends upon the needs of the application.

A message is an array of bytes which can be structured and used at application level through any appropriate convention. Messages are presented to actors as pointers within their address space.

Messages are posted to message queues belonging to the same message space. All actors sharing a message space can allocate messages from the message pools. In the same way, all actors sharing a message space have send and receive rights on each queue of the message space.

Even though most applications only need to create a single message space, the feature is designed to allow an application to create or open multiple message spaces. However, messages allocated from one message space cannot be sent to a queue of a different message space. A typical use of message spaces is as follows:

  1. The first actor, aware of the overall requirements of the application, creates the message space.

  2. Other actors of the application open the shared message space.

  3. An actor allocates a message from a message pool, and fills it with the data to be sent.

  4. The actor which allocated the message can then post it to the appropriate queue, and can assign a priority to the message.

  5. The destination actor can get the message from the queue. At this point, the message is removed from the queue.

  6. Once the destination actor has processed the message, it may free the message so that the application may allocate it again. Alternatively, the destination actor could, for example, modify the message and post it again to another queue.
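The steps above can be read as a small state machine for a single message. The following is a minimal sketch of that reading in portable C, not ChorusOS code: the type names, the operation names, and the msg_step() helper are hypothetical and exist only for illustration. It assumes that a message may be freed by whichever actor currently holds it, and that a queued message cannot be freed, neither of which is stated explicitly in the text.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states of one message in this model. */
typedef enum { MSG_FREE, MSG_ALLOCATED, MSG_QUEUED, MSG_RECEIVED } MsgState;

/* Hypothetical operations, named after steps 3 to 6 of the list. */
typedef enum { OP_ALLOCATE, OP_PUT, OP_GET, OP_FREE } MsgOp;

/*
 * Apply one operation to a message state.
 * Returns true and updates *state if the step is allowed in this model,
 * false (leaving *state unchanged) otherwise.
 */
bool msg_step(MsgState *state, MsgOp op)
{
    assert(state != NULL);
    switch (op) {
    case OP_ALLOCATE:               /* step 3: taken from a pool */
        if (*state != MSG_FREE)
            return false;
        *state = MSG_ALLOCATED;
        return true;
    case OP_PUT:                    /* step 4, or the repost in step 6 */
        if (*state != MSG_ALLOCATED && *state != MSG_RECEIVED)
            return false;
        *state = MSG_QUEUED;
        return true;
    case OP_GET:                    /* step 5: removed from the queue */
        if (*state != MSG_QUEUED)
            return false;
        *state = MSG_RECEIVED;
        return true;
    case OP_FREE:                   /* step 6: returned to the pool */
        /* Assumption: only the actor holding the message may free it. */
        if (*state != MSG_ALLOCATED && *state != MSG_RECEIVED)
            return false;
        *state = MSG_FREE;
        return true;
    }
    return false;
}
```

In this model a message that has been received can be posted again without being freed first, which corresponds to the alternative described in step 6.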

In order to make the service as efficient as possible, physical memory is allocated for all messages and data structures of the message space at message space creation. At message space open time, this memory is transparently mapped by the system into the actor address space. Further operations such as posting and receiving a message are done without any copy involved.
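Because all message memory is reserved when the message space is created, the pool configuration fixes the payload footprint up front. The following sketch estimates that footprint from a set of pool descriptions. The PoolDesc type and payload_bytes() helper are hypothetical; the field names mirror those used for KnMsgPool in Example 8-1, but the field types are an assumption, and the estimate ignores any per-message headers, alignment padding, or queue data structures the system may add.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical stand-in for a pool description: the two parameters that
 * define a pool (message size and number of messages).
 */
typedef struct {
    size_t msgSize;
    size_t msgNumber;
} PoolDesc;

/*
 * Sum of msgSize * msgNumber over all pools.
 * This is a lower bound on the memory reserved at creation time;
 * system overhead is not modelled.
 */
size_t payload_bytes(const PoolDesc *pools, size_t nPools)
{
    size_t total = 0;

    assert(pools != NULL || nPools == 0);
    for (size_t i = 0; i < nPools; i++)
        total += pools[i].msgSize * pools[i].msgNumber;
    return total;
}
```

With the configuration used in Example 8-1 (4 messages of 256 bytes and 13 messages of 32 bytes), this estimate gives 1440 bytes of payload.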

Creating a message space is performed as follows:

#include <mipc/chMipc.h>

int msgSpaceCreate (KnMsgSpaceId     spaceGid,
                    unsigned int     msgQueueNb,
                    unsigned int     msgPoolNb,
                    const KnMsgPool* msgPools);

The spaceGid parameter is a unique global identifier assigned by the application to the message space being created. Other actors of the application use the same identifier to open the message space, so it serves to bind the actors participating in the application to the same message space. The predefined global identifier K_PRIVATEID indicates that the message space will be private to the invoking actor: its queues and message pools will only be accessible to threads executing within this actor, and no other actor will be able to open it. The message space is described by the last three parameters: msgQueueNb is the number of message queues, msgPoolNb is the number of message pools, and msgPools points to an array of msgPoolNb pool descriptions, each giving the message size and the number of messages of one pool.

Figure 8-1 shows an example of a message space recently created by an actor.

Figure 8-1 Creating a Message Space

The created message space is assigned a local identifier which is returned to the invoking actor as the return value of the msgSpaceCreate() call. The scope of this local identifier is the invoking actor.

A message space may be opened by other actors through the following call:

#include <mipc/chMipc.h>

int msgSpaceOpen(KnMsgSpaceId spaceGid);

The message space identified by the global identifier spaceGid must have been previously created by a call to msgSpaceCreate(). A local identifier is returned to the invoking actor. This message space local identifier can then be used to manipulate messages and queues within the message space. Figure 8-2 shows an example of a message space recently opened by a second actor.

Figure 8-2 Opening a Message Space

A message may be allocated by the following call:

#include <mipc/chMipc.h>

int msgAllocate(int             spaceLid,
                unsigned int    poolIndex,
                unsigned int    msgSize,
                KnTimeVal*      waitLimit,
                char**          msgAddr);

msgAllocate() attempts to allocate a message from the appropriate pool of the message space identified by spaceLid, the value returned by a previous call to msgSpaceOpen() or msgSpaceCreate(). If poolIndex is not set to K_ANY_MSGPOOL, the allocated message will be the first free (not yet allocated) message of the pool defined by poolIndex, regardless of the value specified by the msgSize parameter. Otherwise, if poolIndex is set to K_ANY_MSGPOOL, the message will be allocated from the first pool whose message size fits the requested msgSize. In this context, first pool means the one with the lowest index in the set of pools defined at message space creation time. If the selected pool is empty, no attempt will be made to allocate a message from another pool.

If the message pool is empty (all messages have been allocated and none has been freed yet), msgAllocate() will block, waiting for a message in the pool to be freed. The invoking thread remains blocked until a message is freed or the wait condition defined by waitLimit expires.
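The pool selection rule can be modelled in a few lines. The sketch below is an illustration in portable C, not the ChorusOS implementation: ANY_POOL is a hypothetical stand-in for K_ANY_MSGPOOL, select_pool() is a hypothetical helper, and rejecting an out-of-range index is an assumption about behaviour the text does not describe.

```c
#include <assert.h>
#include <stddef.h>

#define ANY_POOL ((size_t)-1)  /* hypothetical stand-in for K_ANY_MSGPOOL */
#define NO_POOL  (-1)          /* hypothetical "no pool qualifies" result */

/*
 * Return the index of the pool a request would be served from under the
 * rule described in the text, or NO_POOL if no pool qualifies.
 * poolMsgSize[i] is the message size of pool i, in creation order.
 */
int select_pool(const size_t *poolMsgSize, size_t nPools,
                size_t poolIndex, size_t msgSize)
{
    assert(poolMsgSize != NULL);

    if (poolIndex != ANY_POOL) {
        /* Explicit index: msgSize is ignored. */
        return poolIndex < nPools ? (int)poolIndex : NO_POOL;
    }
    /* Any pool: the lowest index whose message size fits the request. */
    for (size_t i = 0; i < nPools; i++) {
        if (poolMsgSize[i] >= msgSize)
            return (int)i;
    }
    return NO_POOL;
}
```

One consequence of this rule, if the model is accurate: when a large pool is declared before a small one, a K_ANY_MSGPOOL request for a small message is served from the large pool. Declaring pools in increasing order of message size avoids this.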

If successful, the address of the allocated message is stored at the location defined by msgAddr. The returned address is the address of the message within the address space of the actor. Remember that a message space is mapped within the address space of the actors sharing it. However, message spaces and, as a consequence, messages themselves, may be mapped at different addresses in different actors. This is especially true for message spaces shared between supervisor and user actors.
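A practical consequence is that an absolute pointer stored inside a message may not be valid in the receiving actor. One common way to handle this, sketched below, is to store offsets relative to the start of the message instead. This is a general technique rather than anything the ChorusOS interface prescribes; the MsgHeader layout and both helper functions are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical message layout: a small header followed by text. */
typedef struct {
    size_t textOffset;  /* offset of the text from the start of the message */
} MsgHeader;

/* Sender side: store the text after the header and record where it starts. */
void msg_write_text(char *msg, size_t msgSize, const char *text)
{
    MsgHeader hdr = { sizeof(MsgHeader) };

    assert(msg != NULL && text != NULL);
    assert(sizeof hdr + strlen(text) + 1 <= msgSize);
    memcpy(msg, &hdr, sizeof hdr);   /* memcpy avoids alignment assumptions */
    strcpy(msg + hdr.textOffset, text);
}

/* Receiver side: resolve the offset against the receiver's own address. */
const char *msg_read_text(const char *msg)
{
    MsgHeader hdr;

    assert(msg != NULL);
    memcpy(&hdr, msg, sizeof hdr);
    return msg + hdr.textOffset;
}
```

The receiver never dereferences an address computed by the sender, so the result is the same whatever address the message is mapped at.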

Figure 8-3 illustrates two actors allocating two messages from two different pools of the same message space.

Figure 8-3 Allocating Messages from Pools

Once it has been allocated and initialized by the application, a message may be posted to a message queue with:

#include <mipc/chMipc.h>

int msgPut(int             spaceLid,
           unsigned int    queueIndex,
           char*           msg,
           unsigned int    prio);

msgPut() posts the message at address msg to the message queue queueIndex of the message space whose local identifier is spaceLid. The message must have been previously allocated by a call to msgAllocate(). The message is inserted into the queue according to its priority, prio. Messages with a high priority are taken first from the queue.

Posting a message to a queue is done without any message copy, and may be done within an interrupt handler, or with preemption disabled.

Figure 8-4 illustrates the previous actors posting their messages to different queues.

Figure 8-4 Posting Messages to Queues

A message, if one is pending, is retrieved from a queue using:

#include <mipc/chMipc.h>

int msgGet(int             spaceLid,
           unsigned int    queueIndex,
           KnTimeVal*      waitLimit,
           char**          msgAddr,
           KnUniqueId*     srcActor);

msgGet() enables the invoking thread to get the first message with the highest priority pending on the message queue queueIndex, within the message space whose local identifier is spaceLid. Messages with equal priority are delivered in first-in first-out order.
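The ordering rule (highest priority first, first-in first-out among equal priorities) can be illustrated with a small array-based model. This is a sketch in portable C, not the ChorusOS queue implementation: ModelQueue, model_put(), and model_get() are hypothetical, the capacity is arbitrary, and the model assumes that a numerically larger prio value means a higher priority, which the text does not state. Unlike msgGet(), model_get() returns NULL instead of blocking on an empty queue.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAP 16  /* hypothetical capacity for this model */

typedef struct {
    char        *msg[QUEUE_CAP];   /* queued message addresses, head at [0] */
    unsigned int prio[QUEUE_CAP];  /* priority of each queued message */
    size_t       count;
} ModelQueue;

/*
 * Insert msg behind every queued message whose priority is >= prio,
 * which keeps equal-priority messages in arrival order.
 * Returns 0 on success, -1 if the model queue is full.
 */
int model_put(ModelQueue *q, char *msg, unsigned int prio)
{
    size_t pos;

    assert(q != NULL && msg != NULL);
    if (q->count == QUEUE_CAP)
        return -1;
    /* Shift strictly lower-priority entries one slot toward the tail. */
    for (pos = q->count; pos > 0 && q->prio[pos - 1] < prio; pos--) {
        q->msg[pos]  = q->msg[pos - 1];
        q->prio[pos] = q->prio[pos - 1];
    }
    q->msg[pos]  = msg;
    q->prio[pos] = prio;
    q->count++;
    return 0;
}

/* Remove and return the head of the queue, or NULL if it is empty. */
char *model_get(ModelQueue *q)
{
    char *head;

    assert(q != NULL);
    if (q->count == 0)
        return NULL;
    head = q->msg[0];
    for (size_t i = 1; i < q->count; i++) {
        q->msg[i - 1]  = q->msg[i];
        q->prio[i - 1] = q->prio[i];
    }
    q->count--;
    return head;
}
```

Only message addresses are moved in the model, never message contents, in keeping with the statement that posting and receiving involve no copy.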

The address of the message delivered to the invoking thread is returned at the location defined by the msgAddr parameter. If no message is pending, the invoking thread is blocked until a message is sent to the message queue, or until the time-out, if any, defined by the waitLimit parameter expires.

The srcActor parameter, if non-null, points to a location where the unique identifier of the actor which posted the message (referred to as the source actor) is to be stored.

No data copy is performed to deliver the message to the invoking thread. Multiple threads can be blocked, waiting in the same message queue. At present it is not possible for one thread to wait for message arrival on multiple message queues. This type of multiplexing mechanism could be implemented at the application level using the ChorusOS event flags facility.
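The bookkeeping behind such an application-level multiplexer can be sketched with one flag bit per queue: a poster sets the bit for the queue it posted to, and the waiting thread picks a flagged queue to read from. The helpers below only model that bit handling in portable C. They are hypothetical, do not call the ChorusOS event flags interface, and leave out the blocking wait and any synchronization a real implementation would need.

```c
#include <assert.h>
#include <stdint.h>

#define MAX_QUEUES 32u  /* one bit per queue in a 32-bit flag word */

/* Poster side: record that queueIndex has a pending message. */
uint32_t flag_set(uint32_t flags, unsigned int queueIndex)
{
    assert(queueIndex < MAX_QUEUES);
    return flags | (UINT32_C(1) << queueIndex);
}

/* Waiter side: clear the flag once queueIndex has been drained. */
uint32_t flag_clear(uint32_t flags, unsigned int queueIndex)
{
    assert(queueIndex < MAX_QUEUES);
    return flags & ~(UINT32_C(1) << queueIndex);
}

/* Waiter side: lowest-numbered flagged queue, or -1 if none is flagged. */
int flag_first(uint32_t flags)
{
    for (unsigned int i = 0; i < MAX_QUEUES; i++) {
        if (flags & (UINT32_C(1) << i))
            return (int)i;
    }
    return -1;
}
```

In a real design the waiter would block on the flag word rather than poll it, and would have to make sure a flag is not cleared while a newly posted message is still pending on the same queue.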

Figure 8-5 illustrates previous actors receiving messages from queues.

Figure 8-5 Getting Messages from Queues

A message which is of no further use to the application may be returned to its pool of messages available for further allocation with the following call:

#include <mipc/chMipc.h> 

int msgFree(int      spaceLid,
            char*    msg);
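The relationship between allocating and freeing can be illustrated with a fixed-size pool whose storage is reserved once and reused. This is a model in portable C, not the ChorusOS implementation: ModelPool, model_allocate(), and model_free() are hypothetical, the pool dimensions are arbitrary, and rejecting a double free is an assumption. Unlike msgAllocate(), model_allocate() returns NULL instead of blocking when the pool is empty.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define POOL_MSGS   4   /* hypothetical number of messages in the pool */
#define POOL_MSG_SZ 32  /* hypothetical message size in bytes */

typedef struct {
    char storage[POOL_MSGS][POOL_MSG_SZ];  /* reserved up front, never resized */
    bool inUse[POOL_MSGS];
} ModelPool;

/* Return the first free message of the pool, or NULL if none is free. */
char *model_allocate(ModelPool *p)
{
    assert(p != NULL);
    for (size_t i = 0; i < POOL_MSGS; i++) {
        if (!p->inUse[i]) {
            p->inUse[i] = true;
            return p->storage[i];
        }
    }
    return NULL;
}

/*
 * Return msg to the pool so that it can be allocated again.
 * Returns 0 on success, -1 if msg is not an allocated message of this pool.
 */
int model_free(ModelPool *p, char *msg)
{
    assert(p != NULL);
    for (size_t i = 0; i < POOL_MSGS; i++) {
        if (msg == p->storage[i]) {
            if (!p->inUse[i])
                return -1;
            p->inUse[i] = false;
            return 0;
        }
    }
    return -1;
}
```

Once every message is allocated, the model pool stays empty until one is freed, which is the situation in which the real msgAllocate() call blocks.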

Example 8-1 illustrates a very simple use of the message queue facility.

Refer to the msgSpaceCreate(2K), msgSpaceOpen(2K), msgAllocate(2K), msgPut(2K), msgGet(2K), and msgFree(2K) man pages.


Example 8-1 Communicating Using Message Spaces

(file: progov/msgSpace.c)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <chorus.h>
#include <mipc/chMipc.h>
#include <am/afexec.h>

AcParam param;

#define NB_MSG_POOLS 2
#define NB_MSG_QUEUES 3
#define SMALL_MSG_SZ  32
#define LARGE_MSG_SZ  256
#define NB_SMALL_MSG  13
#define NB_LARGE_MSG  4
#define SAMPLE_SPACE  1111
#define LARGE_POOL 0
#define SMALL_POOL 1
#define Q1 0
#define Q2 1
#define Q3 2

KnMsgPool samplePools[NB_MSG_POOLS];
char* tagPtr = "Spawned";

int main(int argc, char** argv, char** envp)
{
  int          res;
  int          msgSpaceLi;
  char*        smallMsg;
  char*        smallReply;
  char*        largeMsg;
  KnCap        spawnedCap;
  KnActorPrivilege actorP;

  res = actorPrivilege(K_MYACTOR, &actorP, NULL);
  if (res != K_OK) {						      
      printf("Cannot get actor privilege, error %d\n", res);	      
      exit(1);							      
  }								      

  if (argc == 1) {
  /*
   * This is the first actor (or spawning actor):
   *   Create a message space,
   *   Spawn another actor,
   *   Allocate, modify and post a small message on Q2
   *   Get a large Message from Q3, print its contents, free it
   *   Get reply of small message on Q1, print its contents, free it.
   */

    samplePools[LARGE_POOL].msgSize = LARGE_MSG_SZ;
    samplePools[LARGE_POOL].msgNumber = NB_LARGE_MSG;

    samplePools[SMALL_POOL].msgSize = SMALL_MSG_SZ;
    samplePools[SMALL_POOL].msgNumber = NB_SMALL_MSG;

    msgSpaceLi = msgSpaceCreate(SAMPLE_SPACE, NB_MSG_QUEUES, 
				NB_MSG_POOLS, samplePools);
    if (msgSpaceLi < 0) {					      
      printf("Cannot create the message space error %d\n",	      
        	msgSpaceLi);					      
      exit(1);						      
    }								      

      /*
       * Message Space has been created, spawn the other actor,
       * argv[1] set to "Spawned" to differentiate the 2 actors.
       */
    param.acFlags = (actorP == K_SUPACTOR)? AFX_SUPERVISOR_SPACE : 
                                            AFX_USER_SPACE;
    res = afexeclp(argv[0], &spawnedCap, &param , argv[0], tagPtr,
		   NULL);
    if (res == -1) {						      
      printf("Cannot spawn second actor, error %d\n", errno);	      
      exit(1);							      
    }								      

      /*
       * Allocate a small message
       */
    res = msgAllocate(msgSpaceLi, SMALL_POOL, SMALL_MSG_SZ, 
		      K_NOTIMEOUT, &smallMsg);
    if (res != K_OK) {						      
      printf("Cannot allocate a small message, error %d\n", res);     
      exit(1);							      
    }								

      /*
       * Initialize the allocated message
       */
    strncpy(smallMsg, "Sending a small message\n", SMALL_MSG_SZ);
    smallMsg[SMALL_MSG_SZ - 1] = '\0';  /* strncpy() may leave no terminator */

      /*
       * Post the allocated small message to Q2 with priority 2
       */
    res = msgPut(msgSpaceLi, Q2, smallMsg, 2);
    if (res != K_OK) {						      
      printf("Cannot post the small message to Q2, error %d\n", res); 
      exit(1);							      
    }								      
  
      /*
       * Get a large message from Q3 and print its contents
       */
    res = msgGet(msgSpaceLi, Q3, K_NOTIMEOUT, &largeMsg, NULL);
    if (res != K_OK) {						      
      printf("Cannot get the large message from  Q3, error %d\n",     
	     res);						      
      exit(1);							      
    }								      

    printf("Received large message contains:\n%s\n", largeMsg);

      /*
       * Free the received large message
       */
    res = msgFree(msgSpaceLi, largeMsg);
    if (res != K_OK) {						      
      printf("Cannot free the large message, error %d\n", res);	      
      exit(1);							      
    }								      

  /*
   * Get the reply to small message from Q1 and print its contents
   */
    res = msgGet(msgSpaceLi, Q1, K_NOTIMEOUT, &smallReply, NULL);
    if (res != K_OK) {						      
      printf("Cannot get the small message reply from  Q1, "	      
             "error %d\n", res);				      
      exit(1);							      
    }								      

    printf("Received small reply contains:\n%s\n", smallReply);

      /*
       * Free the received small reply
       */
    res = msgFree(msgSpaceLi, smallReply);
    if (res != K_OK) {						      
      printf("Cannot free the small reply message, error %d\n", res); 
      exit(1);							      
    }								      


  } else {
    
      /*
       * This is the spawned actor:
       *   Check we have effectively been spawned
       *   Open the message space
       *   Allocate, initialize and post a large message to Q3
       *   Get a small message from Q2, print its contents
       *   Modify it and repost it to Q1
       */

    int     l;

    if ((argc != 2) || (strcmp(argv[1], tagPtr) != 0)) {	      
      printf("%s does not take any argument!\n", argv[0]);	      
      exit(1);						      
    }								      
      /*
       * Open the message space, using the same global identifier
       */
    msgSpaceLi = msgSpaceOpen(SAMPLE_SPACE);
    if (msgSpaceLi < 0) {					      
      printf("Cannot open the message space error %d\n",
             msgSpaceLi);
      exit(1);						      
    }								      

      /*
       * Allocate the large message
       */
    res = msgAllocate(msgSpaceLi, K_ANY_MSGPOOL, LARGE_MSG_SZ, 
		      K_NOTIMEOUT, &largeMsg);
    if (res != K_OK) {						      
      printf("Cannot allocate a large message, error %d\n", res);     
      exit(1);							      
    }								      

    strcpy(largeMsg, "Sending a very large large large large large message\n");

      /*
       * Post the large message to Q3 with priority 0
       */
    res = msgPut(msgSpaceLi, Q3, largeMsg, 0);
    if (res != K_OK) {						      
      printf("Cannot post the large message to Q3, error %d\n", res); 
      exit(1);							      
    }								      

      /*
       * Get the small message from Q2
       */
    res = msgGet(msgSpaceLi, Q2, K_NOTIMEOUT, &smallMsg, NULL);
    if (res != K_OK) {						      
      printf("Cannot get the small message from Q2, error %d\n", res);
      exit(1);							      
    }								      
	
    printf("Spawned actor received small message containing:\n%s\n", smallMsg);
    
    for (l = 0; smallMsg[l] != '\0'; l++) {  /* convert to upper case in place */
      if ((smallMsg[l]>= 'a') && (smallMsg[l] <= 'z')) {
        smallMsg[l] = smallMsg[l] - 'a' + 'A';
      }
    }
    
      /*
       * Post the small message back to Q1, with priority 4
       */
    res = msgPut(msgSpaceLi, Q1, smallMsg, 4);
    if (res != K_OK) {						      
      printf("Cannot post the small message reply to Q1, error %d\n", 
	     res);						      
      exit(1);							      
    }	     
  }
  return 0;
}