Multithreaded Programming Guide

About the Thread Pool Example

Thread Pool Functions

The thr_pool.h header file declares the following functional interfaces.

thr_pool_create()

Creates a thread pool. More than one pool can be created.

typedef struct thr_pool thr_pool_t;	/* opaque to clients */

thr_pool_t *thr_pool_create(uint_t min_threads, uint_t max_threads,
                uint_t linger, pthread_attr_t *attr);
min_threads

Minimum number of threads in the pool.

max_threads

Maximum number of threads in the pool.

linger

Number of seconds that idle threads can linger before exiting when no tasks come in. An idle thread exits only if it is an excess thread, that is, only while the pool holds more than min_threads threads.

attr

Attributes of all worker threads. This can be NULL.

On error, thr_pool_create() returns NULL with errno set to the error code.

thr_pool_queue()

Enqueue a work request or task to the thread pool job queue.

int  thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg);
pool

A thread pool identifier returned from thr_pool_create().

func

The task function to be called.

arg

The only argument passed to the task function.

On error, thr_pool_queue() returns -1 with errno set to the error code.

Notice the similarity of the func and arg arguments to the start_routine and arg arguments of pthread_create() shown in pthread_create Syntax. The thr_pool_queue() function can be used as a replacement for pthread_create() in existing applications. Note that if you use thr_pool_queue() instead of pthread_create(), you cannot use pthread_join() to wait for the task to complete, because the pool's worker threads are created detached. Use thr_pool_wait() to wait for all queued tasks to complete instead.

thr_pool_wait()

Wait for all queued jobs to complete in the thread pool.

void  thr_pool_wait(thr_pool_t *pool);

pool is a thread pool identifier that is returned from thr_pool_create().

thr_pool_destroy()

Cancel all queued jobs and destroy the pool. Worker threads that are actively processing tasks are cancelled.

extern	void	thr_pool_destroy(thr_pool_t *pool);

pool is a thread pool identifier that is returned from thr_pool_create().

Thread Pool Code Examples

This section shows the code for the thread pool example:

thr_pool.h File

This file declares the functions used in the example.


Example A–1 thr_pool.h

/*
 * Declarations for the clients of a thread pool.
 */

#ifndef	THR_POOL_H
#define	THR_POOL_H

#include <sys/types.h>	/* uint_t */
#include <pthread.h>

/*
 * The thr_pool_t type is opaque to the client.
 * It is created by thr_pool_create() and must be passed
 * unmodified to the remainder of the interfaces.
 */
typedef	struct thr_pool	thr_pool_t;

/*
 * Create a thread pool.
 *	min_threads:	the minimum number of threads kept in the pool,
 *			always available to perform work requests.
 *	max_threads:	the maximum number of threads that can be
 *			in the pool, performing work requests.
 *	linger:		the number of seconds excess idle worker threads
 *			(greater than min_threads) linger before exiting.
 *	attr:		attributes of all worker threads (can be NULL);
 *			can be destroyed after calling thr_pool_create().
 * On error, thr_pool_create() returns NULL with errno set to the error code.
 */
extern	thr_pool_t	*thr_pool_create(uint_t min_threads, uint_t max_threads,
				uint_t linger, pthread_attr_t *attr);

/*
 * Enqueue a work request to the thread pool job queue.
 * If there are idle worker threads, awaken one to perform the job.
 * Else if the maximum number of workers has not been reached,
 * create a new worker thread to perform the job.
 * Else just return after adding the job to the queue;
 * an existing worker thread will perform the job when
 * it finishes the job it is currently performing.
 *
 * The job is performed as if a new detached thread were created for it:
 *	pthread_create(NULL, attr, void *(*func)(void *), void *arg);
 *
 * On error, thr_pool_queue() returns -1 with errno set to the error code.
 */
extern	int	thr_pool_queue(thr_pool_t *pool,
			void *(*func)(void *), void *arg);

/*
 * Wait for all queued jobs to complete.
 */
extern	void	thr_pool_wait(thr_pool_t *pool);

/*
 * Cancel all queued jobs and destroy the pool.
 * Worker threads that are actively performing jobs are cancelled.
 * The pool must not be used after this call returns.
 */
extern	void	thr_pool_destroy(thr_pool_t *pool);

#endif	/* THR_POOL_H */

thr_pool.c File

This file implements the thread pool.


Example A–2 thr_pool.c

/*
 * Thread pool implementation.
 * See <thr_pool.h> for interface declarations.
 */

#if !defined(_REENTRANT)
#define	_REENTRANT
#endif

#include "thr_pool.h"
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <time.h>

