マルチスレッドのプログラミング

「生産者 / 消費者」問題

「生産者 / 消費者」問題は、並行プログラミングに関する問題の中でも一般によく知られているものの 1 つです。この問題は次のように定式化されます。サイズが有限の 1 個のバッファと 2 種類のスレッドが存在します。一方のスレッドを生産者、もう一方のスレッドを消費者と呼びます。生産者がバッファにデータを入れ、消費者がバッファからデータを取り出します。

生産者は、バッファに空きができるまでデータを入れることができません。消費者は、バッファが空の間はデータを取り出すことができません。

特定の条件のシグナルを待つスレッドの待ち行列を条件変数で表すことにします。

例 4-11 では、そうした待ち行列として lessmore の 2 つを使用しています。less はバッファ内の未使用スロットを待つ生産者のための待ち行列で、more は情報が格納されたバッファスロットを待つ消費者のための待ち行列です。また、バッファが同時に複数のスレッドによってアクセスされないようにするために、相互排他ロック (mutex ロック) も使用しています。


例 4-11 「生産者 / 消費者」問題と条件変数


typedef struct {
    char buf[BSIZE];
    int occupied;
    int nextin;
    int nextout;
    pthread_mutex_t mutex;
    pthread_cond_t more;
    pthread_cond_t less;
} buffer_t;

buffer_t buffer;

例 4-12 は、生産者側の処理です。最初に、mutex をロックしてバッファデータ構造 (buffer) を保護します。次に、これから作成するデータのための空きがあるか確認します。空きがない場合は、pthread_cond_wait() を呼び出して、「バッファ内に空きがある」を表す条件 less にシグナルが送られてくるのを待つスレッドの待ち行列に入ります。

同時に、pthread_cond_wait() の呼び出しによって、スレッドは mutex のロックを解除します。生産者スレッドは、条件が真になって消費者スレッドがシグナルを送ってくれるのを待ちます (詳細は、例 4-12 を参照してください)。条件にシグナルが送られてくると、less を待っている一番目のスレッドが呼び起こされます。しかし、そのスレッドは pthread_cond_wait() が戻る前に、mutex ロックを再び獲得する必要があります。

このようにして、バッファデータ構造への相互排他アクセスが保証されます。その後、生産者スレッドはバッファに本当に空きがあるか確認しなければなりません。空きがある場合は、最初の未使用スロットにデータを入れます。

このとき、バッファにデータが入れられるのを消費者スレッドが待っている可能性があります。そのスレッドは、条件変数 more で待ち状態となっています。生産者スレッドはバッファにデータを入れると、pthread_cond_signal() を呼び出して、待ち状態の最初の消費者を呼び起こします (待ち状態の消費者がいないときは、この呼び出しは無視されます)。

最後に、生産者スレッドは mutex ロックを解除して、他のスレッドがバッファデータ構造を操作できるようにします。


例 4-12 「生産者 / 消費者」問題 − 生産者


void producer(buffer_t *b, char item)
{
    pthread_mutex_lock(&b->mutex);
   
    while (b->occupied >= BSIZE)
        pthread_cond_wait(&b->less, &b->mutex);

    assert(b->occupied < BSIZE);

    b->buf[b->nextin++] = item;

    b->nextin %= BSIZE;
    b->occupied++;

    /* 現在の状態:  「b->occupied < BSIZE かつ b->nextin はバッファ内
       の次の空きスロットのインデックス」または「b->occupied == BSIZE 
       かつ b->nextin は次の (占有されている) スロットのインデックス。これは
       消費者によって空にされる (例: b->nextin == b->nextout) 」 */

    pthread_cond_signal(&b->more);

    pthread_mutex_unlock(&b->mutex);
}

上記のコード例の assert() 文の用法に注意してください。コンパイル時に NDEBUG を定義しなければ、assert() は次のように動作します。すなわち、引数が真 (0 以外の値) のときは何も行わず、引数が偽 (0) のときはプログラムを強制的に終了させます。このように、実行時に発生した問題をただちに指摘できる点がマルチスレッドプログラムに特に適しています。assert() はデバッグのための有用な情報も与えてくれます。

/* 現在の状態: ... で始まるコメント部分も assert() で表現した方がよいかもしれません。しかし、論理式で表現するには複雑すぎるので、ここでは文章で表現しています。

上記の assert() やコメント部分の論理式は、どちらも不変式の例です。不変式は、あるスレッドが不変式中の変数を変更している瞬間を除いて、プログラムの実行により偽の値に変更されない論理式です (もちろん assert() の論理式は、どのスレッドがいつ実行した場合でも常に真であるべきです)。

不変式は非常に重要な手法です。プログラムテキストとして明示的に表現しなくても、プログラムを分析するときは不変式に置き換えて問題を考えることが大切です。

上記の生産者コード内のコメントで表現された不変式は、スレッドがそのコメントを含むコード部分を処理中には常に真となります。しかし、それを mutex_unlock() のすぐ後ろに移動すると、必ずしも常に真とはなりません。assert() のすぐ後ろに移動した場合は、真となります。

つまり、この不変式は、生産者または消費者がバッファの状態を変更しようとしているとき以外は、常に真となるような特性を表現しています。スレッドは mutex の保護下でバッファを操作しているとき、この不変式の値を一時的に偽にしてもかまいません。しかし、処理が完了したら不変式の値を再び真に戻さなければなりません。

例 4-13 は、消費者の処理です。この処理の流れは生産者の場合と対称的です。


例 4-13 「生産者 / 消費者」問題 − 消費者


char consumer(buffer_t *b)
{
    char item;
    pthread_mutex_lock(&b->mutex);
    while(b->occupied <= 0)
        pthread_cond_wait(&b->more, &b->mutex);

    assert(b->occupied > 0);

    item = b->buf[b->nextout++];
    b->nextout %= BSIZE;
    b->occupied--;

    /* 現在の状態:  「b->occupied > 0 かつ b->nextout はバッファ内の
       最初の占有されているスロットのインデックス」または「b->occupied == 0 
       かつ b-> nextout は次の (未使用) スロットのインデックス。これは生産者側
       によっていっぱいにされる (例: b->nextout == b->nextin) 」 */

    pthread_cond_signal(&b->less);
    pthread_mutex_unlock(&b->mutex);

    return(item);
}