マルチスレッドのプログラミング

付録 A 拡張の例: スレッドプールの実装

この付録は、マルチスレッドプログラミングのインタフェースの有用なパッケージであるスレッドプールの実装例 を示しています。

スレッドプールについて

スレッドは、アプリケーションで多くの処理を一度に実行するための有効なパラダイムを提供します。 実行すべきことがある場合は、その実行のためのスレッドを作成してください。スレッドを使用すると、アプリケーションのロジックを簡略化したり、複数のプロセッサを利用したりすることが可能ですが、作成するスレッドの数が多すぎると、リソースの競合によって全体的なアプリケーション性能の問題が発生することがあります。アプリケーションが、リソースの競合 (たとえば、相互排他ロックの処理) のためにその時間の多くを消費して、有効な作業を実際に実行する時間が少なくなる可能性があります。また、プロセスの作成より少ないとは言え、スレッドの作成も負荷になります。少量の作業を実行するためにスレッドを作成することは無駄です。

スレッドプールは、要求に応じて作業を実行する一連の匿名スレッドを管理します。スレッドは、すぐには終了しません。いずれかのスレッドがタスクを完了すると、そのスレッドはアイドル状態になり、別のタスクにディスパッチされるように準備されます。スレッドの作成と削除のオーバーヘッドは、プール内のアクティブなワークスレッドの数だけの作成と削除に制限されます。アプリケーションにはワークスレッドの数より多くのタスクが存在する場合があり、これが通常の状態です。リソースの競合を軽減することによって、プロセッサの利用率やスループットが改善されます。送信されたタスクは順番に、通常、タスクごとにスレッドを作成することによって実行される場合より高速に処理されます。

スレッドプールの例について

スレッドプールの関数

thr_pool.h ヘッダーファイルは、次の関数インタフェースを宣言します。

thr_pool_create()

スレッドプールを作成します。複数のプールを作成できます。

typedef struct thr_pool thr_pool_t;	/* クライアントには見えない */

thr_pool_t *thr_pool_create(uint_t min_threads, uint_t max_threads,
                uint_t linger, pthread_attr_t *attr);
min_threads

プール内のスレッドの最小数。

max_threads

プール内のスレッドの最大数。

linger

タスクが入力されないときに、アイドル状態のスレッドが終了前に存続できる秒数。アイドル状態のスレッドが終了できるのは、スレッドの最小数を超える余分なスレッドが存在する場合だけです。

attr

すべてのワークスレッドの属性。これは NULL の場合があります。

エラーが発生した場合、thr_pool_create() は、errno にエラーコードを設定して NULL を返します。

thr_pool_queue()

作業要求またはタスクをスレッドプールのジョブ待ち行列に入れます。

int  thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg);
pool

thr_pool_create() から返されたスレッドプール識別子。

func

呼び出されるタスク関数。

arg

タスク関数に渡される唯一の引数。

エラーが発生した場合、thr_pool_queue() は、errno にエラーコードを設定して -1 を返します。

func 引数と arg 引数が、pthread_create の構文」に示されている pthread_create()start_routine 引数と arg 引数に似ていることに注意してください。thr_pool_queue() 関数は、既存のアプリケーション内の pthread_create() の置き換えとして使用できます。ただし、pthread_create() の代わりに thr_pool_queue() を使用する場合は、pthread_join() を使用してタスクの完了を待つことはできません。

thr_pool_wait()

スレッドプール内で待ち行列に入れられたすべてのジョブの完了を待ちます。

void thr_pool_wait(thr_pool_t *pool);

pool は、thr_pool_create() から返されたスレッドプール識別子です。

thr_pool_destroy()

待ち行列に入れられたすべてのジョブを取り消し、プールを削除します。アクティブにタスクを処理しているワークスレッドは取り消されます。

extern void thr_pool_destroy(thr_pool_t *pool); 

pool は、thr_pool_create() から返されたスレッドプール識別子です。

スレッドプールのコード例

この節では、スレッドプールのコード例を示します。

thr_pool.h ファイル

このファイルは、この例で使用されている関数を宣言します。


例 A–1 thr_pool.h

/*
 * Declarations for the clients of a thread pool.
 */

#include <pthread.h>

/*
 * The thr_pool_t type is opaque to the client.
 * It is created by thr_pool_create() and must be passed
 * unmodified to the remainder of the interfaces.
 */
typedef	struct thr_pool	thr_pool_t;

/*
 * Create a thread pool.
 *	min_threads:	the minimum number of threads kept in the pool,
 *			always available to perform work requests.
 *	max_threads:	the maximum number of threads that can be
 *			in the pool, performing work requests.
 *	linger:		the number of seconds excess idle worker threads
 *			(greater than min_threads) linger before exiting.
 *	attr:		attributes of all worker threads (can be NULL);
 *			can be destroyed after calling thr_pool_create().
 * On error, thr_pool_create() returns NULL with errno set to the error code.
 */