/*
 * FIFO queued job.
 * One job_t is allocated by thr_pool_queue() for every work request.
 * It is freed by the worker thread that dequeues it, or by
 * thr_pool_destroy() if it is still queued when the pool goes away.
 */
typedef struct job job_t;
struct job {
	job_t	*job_next;		/* next job in the queue, NULL at tail */
	void	*(*job_func)(void *);	/* function to call */
	void	*job_arg;		/* the only argument passed to job_func */
};

/*
 * List of active worker threads, linked through their stacks.
 * Each entry is an automatic variable of worker_thread(); it is linked
 * onto pool_active while the thread runs a job and unlinked again by
 * job_cleanup(), so no allocation is needed.
 */
typedef struct active active_t;
struct active {
	active_t	*active_next;	/* next active worker, NULL at end */
	pthread_t	active_tid;	/* id of the thread running the job */
};

/*
 * The thread pool, opaque to the clients.
 * pool_forw and pool_back are protected by the global thr_pool_lock;
 * the worker, queue and counter fields are accessed with pool_mutex held.
 */
struct thr_pool {
	thr_pool_t	*pool_forw;	/* circular linked list */
	thr_pool_t	*pool_back;	/* of all thread pools */
	pthread_mutex_t	pool_mutex;	/* protects the pool data */
	pthread_cond_t	pool_busycv;	/* destroy waits for last worker exit */
	pthread_cond_t	pool_workcv;	/* idle workers wait here for jobs */
	pthread_cond_t	pool_waitcv;	/* waiters for "no queued/active jobs" */
	active_t	*pool_active;	/* list of threads performing work */
	job_t		*pool_head;	/* head of FIFO job queue */
	job_t		*pool_tail;	/* tail of FIFO job queue */
	pthread_attr_t	pool_attr;	/* attributes of the workers */
	int		pool_flags;	/* POOL_WAIT and/or POOL_DESTROY */
	uint_t		pool_linger;	/* seconds before idle workers exit */
	int		pool_minimum;	/* minimum number of worker threads */
	int		pool_maximum;	/* maximum number of worker threads */
	int		pool_nthreads;	/* current number of worker threads */
	int		pool_idle;	/* number of idle workers */
};

/* pool_flags */
#define	POOL_WAIT	0x01		/* a thread is blocked on pool_waitcv */
#define	POOL_DESTROY	0x02		/* pool is being destroyed */

/* the list of all created and not yet destroyed thread pools */
static thr_pool_t *thr_pools = NULL;

/* protects thr_pools */
static pthread_mutex_t thr_pool_lock = PTHREAD_MUTEX_INITIALIZER;

/* set of all signals; (re)filled by every call to thr_pool_create() */
static sigset_t fillset;

/* start routine of the worker threads, needed by create_worker() */
static void *worker_thread(void *);

/*
 * Create one worker thread for the pool, using the pool's attributes.
 *
 * All signals are blocked in the calling thread around pthread_create()
 * so that the new worker, which inherits the creator's signal mask,
 * starts with every signal blocked.  The caller's mask is restored
 * before returning.
 *
 * Returns 0 on success or the error number from pthread_create().
 * The caller is responsible for incrementing pool_nthreads on success.
 */
static int
create_worker(thr_pool_t *pool)
{
	sigset_t oset;
	pthread_t tid;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, &fillset, &oset);
	/*
	 * POSIX requires a valid pthread_t pointer; passing NULL is a
	 * Solaris extension that crashes elsewhere.  The id is not kept:
	 * the worker records its own id with pthread_self().
	 */
	error = pthread_create(&tid, &pool->pool_attr, worker_thread, pool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}

/*
 * Worker thread is terminating.  Possible reasons:
 * - excess idle thread is terminating because there is no work.
 * - thread was cancelled (pool is being destroyed).
 * - the job function called pthread_exit().
 * In the last case, create another worker thread
 * if necessary to keep the pool populated.
 */
static void
worker_cleanup(thr_pool_t *pool)
{
	--pool->pool_nthreads;
	if (pool->pool_flags & POOL_DESTROY) {
		if (pool->pool_nthreads == 0)
			(void) pthread_cond_broadcast(&pool->pool_busycv);
	} else if (pool->pool_head != NULL &&
	    pool->pool_nthreads < pool->pool_maximum &&
	    create_worker(pool) == 0) {
		pool->pool_nthreads++;
	}
	(void) pthread_mutex_unlock(&pool->pool_mutex);
}

/*
 * Wake every thread blocked on pool_waitcv, but only once the pool is
 * quiescent: nothing left in the job queue and no worker running a job.
 * The POOL_WAIT flag is cleared at the same time.
 * Callers invoke this with pool_mutex held.
 */
