This section shows the code for the thread pool example:
This file declares the functions used in the example.
/* * Declarations for the clients of a thread pool. */ #include <pthread.h> /* * The thr_pool_t type is opaque to the client. * It is created by thr_pool_create() and must be passed * unmodified to the remainder of the interfaces. */ typedef struct thr_pool thr_pool_t; /* * Create a thread pool. * min_threads: the minimum number of threads kept in the pool, * always available to perform work requests. * max_threads: the maximum number of threads that can be * in the pool, performing work requests. * linger: the number of seconds excess idle worker threads * (greater than min_threads) linger before exiting. * attr: attributes of all worker threads (can be NULL); * can be destroyed after calling thr_pool_create(). * On error, thr_pool_create() returns NULL with errno set to the error code. */ extern thr_pool_t *thr_pool_create(uint_t min_threads, uint_t max_threads, uint_t linger, pthread_attr_t *attr); /* * Enqueue a work request to the thread pool job queue. * If there are idle worker threads, awaken one to perform the job. * Else if the maximum number of workers has not been reached, * create a new worker thread to perform the job. * Else just return after adding the job to the queue; * an existing worker thread will perform the job when * it finishes the job it is currently performing. * * The job is performed as if a new detached thread were created for it: * pthread_create(NULL, attr, void *(*func)(void *), void *arg); * * On error, thr_pool_queue() returns -1 with errno set to the error code. */ extern int thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg); /* * Wait for all queued jobs to complete. */ extern void thr_pool_wait(thr_pool_t *pool); /* * Cancel all queued jobs and destroy the pool. */ extern void thr_pool_destroy(thr_pool_t *pool);
This file implements the thread pool.
/* * Thread pool implementation. * See <thr_pool.h> for interface declarations. */ #if !defined(_REENTRANT) #define _REENTRANT #endif #include "thr_pool.h" #include <stdlib.h> #include <signal.h> #include <errno.h> /* * FIFO queued job */ typedef struct job job_t; struct job { job_t *job_next; /* linked list of jobs */ void *(*job_func)(void *); /* function to call */ void *job_arg; /* its argument */ }; /* * List of active worker threads, linked through their stacks. */ typedef struct active active_t; struct active { active_t *active_next; /* linked list of threads */ pthread_t active_tid; /* active thread id */ }; /* * The thread pool, opaque to the clients. */ struct thr_pool { thr_pool_t *pool_forw; /* circular linked list */ thr_pool_t *pool_back; /* of all thread pools */ pthread_mutex_t pool_mutex; /* protects the pool data */ pthread_cond_t pool_busycv; /* synchronization in pool_queue */ pthread_cond_t pool_workcv; /* synchronization with workers */ pthread_cond_t pool_waitcv; /* synchronization in pool_wait() */ active_t *pool_active; /* list of threads performing work */ job_t *pool_head; /* head of FIFO job queue */ job_t *pool_tail; /* tail of FIFO job queue */ pthread_attr_t pool_attr; /* attributes of the workers */ int pool_flags; /* see below */ uint_t pool_linger; /* seconds before idle workers exit */ int pool_minimum; /* minimum number of worker threads */ int pool_maximum; /* maximum number of worker threads */ int pool_nthreads; /* current number of worker threads */ int pool_idle; /* number of idle workers */ }; /* pool_flags */ #define POOL_WAIT 0x01 /* waiting in thr_pool_wait() */ #define POOL_DESTROY 0x02 /* pool is being destroyed */ /* the list of all created and not yet destroyed thread pools */ static thr_pool_t *thr_pools = NULL; /* protects thr_pools */ static pthread_mutex_t thr_pool_lock = PTHREAD_MUTEX_INITIALIZER; /* set of all signals */ static sigset_t fillset; static void *worker_thread(void *); static int create_worker(thr_pool_t *pool) { sigset_t oset; int error; (void) pthread_sigmask(SIG_SETMASK, &fillset, &oset); error = pthread_create(NULL, &pool->pool_attr, worker_thread, pool); (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); return (error); } /* * Worker thread is terminating. Possible reasons: * - excess idle thread is terminating because there is no work. * - thread was cancelled (pool is being destroyed). * - the job function called pthread_exit(). * In the last case, create another worker thread * if necessary to keep the pool populated. */ static void worker_cleanup(thr_pool_t *pool) { --pool->pool_nthreads; if (pool->pool_flags & POOL_DESTROY) { if (pool->pool_nthreads == 0) (void) pthread_cond_broadcast(&pool->pool_busycv); } else if (pool->pool_head != NULL && pool->pool_nthreads < pool->pool_maximum && create_worker(pool) == 0) { pool->pool_nthreads++; } (void) pthread_mutex_unlock(&pool->pool_mutex); } static void notify_waiters(thr_pool_t *pool) { if (pool->pool_head == NULL && pool->pool_active == NULL) { pool->pool_flags &= ~POOL_WAIT; (void) pthread_cond_broadcast(&pool->pool_waitcv); } } /* * Called by a worker thread on return from a job. */ static void job_cleanup(thr_pool_t *pool) { pthread_t my_tid = pthread_self(); active_t *activep; active_t **activepp; (void) pthread_mutex_lock(&pool->pool_mutex); for (activepp = &pool->pool_active; (activep = *activepp) != NULL; activepp = &activep->active_next) { if (activep->active_tid == my_tid) { *activepp = activep->active_next; break; } } if (pool->pool_flags & POOL_WAIT) notify_waiters(pool); } static void * worker_thread(void *arg) { thr_pool_t *pool = (thr_pool_t *)arg; int timedout; job_t *job; void *(*func)(void *); active_t active; timestruc_t ts; /* * This is the worker's main loop. It will only be left * if a timeout occurs or if the pool is being destroyed. */ (void) pthread_mutex_lock(&pool->pool_mutex); pthread_cleanup_push(worker_cleanup, pool); active.active_tid = pthread_self(); for (;;) { /* * We don't know what this thread was doing during * its last job, so we reset its signal mask and * cancellation state back to the initial values. */ (void) pthread_sigmask(SIG_SETMASK, &fillset, NULL); (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); timedout = 0; pool->pool_idle++; if (pool->pool_flags & POOL_WAIT) notify_waiters(pool); while (pool->pool_head == NULL && !(pool->pool_flags & POOL_DESTROY)) { if (pool->pool_nthreads <= pool->pool_minimum) { (void) pthread_cond_wait(&pool->pool_workcv, &pool->pool_mutex); } else { (void) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += pool->pool_linger; if (pool->pool_linger == 0 || pthread_cond_timedwait(&pool->pool_workcv, &pool->pool_mutex, &ts) == ETIMEDOUT) { timedout = 1; break; } } } pool->pool_idle--; if (pool->pool_flags & POOL_DESTROY) break; if ((job = pool->pool_head) != NULL) { timedout = 0; func = job->job_func; arg = job->job_arg; pool->pool_head = job->job_next; if (job == pool->pool_tail) pool->pool_tail = NULL; active.active_next = pool->pool_active; pool->pool_active = &active; (void) pthread_mutex_unlock(&pool->pool_mutex); pthread_cleanup_push(job_cleanup, pool); free(job); /* * Call the specified job function. */ (void) func(arg); /* * If the job function calls pthread_exit(), the thread * calls job_cleanup(pool) and worker_cleanup(pool); * the integrity of the pool is thereby maintained. */ pthread_cleanup_pop(1); /* job_cleanup(pool) */ } if (timedout && pool->pool_nthreads > pool->pool_minimum) { /* * We timed out and there is no work to be done * and the number of workers exceeds the minimum. * Exit now to reduce the size of the pool. */ break; } } pthread_cleanup_pop(1); /* worker_cleanup(pool) */ return (NULL); } static void clone_attributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) { struct sched_param param; void *addr; size_t size; int value; (void) pthread_attr_init(new_attr); if (old_attr != NULL) { (void) pthread_attr_getstack(old_attr, &addr, &size); /* don't allow a non-NULL thread stack address */ (void) pthread_attr_setstack(new_attr, NULL, size); (void) pthread_attr_getscope(old_attr, &value); (void) pthread_attr_setscope(new_attr, value); (void) pthread_attr_getinheritsched(old_attr, &value); (void) pthread_attr_setinheritsched(new_attr, value); (void) pthread_attr_getschedpolicy(old_attr, &value); (void) pthread_attr_setschedpolicy(new_attr, value); (void) pthread_attr_getschedparam(old_attr, ¶m); (void) pthread_attr_setschedparam(new_attr, ¶m); (void) pthread_attr_getguardsize(old_attr, &size); (void) pthread_attr_setguardsize(new_attr, size); } /* make all pool threads be detached threads */ (void) pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED); } thr_pool_t * thr_pool_create(uint_t min_threads, uint_t max_threads, uint_t linger, pthread_attr_t *attr) { thr_pool_t *pool; (void) sigfillset(&fillset); if (min_threads > max_threads || max_threads < 1) { errno = EINVAL; return (NULL); } if ((pool = malloc(sizeof (*pool))) == NULL) { errno = ENOMEM; return (NULL); } (void) pthread_mutex_init(&pool->pool_mutex, NULL); (void) pthread_cond_init(&pool->pool_busycv, NULL); (void) pthread_cond_init(&pool->pool_workcv, NULL); (void) pthread_cond_init(&pool->pool_waitcv, NULL); pool->pool_active = NULL; pool->pool_head = NULL; pool->pool_tail = NULL; pool->pool_flags = 0; pool->pool_linger = linger; pool->pool_minimum = min_threads; pool->pool_maximum = max_threads; pool->pool_nthreads = 0; pool->pool_idle = 0; /* * We cannot just copy the attribute pointer. * We need to initialize a new pthread_attr_t structure using * the values from the caller-supplied attribute structure. * If the attribute pointer is NULL, we need to initialize * the new pthread_attr_t structure with default values. */ clone_attributes(&pool->pool_attr, attr); /* insert into the global list of all thread pools */ (void) pthread_mutex_lock(&thr_pool_lock); if (thr_pools == NULL) { pool->pool_forw = pool; pool->pool_back = pool; thr_pools = pool; } else { thr_pools->pool_back->pool_forw = pool; pool->pool_forw = thr_pools; pool->pool_back = thr_pools->pool_back; thr_pools->pool_back = pool; } (void) pthread_mutex_unlock(&thr_pool_lock); return (pool); } int thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg) { job_t *job; if ((job = malloc(sizeof (*job))) == NULL) { errno = ENOMEM; return (-1); } job->job_next = NULL; job->job_func = func; job->job_arg = arg; (void) pthread_mutex_lock(&pool->pool_mutex); if (pool->pool_head == NULL) pool->pool_head = job; else pool->pool_tail->job_next = job; pool->pool_tail = job; if (pool->pool_idle > 0) (void) pthread_cond_signal(&pool->pool_workcv); else if (pool->pool_nthreads < pool->pool_maximum && create_worker(pool) == 0) pool->pool_nthreads++; (void) pthread_mutex_unlock(&pool->pool_mutex); return (0); } void thr_pool_wait(thr_pool_t *pool) { (void) pthread_mutex_lock(&pool->pool_mutex); pthread_cleanup_push(pthread_mutex_unlock, &pool->pool_mutex); while (pool->pool_head != NULL || pool->pool_active != NULL) { pool->pool_flags |= POOL_WAIT; (void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex); } pthread_cleanup_pop(1); /* pthread_mutex_unlock(&pool->pool_mutex); */ } void thr_pool_destroy(thr_pool_t *pool) { active_t *activep; job_t *job; (void) pthread_mutex_lock(&pool->pool_mutex); pthread_cleanup_push(pthread_mutex_unlock, &pool->pool_mutex); /* mark the pool as being destroyed; wakeup idle workers */ pool->pool_flags |= POOL_DESTROY; (void) pthread_cond_broadcast(&pool->pool_workcv); /* cancel all active workers */ for (activep = pool->pool_active; activep != NULL; activep = activep->active_next) (void) pthread_cancel(activep->active_tid); /* wait for all active workers to finish */ while (pool->pool_active != NULL) { pool->pool_flags |= POOL_WAIT; (void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex); } /* the last worker to terminate will wake us up */ while (pool->pool_nthreads != 0) (void) pthread_cond_wait(&pool->pool_busycv, &pool->pool_mutex); pthread_cleanup_pop(1); /* pthread_mutex_unlock(&pool->pool_mutex); */ /* * Unlink the pool from the global list of all pools. */ (void) pthread_mutex_lock(&thr_pool_lock); if (thr_pools == pool) thr_pools = pool->pool_forw; if (thr_pools == pool) thr_pools = NULL; else { pool->pool_back->pool_forw = pool->pool_forw; pool->pool_forw->pool_back = pool->pool_back; } (void) pthread_mutex_unlock(&thr_pool_lock); /* * There should be no pending jobs, but just in case... */ for (job = pool->pool_head; job != NULL; job = pool->pool_head) { pool->pool_head = job->job_next; free(job); } (void) pthread_attr_destroy(&pool->pool_attr); free(pool); }