「生産者 / 消費者」問題は、並行プログラミングに関する問題の中でも一般によく知られているものの 1 つです。この問題は次のように定式化されます。サイズが有限の 1 個のバッファーと 2 種類のスレッドが存在します。一方のスレッドを生産者、もう一方のスレッドを消費者と呼びます。
生産者は、バッファーに空きができるまでデータを入れることができません。消費者は、生産者がバッファーに何か書き込むまで、このバッファーからデータを取り出すことができません。
特定の条件のシグナルを待つスレッドの待ち行列を条件変数で表すことにします。
例 4–11には、こうした待ち行列を 2 つ使用しています。1 つは less で、バッファー内の未使用スロットを待つ生産者のための待ち行列です。もう 1 つは more で、情報が格納されたバッファースロットを待つ消費者のための待ち行列です。また、バッファーが同時に複数のスレッドによってアクセスされないようにするために、相互排他ロック (mutex ロック) も使用しています。
typedef struct { char buf[BSIZE]; int occupied; int nextin; int nextout; pthread_mutex_t mutex; pthread_cond_t more; pthread_cond_t less; } buffer_t; buffer_t buffer;
例 4–12 では、最初に、生産者のスレッドが mutex を獲得し、バッファーデータ構造 (buffer) を保護します。この生産者スレッドは、次に、生成された項目のための空きがあることを確認します。空きがない場合は、pthread_cond_wait () を呼び出します。pthread_cond_wait() は、「バッファー内に空きがある」を表す条件 less にシグナルが送られてくるのを待つスレッドの待ち行列に入ります。
同時に、pthread_cond_wait() の呼び出しによって、スレッドは mutex のロックを解除します。生産者スレッドは、条件が真になって消費者スレッドがシグナルを送ってくれるのを待ちます。例 4–12 を参照してください。条件にシグナルが送られてくると、less を待っている一番目のスレッドが呼び起こされます。しかし、そのスレッドは pthread_cond_wait() が戻る前に、mutex ロックを再び獲得する必要があります。
これにより、バッファーデータ構造への相互排他アクセスが保証されます。その後、生産者スレッドは、バッファーに本当に空きがあるか確認する必要があります。空きがあれば、最初の未使用スロットにデータを入れます。
このとき、バッファーにデータが入れられるのを消費者スレッドが待っている可能性があります。そのスレッドは、条件変数 more で待ち状態となっています。生産者スレッドはバッファーにデータを入れると、pthread_cond_signal() を呼び出して、待ち状態の最初の消費者を呼び起こします待ち状態の消費者がいないときは、この呼び出しは無視されます。
最後に、生産者スレッドは mutex ロックを解除して、ほかのスレッドがバッファーデータ構造を操作できるようにします。
void producer(buffer_t *b, char item) { pthread_mutex_lock(&b->mutex); while (b->occupied >= BSIZE) pthread_cond_wait(&b->less, &b->mutex); assert(b->occupied < BSIZE); b->buf[b->nextin++] = item; b->nextin %= BSIZE; b->occupied++; /* now: either b->occupied < BSIZE and b->nextin is the index of the next empty slot in the buffer, or b->occupied == BSIZE and b->nextin is the index of the next (occupied) slot that will be emptied by a consumer (such as b->nextin == b->nextout) */ pthread_cond_signal(&b->more); pthread_mutex_unlock(&b->mutex); }
assert() 文の用法に注意してください。NDEBUG を定義してコードをコンパイルした場合を除き、assert() はその引数が真 (0 以外) として評価された場合は何も行いません。その引数が偽 (0) として評価された場合、プログラムは終了します。こうしたアサーションはマルチスレッドプログラムで特に役立ちます。assert() は、アサーションが失敗すると実行時に発生した問題をただちに指摘します。assert() の効果はほかにもあり、役立つコメントを提供します。
/* 現在の状態: ... で始まるコメント部分も、assert で表現した方がよいかもしれません。しかし、論理式で表現するには複雑すぎるので、ここでは文章で表現しています。
assert の部分もコメント部分も、不変式になっています。これらの不変式は、プログラムの実行により偽の値に変更されない論理式ですが、次のような例外があります。この例外は、あるスレッドが不変式中のプログラム変数を変更している瞬間に起こります。もちろん、assert の論理式は、どのスレッドがいつ実行した場合でも常に真であるべきです。
不変式は非常に重要な手法です。プログラムテキストとして明示的に表現しなくても、プログラムを分析するときは不変式に置き換えて問題を考えることが大切です。
上記の生産者コード内のコメントで表現された不変式は、スレッドがそのコメントを含むコード部分を実行中には常に真となります。しかし、それを mutex_unlock() のすぐ後ろに移動すると、必ずしも常に真とはなりません。assert() のすぐ後ろに移動した場合は、真となります。
このため、この不変式は常に真となる特性を持っていますが、次のような例外があります。この例外は、生産者または消費者がバッファーの状態を変更しているときに起こります。スレッドは mutex の保護下でバッファーを操作しているとき、この不変式の値を一時的に偽にしてもかまいません。しかし、処理が完了したら不変式の値を再び真に戻さなければなりません。
例 4–13 に、消費者のコードを示します。論理フローは生産者の場合と対称的です。
char consumer(buffer_t *b) { char item; pthread_mutex_lock(&b->mutex); while(b->occupied <= 0) pthread_cond_wait(&b->more, &b->mutex); assert(b->occupied > 0); item = b->buf[b->nextout++]; b->nextout %= BSIZE; b->occupied--; /* now: either b->occupied > 0 and b->nextout is the index of the next occupied slot in the buffer, or b->occupied == 0 and b->nextout is the index of the next (empty) slot that will be filled by a producer (such as b->nextout == b->nextin) */ pthread_cond_signal(&b->less); pthread_mutex_unlock(&b->mutex); return(item); }