static void
notify_waiters(thr_pool_t *pool)
{
	if (pool->pool_head != NULL || pool->pool_active != NULL)
		return;		/* work is still queued or in progress */

	pool->pool_flags &= ~POOL_WAIT;
	(void) pthread_cond_broadcast(&pool->pool_waitcv);
}

/*
 * Called by a worker thread on return from a job.
 */
static void
job_cleanup(thr_pool_t *pool)
{
	pthread_t my_tid = pthread_self();
	active_t *activep;
	active_t **activepp;

	(void) pthread_mutex_lock(&pool->pool_mutex);
	for (activepp = &pool->pool_active;
	    (activep = *activepp) != NULL;
	    activepp = &activep->active_next) {
		if (activep->active_tid == my_tid) {
			*activepp = activep->active_next;
			break;
		}
	}
	if (pool->pool_flags & POOL_WAIT)
		notify_waiters(pool);
}

static void *
worker_thread(void *arg)
{
	thr_pool_t *pool = (thr_pool_t *)arg;
	int timedout;
	job_t *job;
	void *(*func)(void *);
	active_t active;
	timestruc_t ts;

	/*
	 * This is the worker's main loop.  It will only be left
	 * if a timeout occurs or if the pool is being destroyed.
	 */
	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(worker_cleanup, pool);
	active.active_tid = pthread_self();
	for (;;) {
		/*
		 * We don't know what this thread was doing during
		 * its last job, so we reset its signal mask and
		 * cancellation state back to the initial values.
		 */
		(void) pthread_sigmask(SIG_SETMASK, &fillset, NULL);
		(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
		(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		timedout = 0;
		pool->pool_idle++;
		if (pool->pool_flags & POOL_WAIT)
			notify_waiters(pool);
		while (pool->pool_head == NULL &&
		    !(pool->pool_flags & POOL_DESTROY)) {
			if (pool->pool_nthreads <= pool->pool_minimum) {
				(void) pthread_cond_wait(&pool->pool_workcv,
				    &pool->pool_mutex);
			} else {
				(void) clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += pool->pool_linger;
				if (pool->pool_linger == 0 ||
				    pthread_cond_timedwait(&pool->pool_workcv,
				    &pool->pool_mutex, &ts) == ETIMEDOUT) {
					timedout = 1;
					break;
				}
			}
		}
		pool->pool_idle--;
		if (pool->pool_flags & POOL_DESTROY)
			break;
		if ((job = pool->pool_head) != NULL) {
			timedout = 0;
			func = job->job_func;
			arg = job->job_arg;
			pool->pool_head = job->job_next;
			if (job == pool->pool_tail)
				pool->pool_tail = NULL;
			active.active_next = pool->pool_active;
			pool->pool_active = &active;
			(void) pthread_mutex_unlock(&pool->pool_mutex);
			pthread_cleanup_push(job_cleanup, pool);
			free(job);
			/*
			 * Call the specified job function.
			 */
			(void) func(arg);
			/*
			 * If the job function calls pthread_exit(), the thread
			 * calls job_cleanup(pool) and worker_cleanup(pool);
			 * the integrity of the pool is thereby maintained.
			 */
			pthread_cleanup_pop(1);	/* job_cleanup(pool) */
		}
		if (timedout && pool->pool_nthreads > pool->pool_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);	/* worker_cleanup(pool) */
	return (NULL);
}

/*
 * Initialize new_attr for the pool's worker threads.
 *
 * If old_attr is not NULL, its stack size, contention scope, scheduling
 * inheritance, policy and parameters, and guard size are copied.  A
 * caller-supplied stack address is deliberately not copied, because one
 * stack cannot be shared by all workers.  An attribute is copied only
 * if it could be read from old_attr; otherwise the default is kept.
 * The result always has the detached state set, whatever old_attr says.
 */
static void
clone_attributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr)
{
	struct sched_param param;
	size_t size;
	int value;

	(void) pthread_attr_init(new_attr);

	if (old_attr != NULL) {
		/*
		 * Copy only the stack size.  pthread_attr_setstack() with a
		 * NULL address is not defined by POSIX and can make the
		 * implementation treat the bogus address as a user stack.
		 */
		if (pthread_attr_getstacksize(old_attr, &size) == 0)
			(void) pthread_attr_setstacksize(new_attr, size);

		if (pthread_attr_getscope(old_attr, &value) == 0)
			(void) pthread_attr_setscope(new_attr, value);

		if (pthread_attr_getinheritsched(old_attr, &value) == 0)
			(void) pthread_attr_setinheritsched(new_attr, value);

		if (pthread_attr_getschedpolicy(old_attr, &value) == 0)
			(void) pthread_attr_setschedpolicy(new_attr, value);

		if (pthread_attr_getschedparam(old_attr, &param) == 0)
			(void) pthread_attr_setschedparam(new_attr, &param);

		if (pthread_attr_getguardsize(old_attr, &size) == 0)
			(void) pthread_attr_setguardsize(new_attr, size);
	}

	/* make all pool threads be detached threads */
	(void) pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}

/*
 * Allocate and initialize a thread pool and link it onto the global
 * list of pools.  No worker threads are started here; the pool begins
 * with zero threads.
 *
 *	min_threads:	minimum number of worker threads kept in the pool
 *	max_threads:	maximum number of worker threads (at least 1)
 *	linger:		seconds an excess idle worker waits before exiting
 *	attr:		attributes for the workers, or NULL for defaults;
 *			the values are copied, not referenced
 *
 * Returns the new pool.  On failure returns NULL with errno set to
 * EINVAL (bad thread limits) or ENOMEM (allocation failed).
 */
thr_pool_t *
thr_pool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
	pthread_attr_t *attr)
{
	thr_pool_t	*new_pool;
	thr_pool_t	*last;

	(void) sigfillset(&fillset);

	if (max_threads < 1 || min_threads > max_threads) {
		errno = EINVAL;
		return (NULL);
	}

	new_pool = malloc(sizeof (*new_pool));
	if (new_pool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}

	/* empty job queue, no workers yet */
	new_pool->pool_active = NULL;
	new_pool->pool_head = NULL;
	new_pool->pool_tail = NULL;
	new_pool->pool_flags = 0;
	new_pool->pool_nthreads = 0;
	new_pool->pool_idle = 0;

	/* limits requested by the caller */
	new_pool->pool_minimum = min_threads;
	new_pool->pool_maximum = max_threads;
	new_pool->pool_linger = linger;

	/* synchronization objects */
	(void) pthread_mutex_init(&new_pool->pool_mutex, NULL);
	(void) pthread_cond_init(&new_pool->pool_busycv, NULL);
	(void) pthread_cond_init(&new_pool->pool_workcv, NULL);
	(void) pthread_cond_init(&new_pool->pool_waitcv, NULL);

	/*
	 * The caller may destroy its attribute object after we return,
	 * so keep a private copy of the values rather than the pointer.
	 * A NULL attr yields default attributes.
	 */
	clone_attributes(&new_pool->pool_attr, attr);

	/* append to the circular list of all thread pools */
	(void) pthread_mutex_lock(&thr_pool_lock);
	if (thr_pools != NULL) {
		last = thr_pools->pool_back;
		new_pool->pool_forw = thr_pools;
		new_pool->pool_back = last;
		last->pool_forw = new_pool;
		thr_pools->pool_back = new_pool;
	} else {
		/* first pool: it is its own neighbour in both directions */
		new_pool->pool_forw = new_pool;
		new_pool->pool_back = new_pool;
		thr_pools = new_pool;
	}
	(void) pthread_mutex_unlock(&thr_pool_lock);

	return (new_pool);
}

/*
 * Append a job (func called with arg) to the pool's FIFO job queue.
 *
 * If a worker is idle it is signalled; otherwise, if the pool is below
 * its maximum size, a new worker is created.  If neither applies, the
 * job waits in the queue for a busy worker to finish.
 *
 * Returns 0 on success.  Returns -1 with errno set when the job cannot
 * be allocated (ENOMEM), or when the pool has no worker thread at all
 * and one could not be created (errno is the pthread_create() error).
 * In the failure cases the job is not queued.
 */
int
thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg)
{
	job_t *job;
	job_t *prev_tail;
	int error = 0;

	if ((job = malloc(sizeof (*job))) == NULL) {
		errno = ENOMEM;
		return (-1);
	}
	job->job_next = NULL;
	job->job_func = func;
	job->job_arg = arg;

	(void) pthread_mutex_lock(&pool->pool_mutex);

	/* remember the old tail so the append can be undone on failure */
	prev_tail = (pool->pool_head == NULL) ? NULL : pool->pool_tail;
	if (prev_tail == NULL)
		pool->pool_head = job;
	else
		prev_tail->job_next = job;
	pool->pool_tail = job;

	if (pool->pool_idle > 0) {
		(void) pthread_cond_signal(&pool->pool_workcv);
	} else if (pool->pool_nthreads < pool->pool_maximum) {
		error = create_worker(pool);
		if (error == 0) {
			pool->pool_nthreads++;
		} else if (pool->pool_nthreads > 0) {
			/* an existing worker will get to the job later */
			error = 0;
		} else {
			/*
			 * No worker exists and none could be created, so
			 * nobody would ever run the job.  Take it back off
			 * the queue and report the failure instead of
			 * leaving thr_pool_wait() to block forever.
			 */
			if (prev_tail == NULL)
				pool->pool_head = NULL;
			else
				prev_tail->job_next = NULL;
			pool->pool_tail = prev_tail;
		}
	}

	(void) pthread_mutex_unlock(&pool->pool_mutex);

	if (error != 0) {
		free(job);
		errno = error;
		return (-1);
	}
	return (0);
}

