ChorusOS 5.0 Application Developer's Guide

Chapter 9 Scheduling and Synchronization

This chapter demonstrates the use of the ChorusOS application programming interfaces for synchronizing threads and also shows how mutexes and semaphores are used in the ChorusOS operating system. For a detailed description of the available models for thread scheduling and ChorusOS scheduling policies, refer to "Scheduling" in ChorusOS 5.0 Features and Architecture Overview.

After implementing the examples provided in this chapter, you should understand how these APIs can be used in an application.

Setting Scheduling Attributes

To set the scheduling attributes of threads at thread creation time, use the void* schedParams parameter of threadCreate(2K). To obtain and modify the scheduling attributes of a thread dynamically, use the following call:

#include <chorus.h>
#include <sched/chFifo.h>
#include <sched/chRr.h>
#include <sched/chRt.h>

int threadScheduler(KnCap*       actorCap,
                    KnThreadLid  thLi,
                    void*        oldParam,
                    void*        newParam);

This service enables you to get or set the scheduling parameters of any thread of any actor, provided both the actor capability and the thread identifier are known. The threadScheduler(2K) call returns the current scheduling attributes of the target thread at the location defined by oldParam (if non-NULL). It also sets the attributes of the target thread according to the description provided at the location defined by newParam (if non-NULL).
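
The oldParam/newParam convention can be illustrated in portable C. The following is a minimal sketch, not ChorusOS code: DemoParams, demoCurrent, demoScheduler(), and demoRaisePriority() are hypothetical stand-ins that mimic how a single call can read the current attributes, write new ones, or do both, depending on which pointers are non-NULL. The assumption that a lower value means a higher priority is taken from Example 9-1.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical parameter block; NOT a ChorusOS type. */
typedef struct {
    int schedClass;
    int priority;
} DemoParams;

/* Hypothetical stand-in for the attributes kept for one thread. */
static DemoParams demoCurrent = { 1, 128 };

/*
 * Mimics the get/set convention described above:
 *  - if oldParam is non-NULL, the current attributes are copied to it;
 *  - if newParam is non-NULL, the attributes are then replaced.
 * Returns 0 on success.
 */
int demoScheduler(void* oldParam, const void* newParam)
{
    if (oldParam != NULL) {
        *(DemoParams*)oldParam = demoCurrent;
    }
    if (newParam != NULL) {
        demoCurrent = *(const DemoParams*)newParam;
    }
    return 0;
}

/* Get-modify-set: raises the priority by "step" levels. */
int demoRaisePriority(int step)
{
    DemoParams p;
    int        res;

    assert(step > 0);
    res = demoScheduler(&p, NULL);      /* get only */
    if (res != 0) {
        return res;
    }
    p.priority -= step;                 /* assumed: lower value = higher priority */
    return demoScheduler(NULL, &p);     /* set only */
}
```

Calling demoRaisePriority(1) on the initial state leaves the stored priority at 127. Passing both pointers in one call returns the old attributes and installs the new ones in a single step.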

Because the size, layout, and semantics of scheduling parameters can vary (depending on the scheduler configured in the system, or on the class of the ROUND_ROBIN framework), the generic interface definition leaves the parameters untyped and passes them as void* pointers. However, all scheduling parameter descriptions are similar, at least for the initial fields:

typedef struct KnFifoThParms {
    KnSchedClass     fifoClass;      /* Always set to K_SCHED_FIFO */
    KnFifoPriority   fifoPriority;
} KnFifoThParms;

typedef struct KnRrThParms {
    KnSchedClass     rrClass;        /* Always set to K_SCHED_RR */
    KnRrPriority     rrPriority;
} KnRrThParms;

In both structures, the first field defines the scheduling policy applied (or to be applied) to the thread. The second field defines the priority of the thread within the scheduling policy.
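
Because every parameter block starts with the class field, code that receives only an untyped pointer can inspect that field before deciding how to interpret the rest. The following portable sketch illustrates the idea under stated assumptions: DemoSchedClass, DemoFifoParms, DemoRrParms, and demoPriorityOf() are hypothetical names that only mirror the layout shown above; the real types come from the ChorusOS headers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; NOT the ChorusOS definitions. */
typedef enum { DEMO_SCHED_FIFO = 1, DEMO_SCHED_RR = 2 } DemoSchedClass;

typedef struct {
    DemoSchedClass fifoClass;      /* always DEMO_SCHED_FIFO */
    int            fifoPriority;
} DemoFifoParms;

typedef struct {
    DemoSchedClass rrClass;        /* always DEMO_SCHED_RR */
    int            rrPriority;
} DemoRrParms;

/*
 * Reads the class tag through the untyped pointer, then casts to the
 * matching structure. This is valid C because a pointer to a structure,
 * suitably converted, points to its first member.
 * Returns the priority, or -1 for an unknown class.
 */
int demoPriorityOf(const void* params)
{
    assert(params != NULL);
    switch (*(const DemoSchedClass*)params) {
    case DEMO_SCHED_FIFO:
        return ((const DemoFifoParms*)params)->fifoPriority;
    case DEMO_SCHED_RR:
        return ((const DemoRrParms*)params)->rrPriority;
    default:
        return -1;
    }
}
```

The same function accepts either structure, which is the property that lets a generic interface take a void* argument.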

Example 9-1 demonstrates how childSchedCreate(), a modified version of the childCreate() routine, receives the scheduling attributes of the thread to be created. The main thread invokes the modified routine so that the created thread starts as soon as it is created, rather than waiting for the main thread to give up the processor. Therefore, the created thread must be assigned a higher priority than the main thread. For more information, refer to the threadScheduler(2K) and threadCreate(2K) man pages.


Example 9-1 Changing Scheduling Attributes

This example applies only in the case of the FIFO and RR scheduling classes.

(file: progov/thSched.c)
#include <stdio.h>
#include <stdlib.h>
#include <chorus.h>
#define USER_STACK_SIZE (1024 * sizeof(long))
KnSem   sampleSem;
    int
childSchedCreate(KnPc entry, void* schedParams)
{
  KnActorPrivilege      actorP;
  KnDefaultStartInfo_f  startInfo;
  char*                 userStack;
  int                   childLid = -1;
  int                   res;
      /* Set defaults startInfo fields */
  startInfo.dsType            = K_DEFAULT_START_INFO;
  startInfo.dsSystemStackSize = K_DEFAULT_STACK_SIZE;
      /* Get actor's privilege */
  res = actorPrivilege(K_MYACTOR, &actorP, NULL);
  if (res != K_OK) {            
    printf("Cannot get the privilege of the actor, error %d\n", res); 
    exit(1);             
  }              
      /* Set thread privilege */
  if (actorP == K_SUPACTOR) {
    startInfo.dsPrivilege = K_SUPTHREAD;
  } else {
    startInfo.dsPrivilege = K_USERTHREAD;
  }
      /* Allocate a stack for user threads */
  if (actorP != K_SUPACTOR) {
    userStack = malloc(USER_STACK_SIZE);
    if (userStack == NULL) {           
      printf("Cannot allocate user stack\n");         
      exit(1);             
    }              
    startInfo.dsUserStackPointer = userStack + USER_STACK_SIZE;
  } 
      /* Set entry point for the new thread */
  startInfo.dsEntry = entry;
      /* Create the thread in the active state */
  res = threadCreate(K_MYACTOR, &childLid, K_ACTIVE, 
        schedParams, &startInfo);
  if (res != K_OK) {            
    printf("Cannot create the thread, error %d\n", res);
    exit(1);
  }              
  return childLid;
}
    void
sampleThread()
{
  int myThreadLi;
  int res;
  myThreadLi = threadSelf();
  printf("I am the new thread. My thread identifier is: %d\n", myThreadLi);
  res  = semV(&sampleSem);
  if (res != K_OK){            
    printf("Cannot perform the semV operation, error %d\n", res);
    exit(1);             
  }              
  (void) threadDelete(K_MYACTOR, K_MYSELF);
}
int main(int argc, char** argv, char**envp)
{
  int                  myThreadLi;
  int                  newThreadLi;
  int                  res;
  KnThreadDefaultSched schedParams;
  res = semInit(&sampleSem, 0);
  if (res != K_OK) {            
    printf("Cannot initialize the semaphore, error %d\n", res);       
    exit(1);             
  }              
      /* acquire my own scheduling attributes  */
  res = threadScheduler(K_MYACTOR, K_MYSELF, &schedParams, NULL);
  if (res != K_OK) {
    printf("threadScheduler failed, res=%d\n", res);
    exit(1);
  }
      /* Increase priority of thread to be created */
  schedParams.tdPriority -= 1;
  newThreadLi = childSchedCreate((KnPc)sampleThread, &schedParams);
  myThreadLi = threadSelf();
  printf("Parent thread identifier = %d, Child thread identifier = %d\n",
      myThreadLi, newThreadLi);
  res = semP(&sampleSem, K_NOTIMEOUT);
  if (res != K_OK) {            
    printf("Cannot perform the semP operation, error %d\n", res);     
    exit(1);             
  }              
  return 0;
}

Semaphores

A semaphore is an integer counter associated with a queue of waiting threads (the queue can be empty). At initialization, the semaphore counter receives a user-defined value that is positive or zero. Initialization is performed by invoking the following ChorusOS operating system call:

#include <chorus.h>
int semInit(KnSem*        semaphore,
            unsigned int  count);

The semaphore parameter is the location of the semaphore, and count is the semaphore counter. As is the case with mutexes, the ChorusOS operating system does not allocate semaphores itself. You must have allocated the semaphore previously. This enables you to allocate semaphores freely, wherever convenient for your application. Because data structures representing semaphores are allocated by the applications themselves, the ChorusOS operating system does not impose any limit on the maximum number of semaphores that may be used within the system.

Two atomic operations, P and V, are provided with these semaphores.

#include <chorus.h>
int semP(KnSem*     semaphore,
         KnTimeVal* waitLimit);

semP(2K) decrements the counter by one. If the counter becomes negative, the invoking thread is blocked and placed in the semaphore queue; otherwise, the thread continues its execution normally. The waitLimit parameter controls how long the thread may remain queued. If waitLimit is set to K_NOTIMEOUT, the thread remains blocked until a V operation awakens it. If the thread is awakened because the time limit expired, semP(2K) returns a specific error code, and the counter is incremented to compensate for the decrement performed by the semP(2K) operation.
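
For readers more familiar with POSIX, a bounded wait can be sketched with sem_timedwait(). This is an analogue only, not the ChorusOS call: demoTimedP() is a hypothetical helper, and unlike the waitLimit parameter described above, the POSIX function takes an absolute CLOCK_REALTIME deadline, which the helper computes from a relative number of milliseconds.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <time.h>

/*
 * Waits on a POSIX semaphore for at most timeoutMs milliseconds.
 * Returns 0 if the semaphore was acquired, or -1 otherwise
 * (errno is ETIMEDOUT when the time limit expired).
 * POSIX analogue only; not the ChorusOS semP() call.
 */
int demoTimedP(sem_t* sem, long timeoutMs)
{
    struct timespec deadline;

    assert(sem != NULL && timeoutMs >= 0);

    /* sem_timedwait() expects an absolute deadline. */
    if (clock_gettime(CLOCK_REALTIME, &deadline) != 0) {
        return -1;
    }
    deadline.tv_sec  += timeoutMs / 1000;
    deadline.tv_nsec += (timeoutMs % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    /* Retry if the wait is interrupted by a signal. */
    while (sem_timedwait(sem, &deadline) != 0) {
        if (errno != EINTR) {
            return -1;
        }
    }
    return 0;
}
```

When the wait times out, the POSIX semaphore value is left unchanged, which corresponds to the compensation of the counter described above.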

	
#include <chorus.h>
int semV(KnSem* semaphore);

semV(2K) increments the counter by one. If the counter is still less than or equal to zero after the increment, one of the waiting threads is removed from the queue and awakened. If the counter is greater than zero, no threads are waiting in the queue.
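
The counter arithmetic of the P and V operations can be checked with a small single-threaded model. This is a sketch under stated assumptions, not the KnSem implementation: ModelSem and the model*() functions are hypothetical, no thread is actually blocked, and the reading of a negative counter as the number of queued threads is inferred from the descriptions above.

```c
#include <assert.h>

/* Hypothetical single-threaded model of the counter; NOT the KnSem type. */
typedef struct {
    int count;   /* inferred: a negative value means -count threads are queued */
} ModelSem;

void modelInit(ModelSem* s, unsigned int count)
{
    assert(s != 0);
    s->count = (int)count;
}

/* P: decrement; returns 1 if the caller would block, 0 otherwise. */
int modelP(ModelSem* s)
{
    s->count -= 1;
    return s->count < 0;
}

/* V: increment; returns 1 if a queued thread would be awakened. */
int modelV(ModelSem* s)
{
    s->count += 1;
    return s->count <= 0;
}

/* Timeout of a queued P: the counter is incremented to undo the decrement. */
void modelTimeout(ModelSem* s)
{
    assert(s->count < 0);   /* only a queued thread can time out */
    s->count += 1;
}

/* Number of threads the model considers queued. */
int modelWaiters(const ModelSem* s)
{
    return s->count < 0 ? -s->count : 0;
}
```

Starting from a counter of 0, a P operation reports that the caller would block, and the following V operation reports that one waiter would be awakened, which is the sequence used by the examples in this chapter.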

Figure 9-1 shows an example of two threads synchronizing by means of a semaphore.

Figure 9-1 Synchronizing With Semaphores

Using Semaphores

In the following example, two threads explicitly synchronize by means of a semaphore, so that the actor terminates only after the created thread has completed its work. Refer to the semInit(2K) man page for more information.


Example 9-2 Synchronizing Using Semaphores

(file: progov/semaphore.c)
#include <stdio.h>
#include <stdlib.h>
#include <chorus.h>
#define USER_STACK_SIZE (1024 * sizeof(long))         
KnSem   sampleSem; /* Semaphore allocated as global variable */
int childCreate(KnPc entry)
{
  KnActorPrivilege      actorP;
  KnDefaultStartInfo_f  startInfo;
  char*                 userStack;
  int                   childLid = -1;
  int                   res;
  startInfo.dsType            = K_DEFAULT_START_INFO;
  startInfo.dsSystemStackSize = K_DEFAULT_STACK_SIZE;
  res = actorPrivilege(K_MYACTOR, &actorP, NULL);
  if (res != K_OK) {
    printf("Cannot get the privilege of the actor, error %d\n", res);
    exit(1);
  }
  if (actorP == K_SUPACTOR) {
    startInfo.dsPrivilege = K_SUPTHREAD;
  } else {
    startInfo.dsPrivilege = K_USERTHREAD;
  }
  if (actorP != K_SUPACTOR) {
    userStack = malloc(USER_STACK_SIZE);
    if (userStack == NULL) {
      printf("Cannot allocate user stack\n");
      exit(1);
    }
    startInfo.dsUserStackPointer = userStack + USER_STACK_SIZE;
  } 
  startInfo.dsEntry = entry;
  res = threadCreate(K_MYACTOR, &childLid, K_ACTIVE, 0, &startInfo);
  if (res != K_OK) {
    printf("Cannot create the thread, error %d\n", res);
    exit(1);
  }
  return childLid;
}
    void
sampleThread()
{
  int myThreadLi;
  int res;
  myThreadLi = threadSelf();
  printf("I am the new thread. My thread identifier is: %d\n", myThreadLi);
  res  = semV(&sampleSem);
  if (res != K_OK){            
    printf("Cannot perform the semV operation, error %d\n", res);     
    exit(1);             
  }              
      /* Suicide */
  res = threadDelete(K_MYACTOR, K_MYSELF);
  if (res != K_OK){            
    printf("Cannot delete the calling thread, error %d\n", res);
    exit(1);             
  }              
      /* Should never reach this point! */
}
int main(int argc, char** argv, char**envp)
{
  int        myThreadLi;
  int        newThreadLi;
  int        res;
     /*
      * Initialize the semaphore to 0 so that
      * the first semP() operation blocks.
      */
  res = semInit(&sampleSem, 0);
  if (res != K_OK) {            
    printf("Cannot initialize the semaphore, error %d\n", res);       
    exit(1);             
  }              
  newThreadLi = childCreate((KnPc)sampleThread);
  myThreadLi = threadSelf();
  printf("Parent thread identifier = %d, Child thread identifier = %d\n",
      myThreadLi, newThreadLi);
      /* 
       * Since semaphore has been initialized to 0
       * this semP will block until a semV is performed 
       * by the created thread, letting the main thread know
       * that created thread's job is done.
       */
  res = semP(&sampleSem, K_NOTIMEOUT);
  if (res != K_OK) {            
    printf("Cannot perform the semP operation, error %d\n", res);     
    exit(1);             
  }              
     /*
      * Created thread has run and done all of its job.
      * It is time to safely exit.
      */
 return 0;
}
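
The same handshake can be reproduced on a POSIX system for comparison. The following is a hedged sketch that uses POSIX threads and semaphores instead of the ChorusOS calls: demoDone, demoChildRan, demoChild(), and demoRunAndWait() are hypothetical names, with sem_post() and sem_wait() standing in for semV() and semP().

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

static sem_t demoDone;        /* plays the role of sampleSem */
static int   demoChildRan;    /* set by the child before it signals */

static void* demoChild(void* arg)
{
    (void)arg;
    demoChildRan = 1;
    sem_post(&demoDone);      /* V: tell the parent the job is done */
    return NULL;
}

/*
 * Creates a child thread and blocks until it signals completion.
 * Returns 1 if the child had run when the parent resumed, -1 on error.
 */
int demoRunAndWait(void)
{
    pthread_t tid;
    int       ran;
    int       rc;

    demoChildRan = 0;
    if (sem_init(&demoDone, 0, 0) != 0) {   /* count 0: the first wait blocks */
        return -1;
    }
    if (pthread_create(&tid, NULL, demoChild, NULL) != 0) {
        sem_destroy(&demoDone);
        return -1;
    }
    while (sem_wait(&demoDone) != 0) {      /* P: wait for the child's V */
        if (errno != EINTR) {
            break;                          /* unexpected error: stop waiting */
        }
    }
    ran = demoChildRan;
    rc = pthread_join(tid, NULL);           /* child is done with the semaphore */
    assert(rc == 0);
    (void)rc;
    sem_destroy(&demoDone);
    return ran;
}
```

As in Example 9-2, the semaphore starts at 0, so the parent cannot pass the wait until the child has posted.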

Mutual Exclusion Locks

Assume that two threads need to access one or more global variables in a consistent fashion (for example, each thread needs to add two numbers to a single global counter). The counter should always reflect the exact sum of all numbers added by both threads, regardless of how the threads are scheduled.

This could be achieved using semaphores. However, the ChorusOS operating system provides mutexes, which are specifically designed and tuned for this purpose.

A mutex is a binary flag associated with a (possibly empty) queue of waiting threads. The mutex may be locked or free. At initialization, the mutex is set to the free state.

#include <chorus.h>
int mutexInit(KnMutex* mutex);

As with semaphores, the application must allocate the mutex before initializing it. Mutexes may be allocated wherever convenient for the application, and no limit is imposed on the maximum number of mutexes.

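Three operations are provided with mutexes: mutexGet(2K) acquires the mutex, blocking the calling thread while the mutex is locked; mutexRel(2K) releases it; and mutexTry(2K) attempts to acquire it without blocking.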

For more information, refer to the mutexInit(2K) man page.
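
The locking discipline can also be tried on a POSIX system. The following sketch is an analogue, not ChorusOS code: it uses pthread_mutex_lock() and pthread_mutex_unlock() where a ChorusOS program would use mutexGet() and mutexRel(), and demoLock, demoTotal, demoAdd(), demoWorker(), and demoRunTwoAdders() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t demoLock = PTHREAD_MUTEX_INITIALIZER;
static long            demoTotal;

/* Adds a and b to the shared total, one after the other, under the lock. */
void demoAdd(int a, int b)
{
    pthread_mutex_lock(&demoLock);
    demoTotal += a;
    demoTotal += b;
    pthread_mutex_unlock(&demoLock);
}

/* Each worker calls demoAdd(1, i) for i = 0 .. n-1. */
static void* demoWorker(void* arg)
{
    int n = *(const int*)arg;
    int i;

    for (i = 0; i < n; i++) {
        demoAdd(1, i);
    }
    return NULL;
}

/*
 * Runs two threads that each call demoAdd() n times and returns
 * the final total, or -1 if a thread could not be created.
 */
long demoRunTwoAdders(int n)
{
    pthread_t t1;
    pthread_t t2;

    assert(n >= 0);
    demoTotal = 0;
    if (pthread_create(&t1, NULL, demoWorker, &n) != 0) {
        return -1;
    }
    if (pthread_create(&t2, NULL, demoWorker, &n) != 0) {
        pthread_join(t1, NULL);
        return -1;
    }
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return demoTotal;
}
```

Each worker contributes n + n(n-1)/2 to the total, so the result is the same for every interleaving of the two threads as long as both updates happen while the lock is held.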

The following example shows a small, basic library routine called sampleAdd() which receives two integer arguments and adds them to a global variable one after the other. Both the main thread and the created thread perform a number of calls to the library. When the job is completed, the main thread prints the result and terminates the actor.


Example 9-3 Using Mutual Exclusion Locks

(file: progov/mutex.c)
#include <stdio.h>
#include <stdlib.h>
#include <chorus.h>
#define USER_STACK_SIZE (1024 * sizeof(long))
KnSem   sampleSem;
KnMutex sampleMutex;
long    grandTotal;
    int
childCreate(KnPc entry)
{
  KnActorPrivilege      actorP;
  KnDefaultStartInfo_f  startInfo;
  char*                 userStack;
  int                   childLid = -1;
  int                   res;
  startInfo.dsType            = K_DEFAULT_START_INFO;
  startInfo.dsSystemStackSize = K_DEFAULT_STACK_SIZE;
  res = actorPrivilege(K_MYACTOR, &actorP, NULL);
  if (res != K_OK) {
    printf("Cannot get the privilege of the actor, error %d\n", res);
    exit(1);
  }
  if (actorP == K_SUPACTOR) {
    startInfo.dsPrivilege = K_SUPTHREAD;
  } else {
    startInfo.dsPrivilege = K_USERTHREAD;
  }
  if (actorP != K_SUPACTOR) {
    userStack = malloc(USER_STACK_SIZE);
    if (userStack == NULL) {
      printf("Cannot allocate user stack\n");
      exit(1);
    }
    startInfo.dsUserStackPointer = userStack + USER_STACK_SIZE;
  } 
  startInfo.dsEntry = entry;
  res = threadCreate(K_MYACTOR, &childLid, K_ACTIVE, 0, &startInfo);
  if (res != K_OK) {
    printf("Cannot create the thread, error %d\n", res);
    exit(1);
  }
  return childLid;
}
    void
sampleAdd(int a, int b)
{
      /* Hold the mutex while updating the shared total */
  (void) mutexGet(&sampleMutex);
  grandTotal += a + b;
  (void) mutexRel(&sampleMutex);
}
    void
sampleThread()
{
  int res;
  int i;
  for(i = 0; i < 10; i++) {
    sampleAdd(threadSelf(), i);        /* Why not ??? */
  }
  res  = semV(&sampleSem);
  if (res != K_OK){            
    printf("Cannot perform the semV operation, error %d\n", res);
    exit(1);
  }              
      /* Suicide */
  (void) threadDelete(K_MYACTOR, K_MYSELF);
}
int main(int argc, char** argv, char**envp)
{
  int        i;
  int        res;
  res = semInit(&sampleSem, 0);
  if (res != K_OK) {            
    printf("Cannot initialize the semaphore, error %d\n", res);
    exit(1);             
  }              
  (void) mutexInit(&sampleMutex);
  (void) childCreate((KnPc)sampleThread);
  for(i = 0; i < 20; i++){
    sampleAdd(threadSelf(), i);      /* Why not ??? */
  }
  res = semP(&sampleSem, K_NOTIMEOUT);
  if (res != K_OK) {            
    printf("Cannot perform the semP operation, error %d\n", res);     
    exit(1);             
  }              
  printf("grandTotal is %ld\n", grandTotal);
  return 0;
}

Note the following points concerning the previous example: