The previous section explained the need for threads to be synchronized accurately, avoiding using delays which are difficult to tune and which depend on the load of the system. The ChorusOS operating system offers various tools for synchronizing threads:
Semaphores, which are common counting semaphores that support the P and V operations. See "Semaphores".
Mutexes, which provide a convenient and efficient way to implement mutual exclusion between multiple threads, in order to prevent a critical section from being executed in parallel by different threads. See "Mutexes".
Thread semaphores, which may be used to block a single thread awaiting the arrival of an event.
Event flags, which may be useful when a thread has to handle multiple events, providing the kind of multiplexing which is offered by the select system call, but at a much lower level.
A semaphore is an integer counter associated with a queue, possibly empty, of waiting threads. At initialization, the semaphore counter receives a user-defined positive or null value. Initialization is performed by invoking the following ChorusOS operating system service:
#include <chorus.h> int semInit(KnSem* semaphore, unsigned int count);
The semaphore parameter is the location of the semaphore and count is the semaphore counter. The semaphore must have been previously allocated by the user: allocation is not performed by the ChorusOS operating system. This implies that semaphores may be freely allocated by the user where convenient for his applications. As data structures representing semaphores are allocated by the applications, the ChorusOS operating system does not impose any limit on the maximum number of semaphores which may be used within the system.
Two atomic operations, named P and V, are provided on these semaphores.
#include <chorus.h> int semP(KnSem* semaphore, KnTimeVal* waitLimit);
semP() decrements the counter by one. If the counter reaches a negative value, the invoking thread is blocked and queued within the semaphore queue. Otherwise the thread continues its execution normally. The waitLimit parameter may be used to control how long the thread will stay queued. If waitLimit is set to K_NOTIMEOUT, the thread will stay blocked until the necessary V operation is performed. In the case of the thread being awakened due to the expiration of the period, a specific error code is returned as the result of the semP() invocation. In this case, the counter is incremented to compensate for the effect of the semP() operation.
#include <chorus.h> int semV(KnSem* semaphore);
semV() increments the counter by one. If the counter is still lower than or equal to zero, one of the waiting threads is picked up from the queue and awakened. If the counter is strictly greater than zero, there should be no thread waiting in the queue.
Figure 6-2 shows an example of two threads synchronizing by means of a semaphore.
The following example is based on the previous one, but the two threads explicitly synchronize by means of a semaphore, so that the actor will eventually be destroyed when the created thread has done its job and as soon as it has done so. Refer to the semInit(2K) man page.
(file: progov/semaphore.c) #include <stdio.h> #include <stdlib.h> #include <chorus.h> #define USER_STACK_SIZE (1024 * sizeof(long)) KnSem sampleSem; /* Semaphore allocated as global variable */ int childCreate(KnPc entry) { KnActorPrivilege actorP; KnDefaultStartInfo_f startInfo; char* userStack; int childLid = -1; int res; startInfo.dsType = K_DEFAULT_START_INFO; startInfo.dsSystemStackSize = K_DEFAULT_STACK_SIZE; res = actorPrivilege(K_MYACTOR, &actorP, NULL); if (res != K_OK) { printf("Cannot get the privilege of the actor, error %d\n", res); exit(1); } if (actorP == K_SUPACTOR) { startInfo.dsPrivilege = K_SUPTHREAD; } else { startInfo.dsPrivilege = K_USERTHREAD; } if (actorP != K_SUPACTOR) { userStack = malloc(USER_STACK_SIZE); if (userStack == NULL) { printf("Cannot allocate user stack\n"); exit(1); } startInfo.dsUserStackPointer = userStack + USER_STACK_SIZE; } startInfo.dsEntry = entry; res = threadCreate(K_MYACTOR, &childLid, K_ACTIVE, 0, &startInfo); if (res != K_OK) { printf("Cannot create the thread, error %d\n", res); exit(1); } return childLid; } void sampleThread() { int myThreadLi; int res; myThreadLi = threadSelf(); printf("I am the new thread. My thread identifier is: %d\n", myThreadLi); res = semV(&sampleSem); if (res != K_OK){ printf("Cannot perform the semV operation, error %d\n", res); exit(1); } /* Suicide */ res = threadDelete(K_MYACTOR, K_MYSELF); if (res != K_OK){ printf("Cannot suicide, error %d\n", res); exit(1); } /* Should never reach this point! */ } int main(int argc, char** argv, char**envp) { int myThreadLi; int newThreadLi; int res; /* * Initialize the semaphore to 0 so that * the first semP() operation blocks. */ res = semInit(&sampleSem, 0); if (res != K_OK) { printf("Cannot initialize the semaphore, error %d\n", res); exit(1); } newThreadLi = childCreate((KnPc)sampleThread); myThreadLi = threadSelf(); printf("Parent thread identifier = %d, Child thread identifier = %d\n", myThreadLi, newThreadLi); /* * Since semaphore has been initialized to 0 * this semP will block until a semV is performed * by the created thread, letting the main thread know * that created thread's job is done. */ res = semP(&sampleSem, K_NOTIMEOUT); if (res != K_OK) { printf("Cannot perform the semP operation, error %d\n", res); exit(1); } /* * Created thread has run and done all of its job. * It is time to safely exit. */ return 0; }
The semaphore sampleSem is allocated as global data of the actor. As the address space of the actor is shared by all threads running within the actor, both threads can freely access the semaphore in order to synchronize.
Avoid performing the semaphore initialization after having created the child thread. Depending on the scheduling, the second thread may start its execution as soon as it is created, and could reach the semV() operation before the semaphore has been initialized. Although the semV() could appear to work, semP() will never return due the fact that semInit() would reset the counter to 0.
The synchronization will work whatever the order in which the semP() and semV() operations are done. If semP() is done first, the counter will be set to -1 and the main thread will be blocked. The semV() will awake the main thread. If scheduling is reversed, the semV() will set the counter to 1, so that when the semP() operation occurs, the counter will be decremented to 0, but the thread will not block.
Assume that the two threads need to access one or more global variables in a consistent fashion. A simple example could be that each of the threads needs to add two numbers to a unique global counter. Whatever the scheduling may be, the unique global counter should always reflect the accurate sum of all numbers added by both threads.
This could be done using semaphores. However, the ChorusOS operating system provides mutexes which have been specifically designed and tuned for these types of needs.
A mutex is a binary flag associated with a queue, possibly empty, of waiting threads. The mutex can be locked or free. At initialization, the mutex is set to the free state.
#include <chorus.h> int mutexInit(KnMutex* mutex);
As for semaphores, the mutex must have been previously allocated by the user. This implies that mutexes may be allocated where convenient for the application, and that there is no limit imposed by the system on the maximum number of mutexes.
Three operations are provided on these mutexes.
mutexGet() acquires the mutex: if the mutex is free, it is atomically locked and the thread continues its execution.
#include <chorus.h> int mutexGet(KnMutex* mutex);
If the mutex is locked when the mutexGet() operation is invoked, the thread is blocked and queued in the list of threads, waiting for the mutex to become free. Note that there is no way to limit the time during which a thread waits to acquire a mutex.
mutexRel() releases the mutex, returning it to its free state. If threads are blocked while waiting for the mutex, one of them is picked up from the list and activated with the mutex locked.
#include <chorus.h> int mutexRel(KnMutex* mutex);
The last operation is similar to mutexGet(), but does not block if the mutex is already locked.
#include <chorus.h> int mutexTry(KnMutex* mutex);
By checking the return value of mutexTry(), you can determine whether the mutex was free and has been acquired by the current thread, or whether the mutex was already locked, in which case the operation has failed.
The following example shows a small and simple library routine named sampleAdd() which receives two integer arguments and adds them to a global variable one after the other. The code of the previous semaphore example has been modified so that both the main thread and the created thread perform a number of calls to that library. When the job is done, the main thread prints the result and terminates the actor. Refer to the mutexInit(2K) man page.
(file: progov/mutex.c) #include <stdio.h> #include <stdlib.h> #include <chorus.h> #define USER_STACK_SIZE (1024 * sizeof(long)) KnSem sampleSem; KnMutex sampleMutex; long grandTotal; int childCreate(KnPc entry) { KnActorPrivilege actorP; KnDefaultStartInfo_f startInfo; char* userStack; int childLid = -1; int res; startInfo.dsType = K_DEFAULT_START_INFO; startInfo.dsSystemStackSize = K_DEFAULT_STACK_SIZE; res = actorPrivilege(K_MYACTOR, &actorP, NULL); if (res != K_OK) { printf("Cannot get the privilege of the actor, error %d\n", res); exit(1); } if (actorP == K_SUPACTOR) { startInfo.dsPrivilege = K_SUPTHREAD; } else { startInfo.dsPrivilege = K_USERTHREAD; } if (actorP != K_SUPACTOR) { userStack = malloc(USER_STACK_SIZE); if (userStack == NULL) { printf("Cannot allocate user stack\n"); exit(1); } startInfo.dsUserStackPointer = userStack + USER_STACK_SIZE; } startInfo.dsEntry = entry; res = threadCreate(K_MYACTOR, &childLid, K_ACTIVE, 0, &startInfo); if (res != K_OK) { printf("Cannot create the thread, error %d\n", res); exit(1); } return childLid; } void sampleAdd(int a, int b) { int res; res = mutexGet(&sampleMutex); grandTotal += a; grandTotal += b; res = mutexRel(&sampleMutex); } void sampleThread() { int res; int i; for(i = 0; i < 10; i++) { sampleAdd(threadSelf(), i); /* Why not ??? */ } res = semV(&sampleSem); if (res != K_OK){ printf("Cannot perform the semV operation, error %d\n", res); exit(1); } /* Suicide */ threadDelete(K_MYACTOR, K_MYSELF); } int main(int argc, char** argv, char**envp) { int i; int newThreadLi; int res; res = semInit(&sampleSem, 0); if (res != K_OK) { printf("Cannot initialize the semaphore, error %d\n", res); exit(1); } res = mutexInit(&sampleMutex); newThreadLi = childCreate((KnPc)sampleThread); for(i = 0; i < 20; i++){ sampleAdd(threadSelf(), i); /* Why not ??? */ } res = semP(&sampleSem, K_NOTIMEOUT); if (res != K_OK) { printf("Cannot perform the semP operation, error %d\n", res); exit(1); } printf("grandTotal is %d\n", grandTotal); return 0; }
The mutex is allocated within the global data of the actor and is initialized before it is ever used.
The sampleAdd() routine uses the mutex to protect access to the grandTotal variable and make it atomic. Note that the mutexGet() and mutexRel() operations perform the bulk of the work. Mutex operations should always be used in pairs, as in this example.
A mutex is not recursive, a thread which has locked a mutex will deadlock if it tries to perform a second mutexGet() operation on the same mutex.