extern	thr_pool_t	*thr_pool_create(uint_t min_threads, uint_t max_threads,
				uint_t linger, pthread_attr_t *attr);

/*
 * Enqueue a work request to the thread pool job queue.
 * If there are idle worker threads, awaken one to perform the job.
 * Else if the maximum number of workers has not been reached,
 * create a new worker thread to perform the job.
 * Else just return after adding the job to the queue;
 * an existing worker thread will perform the job when
 * it finishes the job it is currently performing.
 *
 * The job is performed as if a new detached thread were created for it:
 *	pthread_create(NULL, attr, void *(*func)(void *), void *arg);
 *
 * On error, thr_pool_queue() returns -1 with errno set to the error code.
 */
extern	int	thr_pool_queue(thr_pool_t *pool,
			void *(*func)(void *), void *arg);

/*
 * Wait for all queued jobs to complete.
 */
extern	void	thr_pool_wait(thr_pool_t *pool);

/*
 * Cancel all queued jobs and destroy the pool.
 */
extern	void	thr_pool_destroy(thr_pool_t *pool);

thr_pool.c ファイル

このファイルは、スレッドプールを実装します。


例 A–2 thr_pool.c

/*
 * Thread pool implementation.
 * See <thr_pool.h> for interface declarations.
 */

#if !defined(_REENTRANT)
#define	_REENTRANT
#endif

#include "thr_pool.h"
#include <stdlib.h>
#include <signal.h>
#include <errno.h>

/*
 * FIFO queued job
 */
typedef struct job job_t;
struct job {
	job_t	*job_next;		/* linked list of jobs */
	void	*(*job_func)(void *);	/* function to call */
	void	*job_arg;		/* its argument */
};

/*
 * List of active worker threads, linked through their stacks.
 */
typedef struct active active_t;
struct active {
	active_t	*active_next;	/* linked list of threads */
	pthread_t	active_tid;	/* active thread id */
};

/*
 * The thread pool, opaque to the clients.
 */
struct thr_pool {
	thr_pool_t	*pool_forw;	/* circular linked list */
	thr_pool_t	*pool_back;	/* of all thread pools */
	pthread_mutex_t	pool_mutex;	/* protects the pool data */
	pthread_cond_t	pool_busycv;	/* synchronization in pool_queue */
	pthread_cond_t	pool_workcv;	/* synchronization with workers */
	pthread_cond_t	pool_waitcv;	/* synchronization in pool_wait() */
	active_t	*pool_active;	/* list of threads performing work */
	job_t		*pool_head;	/* head of FIFO job queue */
	job_t		*pool_tail;	/* tail of FIFO job queue */
	pthread_attr_t	pool_attr;	/* attributes of the workers */
	int		pool_flags;	/* see below */
	uint_t		pool_linger;	/* seconds before idle workers exit */
	int		pool_minimum;	/* minimum number of worker threads */
	int		pool_maximum;	/* maximum number of worker threads */
	int		pool_nthreads;	/* current number of worker threads */
	int		pool_idle;	/* number of idle workers */
};

/* pool_flags */
#define	POOL_WAIT	0x01		/* waiting in thr_pool_wait() */
#define	POOL_DESTROY	0x02		/* pool is being destroyed */

/* the list of all created and not yet destroyed thread pools */
static thr_pool_t *thr_pools = NULL;

/* protects thr_pools */
static pthread_mutex_t thr_pool_lock = PTHREAD_MUTEX_INITIALIZER;

/* set of all signals */
static sigset_t fillset;

static void *worker_thread(void *);

static int
create_worker(thr_pool_t *pool)
{
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, &fillset, &oset);
	error = pthread_create(NULL, &pool->pool_attr, worker_thread, pool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}

/*
 * Worker thread is terminating.  Possible reasons:
 * - excess idle thread is terminating because there is no work.
 * - thread was cancelled (pool is being destroyed).
 * - the job function called pthread_exit().
 * In the last case, create another worker thread
 * if necessary to keep the pool populated.
 */
static void
worker_cleanup(thr_pool_t *pool)
{
	--pool->pool_nthreads;
	if (pool->pool_flags & POOL_DESTROY) {
		if (pool->pool_nthreads == 0)
			(void) pthread_cond_broadcast(&pool->pool_busycv);
	} else if (pool->pool_head != NULL &&
	    pool->pool_nthreads < pool->pool_maximum &&
	    create_worker(pool) == 0) {
		pool->pool_nthreads++;
	}
	(void) pthread_mutex_unlock(&pool->pool_mutex);
}