/*
 * Cleanup handler for thr_pool_wait(): unlock the mutex passed as arg.
 * pthread_mutex_unlock() itself cannot be registered because its type
 * is not the void (*)(void *) that pthread_cleanup_push() requires.
 */
static void
thr_pool_wait_unlock(void *arg)
{
	(void) pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 * Block until the pool's job queue is empty and no worker thread is
 * running a job.  The POOL_WAIT flag asks the workers to broadcast
 * pool_waitcv when that state is reached.  If the caller is cancelled
 * while waiting, the cleanup handler releases pool_mutex.
 */
void
thr_pool_wait(thr_pool_t *pool)
{
	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(thr_pool_wait_unlock, &pool->pool_mutex);
	while (pool->pool_head != NULL || pool->pool_active != NULL) {
		/* set on every pass: notify_waiters() clears the flag */
		pool->pool_flags |= POOL_WAIT;
		(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex);
	}
	pthread_cleanup_pop(1);	/* thr_pool_wait_unlock(&pool->pool_mutex) */
}

/*
 * Cleanup handler for thr_pool_destroy(): unlock the mutex passed as arg.
 * pthread_mutex_unlock() itself cannot be registered because its type
 * is not the void (*)(void *) that pthread_cleanup_push() requires.
 */
static void
thr_pool_destroy_unlock(void *arg)
{
	(void) pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 * Cancel all queued jobs and destroy the pool.
 *
 * Queued jobs are discarded without being run, worker threads that are
 * running a job are cancelled, and idle workers are woken so they exit.
 * The call returns after every worker has terminated and all resources
 * of the pool have been released; the pool must not be used afterwards.
 */
void
thr_pool_destroy(thr_pool_t *pool)
{
	active_t *activep;
	job_t *job;

	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(thr_pool_destroy_unlock, &pool->pool_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	pool->pool_flags |= POOL_DESTROY;
	(void) pthread_cond_broadcast(&pool->pool_workcv);

	/*
	 * Discard the queued jobs now, while holding the mutex.
	 * notify_waiters() only broadcasts pool_waitcv once the queue is
	 * empty, so leaving jobs queued would make the wait below hang.
	 */
	while ((job = pool->pool_head) != NULL) {
		pool->pool_head = job->job_next;
		free(job);
	}
	pool->pool_tail = NULL;

	/* cancel all active workers */
	for (activep = pool->pool_active;
	    activep != NULL;
	    activep = activep->active_next)
		(void) pthread_cancel(activep->active_tid);

	/* wait for all active workers to finish */
	while (pool->pool_active != NULL) {
		pool->pool_flags |= POOL_WAIT;
		(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (pool->pool_nthreads != 0)
		(void) pthread_cond_wait(&pool->pool_busycv, &pool->pool_mutex);

	pthread_cleanup_pop(1);	/* thr_pool_destroy_unlock(&pool->pool_mutex) */

	/*
	 * Unlink the pool from the global list of all pools.
	 */
	(void) pthread_mutex_lock(&thr_pool_lock);
	if (thr_pools == pool)
		thr_pools = pool->pool_forw;
	if (thr_pools == pool)
		thr_pools = NULL;	/* it was the only pool */
	else {
		pool->pool_back->pool_forw = pool->pool_forw;
		pool->pool_forw->pool_back = pool->pool_back;
	}
	(void) pthread_mutex_unlock(&thr_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = pool->pool_head; job != NULL; job = pool->pool_head) {
		pool->pool_head = job->job_next;
		free(job);
	}

	/* release everything thr_pool_create() initialized */
	(void) pthread_attr_destroy(&pool->pool_attr);
	(void) pthread_cond_destroy(&pool->pool_waitcv);
	(void) pthread_cond_destroy(&pool->pool_workcv);
	(void) pthread_cond_destroy(&pool->pool_busycv);
	(void) pthread_mutex_destroy(&pool->pool_mutex);
	free(pool);
}