この節では、libC ライブラリと libiostream ライブラリの iostream クラスを、マルチスレッド環境での入出力に使用する方法を説明します。さらに、iostream クラスの派生クラスを作成し、ライブラリの機能を拡張する例も紹介します。ここでは、C++ のマルチスレッドコードを記述するための指針は示しません。
この節では、従来の iostream (libC と libiostream) だけを取り扱います。この節の説明は、C++ 標準ライブラリに含まれている新しい iostream (libCstd) には当てはまりません。
iostream ライブラリのインタフェースは、マルチスレッド環境用のアプリケーション、すなわちサポートされている Solaris オペレーティングシステムのバージョンで実行される、マルチスレッド機能を使用するプログラムから使用できます。従来のライブラリのシングルスレッド機能を使用するアプリケーションは影響を受けません。
ライブラリが「マルチスレッドを使用しても安全」といえるのは、複数のスレッドが存在する環境で正しく機能する場合です。一般に、ここでの「正しく機能する」とは、公開関数がすべて再入可能なことを指します。iostream ライブラリには、複数のスレッドの間で共有されるオブジェクト (C++ クラスのインスタンス) の状態が、複数のスレッドから変更されるのを防ぐ機能があります。ただし、iostream オブジェクトがマルチスレッドで使用しても安全になるのは、そのオブジェクトの公開メンバー関数が実行されている間に限られます。
注 - アプリケーションで libC ライブラリからマルチスレッドで使用しても安全なオブジェクトを使用しているからといって、そのアプリケーションが自動的にマルチスレッドで使用しても安全になるわけではありません。アプリケーションがマルチスレッドで使用しても安全になるのは、マルチスレッド環境で想定したとおりに実行される場合だけです。
マルチスレッドで使用しても安全な iostream ライブラリの構成は、従来の iostream ライブラリの構成と多少異なります。マルチスレッドで使用しても安全な iostream ライブラリのインタフェースは、iostream クラスやその基底クラスの公開および限定公開のメンバー関数を示していて、従来のライブラリと整合性が保たれていますが、クラス階層に違いがあります。詳細については、「10.4.2 iostream ライブラリのインタフェースの変更」を参照してください。
従来の中核クラスの名前が変更されています (先頭に unsafe_ という文字列が付きました)。iostream パッケージの中核クラスを表 10-1 に示します。
表 10-1 iostream の中核クラス
|
マルチスレッドで使用しても安全なクラスは、すべて基底クラス stream_MT の派生クラスです。また、これらのクラスは、streambuf を除いて、(先頭に unsafe_ が付いた) 従来の基底クラスの派生クラスでもあります。この例を次に示します。
class streambuf: public stream_MT {...}; class ios: virtual public unsafe_ios, public stream_MT {...}; class istream: virtual public ios, public unsafe_istream {...};
stream_MT には、それぞれの iostream クラスをマルチスレッドで使用しても安全にするための相互排他 (mutex) ロック機能が含まれています。また、このクラスには、マルチスレッドで使用しても安全な属性を動的に変更できるように、ロックを動的に有効および無効にする機能もあります。入出力変換とバッファー管理の基本機能は、従来の unsafe_ クラスにまとめられています。したがって、ライブラリに新しく追加されたマルチスレッドで使用しても安全な機能は、その派生クラスだけで使用できます。マルチスレッドで使用しても安全なクラスには、従来の unsafe_ 基底クラスと同じ公開メンバー関数と限定公開メンバー関数が含まれています。これらのメンバー関数は、オブジェクトをロックし、unsafe_ 基底クラスの同名の関数を呼び出し、そのあとでオブジェクトのロックを解除するラッパーとして働きます。
注 - streambuf クラスは、unsafe クラスの派生クラスではありません。streambuf クラスの公開メンバー関数と限定公開メンバー関数は、ロックを行うことで再入可能になります。ロックを行わない関数も用意されています。これらの関数は、名前の後ろに _unlocked という文字列が付きます。
iostream のインタフェースには、マルチスレッドで使用しても安全な、再入可能な公開関数が追加されています。これらの関数は、追加引数としてユーザーが指定したバッファーを受け取ります。これらの関数を次に示します。
表 10-2 マルチスレッドで使用しても安全な、再入可能な公開関数
|
注 - 以前の libC との互換性を確保するために提供されている iostream ライブラリの公開変換ルーチン (oct、hex、dec、chr、form) は、マルチスレッドで使用すると安全ではありません。
libC ライブラリの iostream クラスを使用した、マルチスレッド環境用のアプリケーションを構築するには、-mt オプションを付けてソースコードのコンパイルとリンクを行う必要があります。このオプションを付けると、プリプロセッサに -D_REENTRANT が渡され、リンカーに -lthread が渡されます。
注 - libC と libthread へのリンクを行うには、(-lthread オプションではなく) -mt オプションを使用します。このオプションを使用しないと、ライブラリが正しい順番でリンクされないことがあります。誤って -lthread オプションを使用すると、作成したアプリケーションが正しく機能しない場合があります。
iostream クラスを使用するシングルスレッドアプリケーションについては、コンパイラオプションやリンカーオプションを特に必要としません。オプションを何も指定しなかった場合は、コンパイラは libC ライブラリへのリンクを行います。
iostream ライブラリのマルチスレッドでの安全性には制約があります。これは、マルチスレッド環境で iostream オブジェクトが共有された場合に、iostream を使用するプログラミング手法の多くが安全ではなくなるためです。
マルチスレッドでの安全性を実現するには、エラーの原因になる入出力操作を含んでいる危険領域で、エラーチェックを行う必要があります。エラーが発生したかどうかを確認するには次のようにします。
例 10-1 エラー状態のチェック
#include <iostream.h> enum iostate {IOok, IOeof, IOfail}; iostate read_number(istream& istr, int& num) { stream_locker sl(istr, stream_locker::lock_now); istr >> num; if (istr.eof()) return IOeof; if (istr.fail()) return IOfail; return IOok; }
この例では、stream_locker オブジェクト sl のコンストラクタによって、istream オブジェクト istr がロックされます。このロックは、read_number が終了したときに呼び出される sl のデストラクタによって解除されます istr。
マルチスレッドでの安全性を実現するには、最後の入力操作と gcount の呼び出しを行う期間に、istream オブジェクトを排他的に使用するスレッドの内部から、gcount 関数を呼び出す必要があります。gcount は次のように呼び出します。
例 10-2 gcount の呼び出し
#include <iostream.h> #include <rlocks.h> void fetch_line(istream& istr, char* line, int& linecount) { stream_locker sl(istr, stream_locker::lock_defer); sl.lock(); // lock the stream istr istr >> line; linecount = istr.gcount(); sl.unlock(); // unlock istr ... }
この例では、stream_locker クラスの lock メンバー関数を呼び出してから unlock メンバー関数を呼び出すまでが、プログラムの相互排他領域になります。
マルチスレッドでの安全性を実現するには、別々の操作を特定の順番で行う必要があるユーザー定義型用の入出力操作を、危険領域としてロックする必要があります。この入出力操作の例を次に示します。
例 10-3 ユーザー定義の入出力操作
#include <rlocks.h> #include <iostream.h> class mystream: public istream { // other definitions... int getRecord(char* name, int& id, float& gpa); }; int mystream::getRecord(char* name, int& id, float& gpa) { stream_locker sl(this, stream_locker::lock_now); *this >> name; *this >> id; *this >> gpa; return this->fail() == 0; }
現行の libC ライブラリに含まれているマルチスレッドで使用しても安全なクラスを使用すると、シングルスレッドアプリケーションの場合でさえも多少のオーバーヘッドが発生します。libC の unsafe_ クラスを使用すると、このオーバーヘッドを回避できます。
次のようにスコープ決定演算子を使用すると、unsafe_ 基底クラスのメンバー関数を実行できます。
cout.unsafe_ostream::put(’4’);
cin.unsafe_istream::read(buf, len);
注 - unsafe_ クラスは、マルチスレッドアプリケーションでは安全に使用できません。
unsafe_ クラスを使用する代わりに、cout オブジェクトと cin オブジェクトを unsafe にしてから、通常の操作を行うこともできます。ただし、パフォーマンスが若干低下します。unsafe な cout と cin は、次のように使用します。
例 10-4 マルチスレッドでの安全性の無効化
#include <iostream.h> //disable mt-safety cout.set_safe_flag(stream_MT::unsafe_object); //disable mt-safety cin.set_safe_flag(stream_MT::unsafe_object); cout.put(”4’); cin.read(buf, len);
iostream オブジェクトがマルチスレッドで使用しても安全な場合は、相互排他ロックを行うことで、そのオブジェクトのメンバー変数が保護されます。 しかし、シングルスレッド環境でしか実行されないアプリケーションでは、このロック処理のために、本来なら必要のないオーバーヘッドがかかります。iostream オブジェクトのマルチスレッドでの安全性の有効/無効を動的に切り替えると、パフォーマンスを改善できます。たとえば、iostream オブジェクトのマルチスレッドでの安全性を無効にするには、次のようにします。
例 10-5 マルチスレッドでの安全性の無効化
fs.set_safe_flag(stream_MT::unsafe_object);// disable MT-safety .... do various i/o operations
iostream が複数のスレッド間で共有されないコード領域では、マルチスレッドでの安全性の無効化ストリームであっても、安全に使用できます。たとえば、スレッドが 1 つしかないプログラムや、スレッドごとに非公開の iostream を使用するプログラムでは問題は起きません。
プログラムに同期処理を明示的に挿入すると、iostream が複数のスレッド間で共有される場合にも、マルチスレッドで使用すると安全ではない iostream を安全に使用できるようになります。この例を次に示します。
例 10-6 マルチスレッドで使用すると安全ではないオブジェクトの同期処理
generic_lock(); fs.set_safe_flag(stream_MT::unsafe_object); ... do various i/o operations generic_unlock();
ここで、generic_lock 関数と generic_unlock 関数は、相互排他ロック (mutex)、セマフォー、読み取り/書き込みロックといった基本型を使用する同期機能であれば、何でもかまいません。
詳細は 「10.4.5 オブジェクトのロック」を参照してください。
この節では、iostream ライブラリをマルチスレッドで使用しても安全にするために行われたインタフェースの変更内容について説明します。
libC インタフェースに追加された新しいクラスを次の表に示します。
例 10-7 新しいクラス
stream_MT stream_locker unsafe_ios unsafe_istream unsafe_ostream unsafe_iostream unsafe_fstreambase unsafe_strstreambase
iostream インタフェースに追加された新しいクラス階層を次の表に示します。
例 10-8 新しいクラス階層
class streambuf: public stream_MT {...}; class unsafe_ios {...}; class ios: virtual public unsafe_ios, public stream_MT {...}; class unsafe_fstreambase: virtual public unsafe_ios {...}; class fstreambase: virtual public ios, public unsafe_fstreambase {...}; class unsafe_strstreambase: virtual public unsafe_ios {...}; class strstreambase: virtual public ios, public unsafe_strstreambase {...}; class unsafe_istream: virtual public unsafe_ios {...}; class unsafe_ostream: virtual public unsafe_ios {...}; class istream: virtual public ios, public unsafe_istream {...}; class ostream: virtual public ios, public unsafe_ostream {...}; class unsafe_iostream: public unsafe_istream, public unsafe_ostream {...};
iostream インタフェースに追加された新しい関数を次の表に示します。
例 10-9 新しい関数
class streambuf { public: int sgetc_unlocked(); void sgetn_unlocked(char *, int); int snextc_unlocked(); int sbumpc_unlocked(); void stossc_unlocked(); int in_avail_unlocked(); int sputbackc_unlocked(char); int sputc_unlocked(int); int sputn_unlocked(const char *, int); int out_waiting_unlocked(); protected: char* base_unlocked(); char* ebuf_unlocked(); int blen_unlocked(); char* pbase_unlocked(); char* eback_unlocked(); char* gptr_unlocked(); char* egptr_unlocked(); char* pptr_unlocked(); void setp_unlocked(char*, char*); void setg_unlocked(char*, char*, char*); void pbump_unlocked(int); void gbump_unlocked(int); void setb_unlocked(char*, char*, int); int unbuffered_unlocked(); char *epptr_unlocked(); void unbuffered_unlocked(int); int allocate_unlocked(int); }; class filebuf: public streambuf { public: int is_open_unlocked(); filebuf* close_unlocked(); filebuf* open_unlocked(const char*, int, int = filebuf::openprot); filebuf* attach_unlocked(int); }; class strstreambuf: public streambuf { public: int freeze_unlocked(); char* str_unlocked(); }; unsafe_ostream& endl(unsafe_ostream&); unsafe_ostream& ends(unsafe_ostream&); unsafe_ostream& flush(unsafe_ostream&); unsafe_istream& ws(unsafe_istream&); unsafe_ios& dec(unsafe_ios&); unsafe_ios& hex(unsafe_ios&); unsafe_ios& oct(unsafe_ios&); char* dec_r (char* buf, int buflen, long num, int width) char* hex_r (char* buf, int buflen, long num, int width) char* oct_r (char* buf, int buflen, long num, int width) char* chr_r (char* buf, int buflen, long chr, int width) char* str_r (char* buf, int buflen, const char* format, int width = 0); char* form_r (char* buf, int buflen, const char* format,...)
マルチスレッドアプリケーションでの大域データと静的データは、スレッド間で安全に共有されません。スレッドはそれぞれ個別に実行されますが、同じプロセス内のスレッドは、大域オブジェクトと静的オブジェクトへのアクセスを共有します。このような共有オブジェクトをあるスレッドで変更すると、その変更が同じプロセス内のほかのスレッドにも反映されるため、状態を保つことが難しくなります。C++ では、クラスオブジェクト (クラスのインスタンス) の状態は、メンバー変数の値が変わると変化します。そのため、共有されたクラスオブジェクトは、ほかのスレッドからの変更に対して脆弱です。
マルチスレッドアプリケーションで iostream ライブラリを使用し、iostream.h をインクルードすると、デフォルトでは標準ストリーム (cout、cin、cerr、clog) が大域的な共有オブジェクトとして定義されます。iostream ライブラリはマルチスレッドで使用しても安全なので、iostream オブジェクトのメンバー関数の実行中は、共有オブジェクトの状態が、ほかのスレッドからのアクセスや変更から保護されます。ただし、オブジェクトがマルチスレッドで使用しても安全なのは、そのオブジェクトの公開メンバー関数が実行されている間だけです。たとえば、次を見てください。
int c; cin.get(c);
このコードを使用して、スレッド A が get バッファーの次の文字を取り出し、バッファーポインタを更新したとします。ところが、スレッド A が、次の命令で再び get を呼び出したとしても、libC ライブラリはシーケンスのその次の文字を返すことを保証しません。なぜなら、スレッド A の 2 つの get の呼び出しの間に、スレッド B からも別の get が呼び出される可能性があるからです。
このような共有オブジェクトとマルチスレッド処理の問題に対処する方法については、「10.4.5 オブジェクトのロック」を参照してください。
iostream オブジェクトを使用した場合に、一続きの入出力操作をマルチスレッドで使用しても安全にしなければならない場合がよくあります。次は
cout << " Error message:" << errstring[err_number] << "\n";
このコードでは、cout ストリームオブジェクトの 3 つのメンバー関数が実行されます。cout は共有オブジェクトなので、マルチスレッド環境では、この操作全体を危険領域として不可分的に (連続して) 実行しなければなりません。iostream クラスのオブジェクトに対する一続きの操作を不可分的に実行するには、何らかのロック処理が必要です。
iostream オブジェクトをロックできるように、libC ライブラリに新しく stream_locker クラスが追加されています。stream_locker クラスの詳細については、「10.4.5 オブジェクトのロック」を参照してください。
共有オブジェクトに対処する方法とマルチスレッド化は iostream オブジェクトをスレッドの局所的なオブジェクトにして、問題そのものを解消してしまうことです。たとえば、次を見てください。
スレッドのエントリ関数の中でオブジェクトを局所的に宣言する。
スレッド固有データの中でオブジェクトを宣言する。(スレッド固有データの使用法については、thr_keycreate(3T) のマニュアルページを参照してください。
ストリームオブジェクトを特定のスレッド専用にする。このオブジェクトスレッドは、慣例により非公開 (private) になります。
ただし、デフォルトの共有標準ストリームオブジェクトを初めとして、多くの場合はオブジェクトをスレッドの局所的なオブジェクトにすることはできません。そのため、別の手段が必要です。
iostream クラスのオブジェクトに対する一続きの操作を不可分的に実行するには、何らかのロック処理が必要です。ただし、ロック処理を行うと、シングルスレッドアプリケーションの場合でさえも、オーバーヘッドが多少増加します。ロック処理を追加する必要があるか、それとも iostream オブジェクトをスレッドの非公開オブジェクトにすればよいかは、アプリケーションで採用しているスレッドモデル (独立スレッドと連携スレッドのどちらを使用しているか) によって決まります。
スレッドごとに別々の iostream オブジェクトを使用してデータを入出力する場合は、それぞれの iostream オブジェクトが、該当するスレッドの非公開オブジェクトになります。ロック処理の必要はありません。
複数のスレッドを連携させる (これらのスレッドの間で、同じ iostream オブジェクトを共有させる) 場合は、その共有オブジェクトへのアクセスの同期をとる必要があり、何らかのロック処理によって、一続きの操作を不可分的にする必要があります。
iostream ライブラリには、iostream オブジェクトに対する一続きの操作をロックするための stream_locker クラスが含まれています。これにより、iostream オブジェクトのロックを動的に切り換えることで生じるオーバーヘッドを最小限にできます。
stream_locker クラスのオブジェクトを使用すると、ストリームオブジェクトに対する一続きの操作を不可分的にできます。たとえば、次の例を考えてみましょう。このコードは、ファイル内の位置を特定の場所まで移動し、その後続のデータブロックを読み込みます。
例 10-10 ロック処理の使用例
#include <fstream.h> #include <rlocks.h> void lock_example (fstream& fs) { const int len = 128; char buf[len]; int offset = 48; stream_locker s_lock(fs, stream_locker::lock_now); .....// open file fs.seekg(offset, ios::beg); fs.read(buf, len); }
この例では、stream_locker オブジェクトのコンストラクタが実行されてから、デストラクタが実行されるまでが、一度に 1 つのスレッドしか実行できない相互排他領域になります。デストラクタは、lock_example 関数が終了したときに呼び出されます。この stream_locker オブジェクトにより、ファイル内の特定のオフセットへの移動と、ファイルからの読み込みの連続的な (不可分的な) 実行が保証され、ファイルからの読み込みを行う前に、別のスレッドによってオフセットが変更されてしまう可能性がなくなります。
stream_locker オブジェクトを使用して、相互排他領域を明示的に定義することもできます。次の例では、入出力操作と、そのあとで行うエラーチェックを不可分的にするために、stream_locker オブジェクトのメンバー関数、lock と unlock を呼び出しています。
例 10-11 入出力操作とエラーチェックの不可分化
{ ... stream_locker file_lck(openfile_stream, stream_locker::lock_defer); .... file_lck.lock(); // lock openfile_stream openfile_stream << "Value: " << int_value << "\n"; if(!openfile_stream) { file_error("Output of value failed\n"); return; } file_lck.unlock(); // unlock openfile_stream }
詳細は、stream_locker(3CC4) のマニュアルページを参照してください。
iostream クラスから新しいクラスを派生させて、機能を拡張または特殊化できます。マルチスレッド環境で、これらの派生クラスからインスタンス化したオブジェクトを使用する場合は、その派生クラスがマルチスレッドで使用しても安全でなければなりません。
マルチスレッドで使用しても安全なクラスを派生させる場合は、次のことに注意する必要があります。
クラスオブジェクトの内部状態を複数のスレッドによる変更から保護し、そのオブジェクトをマルチスレッドで使用しても安全にします。そのためには、公開および限定公開のメンバー関数に含まれているメンバー変数へのアクセスを、相互排他ロックで直列化します。
マルチスレッドで使用しても安全な基底クラスのメンバー関数を、一続きに呼び出す必要がある場合は、それらの呼び出しを stream_locker オブジェクトを使用して不可分にします。
stream_locker オブジェクトで定義した危険領域の内部では、streambuf クラスの _unlocked メンバー関数を使用して、ロック処理のオーバーヘッドを防止します。
streambuf クラスの公開仮想関数を、アプリケーションから直接呼び出す場合は、それらの関数をロックします。該当する関数は、次のとおりです: xsgetn、underflow、pbackfail、xsputn、overflow、seekoff、seekpos。
ios クラスの iword メンバー関数と pword メンバー関数を使用して、ios オブジェクトの書式設定状態を拡張します。ただし、複数のスレッドが iword 関数や pword 関数の同じ添字を共有している場合は、問題が発生することがあります。これらのスレッドをマルチスレッドで使用しても安全にするには、適切なロック機能を使用する必要があります。
メンバー関数のうち、char 型よりも大きなサイズのメンバー変数値を返すものをロックします。
複数のスレッドの間で共有される iostream オブジェクトを削除するには、サブスレッドがそのオブジェクトの使用を終えていることを、メインスレッドで確認する必要があります。共有オブジェクトを安全に破棄する方法を次に示します。
例 10-12 共有オブジェクトの破棄
#include <fstream.h> #include <thread.h> fstream* fp; void *process_rtn(void*) { // body of sub-threads which uses fp... } void multi_process(const char* filename, int numthreads) { fp = new fstream(filename, ios::in); // create fstream object // before creating threads. // create threads for (int i=0; i<numthreads; i++) thr_create(0, STACKSIZE, process_rtn, 0, 0, 0); ... // wait for threads to finish for (int i=0; i<numthreads; i++) thr_join(0, 0, 0); delete fp; // delete fstream object after fp = NULL; // all threads have completed. }
ここでは、libC ライブラリの iostream オブジェクトを安全な方法で使用するマルチスレッドアプリケーションの例を示します。
このアプリケーションは、最大で 255 のスレッドを生成します。それぞれのスレッドは、別々の入力ファイルを 1 行ずつ読み込み、標準出力ストリーム cout を介して共通の出力ファイルに書き出します。この出力ファイルは、すべてのスレッドから共有されるため、出力操作がどのスレッドから行われたかを示す値をタグとして付けます。
例 10-13 iostream オブジェクトをマルチスレッドで使用しても安全な方法で使用
// create tagged thread data // the output file is of the form: // <tag><string of data>\n // where tag is an integer value in a unsigned char. // Allows up to 255 threads to be run in this application // <string of data> is any printable characters // Because tag is an integer value written as char, // you need to use od to look at the output file, suggest: // od -c out.file |more #include <stdlib.h> #include <stdio.h> #include <iostream.h> #include <fstream.h> #include <thread.h> struct thread_args { char* filename; int thread_tag; }; const int thread_bufsize = 256; // entry routine for each thread void* ThreadDuties(void* v) { // obtain arguments for this thread thread_args* tt = (thread_args*)v; char ibuf[thread_bufsize]; // open thread input file ifstream instr(tt->filename); stream_locker lockout(cout, stream_locker::lock_defer); while(1) { // read a line at a time instr.getline(ibuf, thread_bufsize - 1, ’\n’); if(instr.eof()) break; // lock cout stream so the i/o operation is atomic lockout.lock(); // tag line and send to cout cout << (unsigned char)tt->thread_tag << ibuf << "\n"; lockout.unlock(); } return 0; } int main(int argc, char** argv) { // argv: 1+ list of filenames per thread if(argc < 2) { cout << “usage: " << argv[0] << " <files..>\n"; exit(1); } int num_threads = argc - 1; int total_tags = 0; // array of thread_ids thread_t created_threads[thread_bufsize]; // array of arguments to thread entry routine thread_args thr_args[thread_bufsize]; int i; for(i = 0; i < num_threads; i++) { thr_args[i].filename = argv[1 + i]; // assign a tag to a thread - a value less than 256 thr_args[i].thread_tag = total_tags++; // create threads thr_create(0, 0, ThreadDuties, &thr_args[i], THR_SUSPENDED, &created_threads[i]); } for(i = 0; i < num_threads; i++) { thr_continue(created_threads[i]); } for(i = 0; i < num_threads; i++) { thr_join(created_threads[i], 0, 0); } return 0; }