static void
notify_waiters(thr_pool_t *pool)
{
	if (pool->pool_head == NULL && pool->pool_active == NULL) {
		pool->pool_flags &= ~POOL_WAIT;
		(void) pthread_cond_broadcast(&pool->pool_waitcv);
	}
}

/*
 * Called by a worker thread on return from a job.
 */
static void
job_cleanup(thr_pool_t *pool)
{
	pthread_t my_tid = pthread_self();
	active_t *activep;
	active_t **activepp;

	(void) pthread_mutex_lock(&pool->pool_mutex);
	for (activepp = &pool->pool_active;
	    (activep = *activepp) != NULL;
	    activepp = &activep->active_next) {
		if (activep->active_tid == my_tid) {
			*activepp = activep->active_next;
			break;
		}
	}
	if (pool->pool_flags & POOL_WAIT)
		notify_waiters(pool);
}

static void *
worker_thread(void *arg)
{
	thr_pool_t *pool = (thr_pool_t *)arg;
	int timedout;
	job_t *job;
	void *(*func)(void *);
	active_t active;
	timestruc_t ts;

	/*
	 * This is the worker's main loop.  It will only be left
	 * if a timeout occurs or if the pool is being destroyed.
	 */
	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(worker_cleanup, pool);
	active.active_tid = pthread_self();
	for (;;) {
		/*
		 * We don't know what this thread was doing during
		 * its last job, so we reset its signal mask and
		 * cancellation state back to the initial values.
		 */
		(void) pthread_sigmask(SIG_SETMASK, &fillset, NULL);
		(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
		(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		timedout = 0;
		pool->pool_idle++;
		if (pool->pool_flags & POOL_WAIT)
			notify_waiters(pool);
		while (pool->pool_head == NULL &&
		    !(pool->pool_flags & POOL_DESTROY)) {
			if (pool->pool_nthreads <= pool->pool_minimum) {
				(void) pthread_cond_wait(&pool->pool_workcv,
				    &pool->pool_mutex);
			} else {
				(void) clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += pool->pool_linger;
				if (pool->pool_linger == 0 ||
				    pthread_cond_timedwait(&pool->pool_workcv,
				    &pool->pool_mutex, &ts) == ETIMEDOUT) {
					timedout = 1;
					break;
				}
			}
		}
		pool->pool_idle--;
		if (pool->pool_flags & POOL_DESTROY)
			break;
		if ((job = pool->pool_head) != NULL) {
			timedout = 0;
			func = job->job_func;
			arg = job->job_arg;
			pool->pool_head = job->job_next;
			if (job == pool->pool_tail)
				pool->pool_tail = NULL;
			active.active_next = pool->pool_active;
			pool->pool_active = &active;
			(void) pthread_mutex_unlock(&pool->pool_mutex);
			pthread_cleanup_push(job_cleanup, pool);
			free(job);
			/*
			 * Call the specified job function.
			 */
			(void) func(arg);
			/*
			 * If the job function calls pthread_exit(), the thread
			 * calls job_cleanup(pool) and worker_cleanup(pool);
			 * the integrity of the pool is thereby maintained.
			 */
			pthread_cleanup_pop(1);	/* job_cleanup(pool) */
		}
		if (timedout && pool->pool_nthreads > pool->pool_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);	/* worker_cleanup(pool) */
	return (NULL);
}

static void
clone_attributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr)
{
	struct sched_param param;
	void *addr;
	size_t size;
	int value;

	(void) pthread_attr_init(new_attr);

	if (old_attr != NULL) {
		(void) pthread_attr_getstack(old_attr, &addr, &size);
		/* don't allow a non-NULL thread stack address */
		(void) pthread_attr_setstack(new_attr, NULL, size);

		(void) pthread_attr_getscope(old_attr, &value);
		(void) pthread_attr_setscope(new_attr, value);

		(void) pthread_attr_getinheritsched(old_attr, &value);
		(void) pthread_attr_setinheritsched(new_attr, value);

		(void) pthread_attr_getschedpolicy(old_attr, &value);
		(void) pthread_attr_setschedpolicy(new_attr, value);

		(void) pthread_attr_getschedparam(old_attr, &param);
		(void) pthread_attr_setschedparam(new_attr, &param);

		(void) pthread_attr_getguardsize(old_attr, &size);
		(void) pthread_attr_setguardsize(new_attr, size);
	}

	/* make all pool threads be detached threads */
	(void) pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}

thr_pool_t *
thr_pool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
	pthread_attr_t *attr)
{
	thr_pool_t	*pool;

	(void) sigfillset(&fillset);

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}

	if ((pool = malloc(sizeof (*pool))) == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&pool->pool_mutex, NULL);
	(void) pthread_cond_init(&pool->pool_busycv, NULL);
	(void) pthread_cond_init(&pool->pool_workcv, NULL);
	(void) pthread_cond_init(&pool->pool_waitcv, NULL);
	pool->pool_active = NULL;
	pool->pool_head = NULL;
	pool->pool_tail = NULL;
	pool->pool_flags = 0;
	pool->pool_linger = linger;
	pool->pool_minimum = min_threads;
	pool->pool_maximum = max_threads;
	pool->pool_nthreads = 0;
	pool->pool_idle = 0;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure using
	 * the values from the caller-supplied attribute structure.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	clone_attributes(&pool->pool_attr, attr);

	/* insert into the global list of all thread pools */
	(void) pthread_mutex_lock(&thr_pool_lock);
	if (thr_pools == NULL) {
		pool->pool_forw = pool;
		pool->pool_back = pool;
		thr_pools = pool;
	} else {
		thr_pools->pool_back->pool_forw = pool;
		pool->pool_forw = thr_pools;
		pool->pool_back = thr_pools->pool_back;
		thr_pools->pool_back = pool;
	}
	(void) pthread_mutex_unlock(&thr_pool_lock);

	return (pool);
}

int
thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg)
{
	job_t *job;

	if ((job = malloc(sizeof (*job))) == NULL) {
		errno = ENOMEM;
		return (-1);
	}
	job->job_next = NULL;
	job->job_func = func;
	job->job_arg = arg;

	(void) pthread_mutex_lock(&pool->pool_mutex);

	if (pool->pool_head == NULL)
		pool->pool_head = job;
	else
		pool->pool_tail->job_next = job;
	pool->pool_tail = job;

	if (pool->pool_idle > 0)
		(void) pthread_cond_signal(&pool->pool_workcv);
	else if (pool->pool_nthreads < pool->pool_maximum &&
	    create_worker(pool) == 0)
		pool->pool_nthreads++;

	(void) pthread_mutex_unlock(&pool->pool_mutex);
	return (0);
}

void
thr_pool_wait(thr_pool_t *pool)
{
	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(pthread_mutex_unlock, &pool->pool_mutex);
	while (pool->pool_head != NULL || pool->pool_active != NULL) {
		pool->pool_flags |= POOL_WAIT;
		(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex);
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&pool->pool_mutex); */
}

void
thr_pool_destroy(thr_pool_t *pool)
{
	active_t *activep;
	job_t *job;

	(void) pthread_mutex_lock(&pool->pool_mutex);
	pthread_cleanup_push(pthread_mutex_unlock, &pool->pool_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	pool->pool_flags |= POOL_DESTROY;
	(void) pthread_cond_broadcast(&pool->pool_workcv);

	/* cancel all active workers */
	for (activep = pool->pool_active;
	    activep != NULL;
	    activep = activep->active_next)
		(void) pthread_cancel(activep->active_tid);

	/* wait for all active workers to finish */
	while (pool->pool_active != NULL) {
		pool->pool_flags |= POOL_WAIT;
		(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (pool->pool_nthreads != 0)
		(void) pthread_cond_wait(&pool->pool_busycv, &pool->pool_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&pool->pool_mutex); */

	/*
	 * Unlink the pool from the global list of all pools.
	 */
	(void) pthread_mutex_lock(&thr_pool_lock);
	if (thr_pools == pool)
		thr_pools = pool->pool_forw;
	if (thr_pools == pool)
		thr_pools = NULL;
	else {
		pool->pool_back->pool_forw = pool->pool_forw;
		pool->pool_forw->pool_back = pool->pool_back;
	}
	(void) pthread_mutex_unlock(&thr_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = pool->pool_head; job != NULL; job = pool->pool_head) {
		pool->pool_head = job->job_next;
		free(job);
	}
	(void) pthread_attr_destroy(&pool->pool_attr);
	free(pool);
}

スレッドプールの例が示している内容

この例は、スレッドを使用したプログラミングで注意が必要な側面の 1 つである、取り消しと予期しないスレッドの終了を示しています。ワークスレッドが、予期されるとおりにタスク関数から単に復帰するのではなく、thr_pool_queue() に渡されたタスク関数内から pthread_exit() を呼び出すことによって終了する可能性があります。スレッドプールは、pthread_cleanup_push() 関数内の終了を捉えることによってこの状態から回復できます。唯一の弊害は、ここで別のワークスレッドを作成する必要がある点です。アクティブにタスクを処理しているワークスレッドは、thr_pool_destroy() で取り消されます。thr_pool_wait() または thr_pool_destroy() の呼び出し側は、待機中にアプリケーションによって取り消される可能性があります。この状態も、pthread_cleanup_push() を使用して対処できます。

このパッケージの例はそのままでも有用ですが、ここには含まれていない、次のようないくつかの機能がアプリケーションに必要になる可能性があります。