本节介绍如何将 libC 和 libiostream 库的 iostream 类用于多线程环境的输入输出 (input-output, I/O)。另外,还提供了如何通过从 iostream 类派生来扩展库的功能的示例。但本节并不是指导如何采用 C++ 编写多线程代码。
此处的讨论只适用于原来的 iostream(libC 和 libiostream),而不适用于 libCstd(即新的 iostream,它是 C++ 标准库的一部分)。
iostream 库允许多线程环境中的应用程序使用其接口,以及运行支持的 Solaris 操作系统版本时使用多线程功能的程序使用其接口。如果应用程序使用以前版本库的单线程功能,那么该应用程序不会受到影响。
如果库能在有多个线程的环境中正常运行,则该库定义为 MT 安全的。通常,此处的“正常”意味着其所有公用函数都是可重入的。iostream 库提供了保护,防止多个线程尝试修改由多个线程共享的对象(即 C++ 类的实例)的状态。但 iostream 对象的 MT 安全作用域仅限于该对象的公共成员函数正在执行的那一段时间。
应用程序并不能因为使用 libC 库中的 MT 安全对象,而被自动保证为是 MT 安全的。应用程序只有按预期那样能够在多线程环境中执行时,才被定义为 MT 安全的。
MT 安全的 iostream 库的组织与其他版本的 iostream 库稍有不同。库的导出接口指的是 iostream 类的受保护的公共成员函数以及可用基类集合,这一点与其他版本的库相同;但类的分层结构是不同的。有关详细信息,请参见11.4.2 iostream 库接口更改。
原来的核心类已重命名,即添加了前缀 unsafe_。表 11–1 列出了属于 iostream 软件包核心的类。
表 11–1 原来的 iostream 核心类
类 |
说明 |
---|---|
stream_MT |
多线程安全类的基类。 |
streambuf |
缓冲区的基类。 |
unsafe_ios |
该类包含各种流类通用的状态变量;例如,错误和格式化状态。 |
unsafe_istream |
该类支持从 streambuf 检索的字符序列的有格式和无格式转换。 |
unsafe_ostream |
该类支持存储到 streambuf 中的字符序列的有格式和无格式转换。 |
unsafe_iostream |
该类合并 unsafe_istream 类和 unsafe_ostream 类,以便进行双向操作。 |
每个 MT 安全的类都是从基类 stream_MT 派生而来。每个 MT 安全的类(除了 streambuf 外)也都是从现有的 unsafe_ 基类派生而来。示例如下:
class streambuf: public stream_MT {...}; class ios: virtual public unsafe_ios, public stream_MT {...}; class istream: virtual public ios, public unsafe_istream {...}; |
类 stream_MT 提供了使每个 iostream 类成为 MT 安全的类所需的互斥锁,另外,还提供了动态启用和禁用这些锁的功能,以便可以动态更改 MT 安全属性。用于 I/O 转换和缓冲区管理的基本功能划入 unsafe_ 类,为库增加 MT 安全的功能则划入派生类。每个类的 MT 安全版本都包含与 unsafe_ base 类相同的受保护的公共成员函数。MT 安全版本类中的每个成员函数都可用作包装器,它可以锁定对象、调用 unsafe_ base 中的相同函数以及解锁对象。
streambuf 类不是从非安全类派生而来的。streambuf 类的受保护的公共成员函数可以通过锁定来重入。此外还提供了带 _unlocked 后缀的已解锁版本。
已向 iostream 接口添加了一组 MT 安全的重入公共函数。 用户指定的缓冲区被作为每个函数的附加参数。这些函数如下所述:
表 11–2 多线程安全的可重入公共函数
功能 |
说明 |
---|---|
char *oct_r (char *buf, int buflen, long num, int width) |
将指针返回到用八进制表示数字的 ASCII 字符串。非零宽度假定为格式化的字段宽度。返回值不保证指向用户提供缓冲区的开始部分。 |
char *hex_r (char *buf, int buflen, long num, int width) |
将指针返回到用十六进制表示数字的 ASCII 字符串。非零宽度假定为格式化的字段宽度。返回值不保证指向用户提供缓冲区的开始部分。 |
char *dec_r (char *buf, int buflen, long num, int width) |
将指针返回到用十进制表示数字的 ASCII 字符串。非零宽度假定为格式化的字段宽度。返回值不保证指向用户提供缓冲区的开始部分。 |
char *chr_r (char *buf, int buflen, long num, int width) |
返回指向包含字符 chr 的 ASCII 字符串的指针。如果宽度非零,则字符串包含后跟 chr 的 width 个空格。返回值不保证指向用户提供缓冲区的开始部分。 |
char *form_r (char *buf, int buflen, long num, int width) |
返回由 sprintf 格式化字符串的指针,其中使用了格式字符串 format 和其余参数。缓冲区必须具有足够的空间以包含格式化的字符串。 |
用来确保与早期版本的 libC 兼容的 iostream 库的公共转换例程(oct、hex、dec、chr 和 form)不是 MT 安全的。
生成使用 libC 库的 iostream 类以在多线程环境中运行的应用程序时,应使用 -mt 选项编译和链接该应用程序的源代码。此选项可将 -D_REENTRANT 传递给预处理程序,并将 -lthread 传递给链接程序。
请使用 -mt(而不是 -lthread)与 libC 和 libthread 链接。该选项确保了库的正确链接顺序。错误使用 -lthread 可能会导致应用程序无法正常运行。
对于使用 iostream 类 的单线程应用程序,不需要使用特殊的编译器和链接程序选项。缺省情况下,编译器会与 libC 库链接。
有关 iostream 库的 MT 安全性的限制定义意味着,用于 iostream 的许多编程常用方式在使用共享 iostream 对象的多线程环境中是不安全的。
要实现 MT 安全,必须在具有可能导致出现错误的 I/O 操作的关键区中进行错误检查。以下示例说明了如何检查错误:
#include <iostream.h> enum iostate {IOok, IOeof, IOfail}; iostate read_number(istream& istr, int& num) { stream_locker sl(istr, stream_locker::lock_now); istr >> num; if (istr.eof()) return IOeof; if (istr.fail()) return IOfail; return IOok; } |
在此示例中,stream_locker 对象 sl 的构造函数锁定 istream 对象 istr。在 read_number 终止时调用的析构函数 sl 解锁 istr。
要实现 MT 安全,必须在执行上次输入操作和 gcount 调用这一期间独占使用 istream 对象的线程内调用 gcount 函数。以下示例说明了对 gcount 的调用:
#include <iostream.h> #include <rlocks.h> void fetch_line(istream& istr, char* line, int& linecount) { stream_locker sl(istr, stream_locker::lock_defer); sl.lock(); // lock the stream istr istr >> line; linecount = istr.gcount(); sl.unlock(); // unlock istr ... } |
在此示例中,stream_locker 类的成员函数 lock 和 unlock 定义了程序中的互斥区域。
要实现 MT 安全,必须锁定为用户定义类型定义且涉及对各个操作进行特定排序的 I/O 操作,才能定义关键区。以下示例说明了用户定义的 I/O 操作:
#include <rlocks.h> #include <iostream.h> class mystream: public istream { // other definitions... int getRecord(char* name, int& id, float& gpa); }; int mystream::getRecord(char* name, int& id, float& gpa) { stream_locker sl(this, stream_locker::lock_now); *this >> name; *this >> id; *this >> gpa; return this->fail() == 0; } |
使用此版本的 libC 库中的 MT 安全类会导致一些性能开销,即使是单线程应用程序中也是如此,但如果使用 libC 的 unsafe_ 类,则可避免此开销。
可以使用作用域解析运算符执行基类 unsafe_ 的成员函数,例如:
cout.unsafe_ostream::put(’4’); |
cin.unsafe_istream::read(buf, len); |
unsafe_ 类不能在多线程应用程序中安全地使用。
可以使 cout 和 cin 对象成为不安全对象,然后执行正常操作,而不是使用 unsafe_ 类。这会稍微降低性能。以下示例说明了如何使用 unsafe cout 和 cin:
#include <iostream.h> //disable mt-safety cout.set_safe_flag(stream_MT::unsafe_object); //disable mt-safety cin.set_safe_flag(stream_MT::unsafe_object); cout.put(”4’); cin.read(buf, len); |
iostream 对象是 MT 安全对象时,有互斥锁定保护对象的成员变量。该锁定给仅在单线程环境中执行的应用程序增加了不必要的开销。为了提高性能,可以动态地启用或禁用 iostream 对象的 MT 安全性。以下示例使 iostream 对象成为 MT 不安全的对象:
fs.set_safe_flag(stream_MT::unsafe_object);// disable MT-safety .... do various i/o operations |
可以在多个线程未共享 iostream 的情况下(例如,在只有一个线程的程序中,或在每个 iostream 都是线程专用的程序中),在代码中安全地使用 MT 不安全的流。
如果显式在程序中插入同步,还可以在多个线程共享 iostream 的环境中安全地使用 MT 不安全的 iostream。以下示例说明了该技术:
generic_lock(); fs.set_safe_flag(stream_MT::unsafe_object); ... do various i/o operations generic_unlock(); |
其中,函数 generic_lock 和 generic_unlock 可以是使用诸如互斥锁、信号或读取器/写入器锁定等基元的任何同步机制。
libC 库提供的 stream_locker 类是实现这一目的的首选机制。
有关更多信息,请参见11.4.5 对象锁定。
本节介绍为使 iostream 库成为 MT 安全库而对其所做的接口更改。
stream_MT stream_locker unsafe_ios unsafe_istream unsafe_ostream unsafe_iostream unsafe_fstreambase unsafe_strstreambase |
下表列出了已增加到 iostream 接口的新类的分层结构。
class streambuf: public stream_MT {...}; class unsafe_ios {...}; class ios: virtual public unsafe_ios, public stream_MT {...}; class unsafe_fstreambase: virtual public unsafe_ios {...}; class fstreambase: virtual public ios, public unsafe_fstreambase {...}; class unsafe_strstreambase: virtual public unsafe_ios {...}; class strstreambase: virtual public ios, public unsafe_strstreambase {...}; class unsafe_istream: virtual public unsafe_ios {...}; class unsafe_ostream: virtual public unsafe_ios {...}; class istream: virtual public ios, public unsafe_istream {...}; class ostream: virtual public ios, public unsafe_ostream {...}; class unsafe_iostream: public unsafe_istream, public unsafe_ostream {...}; |
class streambuf { public: int sgetc_unlocked(); void sgetn_unlocked(char *, int); int snextc_unlocked(); int sbumpc_unlocked(); void stossc_unlocked(); int in_avail_unlocked(); int sputbackc_unlocked(char); int sputc_unlocked(int); int sputn_unlocked(const char *, int); int out_waiting_unlocked(); protected: char* base_unlocked(); char* ebuf_unlocked(); int blen_unlocked(); char* pbase_unlocked(); char* eback_unlocked(); char* gptr_unlocked(); char* egptr_unlocked(); char* pptr_unlocked(); void setp_unlocked(char*, char*); void setg_unlocked(char*, char*, char*); void pbump_unlocked(int); void gbump_unlocked(int); void setb_unlocked(char*, char*, int); int unbuffered_unlocked(); char *epptr_unlocked(); void unbuffered_unlocked(int); int allocate_unlocked(int); }; class filebuf: public streambuf { public: int is_open_unlocked(); filebuf* close_unlocked(); filebuf* open_unlocked(const char*, int, int = filebuf::openprot); filebuf* attach_unlocked(int); }; class strstreambuf: public streambuf { public: int freeze_unlocked(); char* str_unlocked(); }; unsafe_ostream& endl(unsafe_ostream&); unsafe_ostream& ends(unsafe_ostream&); unsafe_ostream& flush(unsafe_ostream&); unsafe_istream& ws(unsafe_istream&); unsafe_ios& dec(unsafe_ios&); unsafe_ios& hex(unsafe_ios&); unsafe_ios& oct(unsafe_ios&); char* dec_r (char* buf, int buflen, long num, int width) char* hex_r (char* buf, int buflen, long num, int width) char* oct_r (char* buf, int buflen, long num, int width) char* chr_r (char* buf, int buflen, long chr, int width) char* str_r (char* buf, int buflen, const char* format, int width = 0); char* form_r (char* buf, int buflen, const char* format,...) |
全局和多线程应用程序中的静态数据不能在线程间安全共享。尽管线程独立执行,但它们在进程中共享对全局和静态对象的访问。如果一个线程修改了这种共享对象,那么进程中的其他线程将观察该更改,使得状态难以维持。在 C++ 中,类对象(类的实例)靠其成员变量的值来维持状态。如果类对象被共享,那么该类对象将易于被其他线程更改。
多线程应用程序使用 iostream 库并包含 iostream.h 时,在缺省情况下,标准流(cout、cin、cerr 和 clog)定义为全局共享对象。由于 iostream 库是 MT 安全库,它可在执行 iostream 对象的成员函数时保护其共享对象的状态不会被其他线程访问或更改。但对象的 MT 安全性作用域限于执行该对象的公共成员函数期间。例如,
int c; cin.get(c); |
获得 get 缓冲区中下一个字符,并更新 ThreadA 中的缓冲区指针。但如果 ThreadA 中的下一个指令是另一个 get 调用,则 libC 库不能保证返回序列中的下一个字符。这是因为存在一些问题,例如,ThreadB 可能也在 ThreadA 中所做的两次 get 调用之间执行了 get 调用。
更多关于共享对象和多线程问题的处理策略,请参见11.4.5 对象锁定。
通常,使用 iostream 对象时,一序列 I/O 操作必须是 MT 安全的。例如,如下所示代码:
cout << " Error message:" << errstring[err_number] << "\n"; |
涉及执行 cout 流对象的三个成员函数。由于 cout 是共享对象,因此必须独立执行序列,才能在多线程环境中正常使用关键区。要独立对 iostream 类对象执行一序列操作,必须使用某种形式的锁定。
libC 库提供了 stream_locker 类用于锁定针对 iostream 对象的操作。有关 stream_locker 类的信息,请参见11.4.5 对象锁定。
最简单的共享对象和多线程问题处理策略是通过确保 iostream 对象是线程局部对象来避免问题。例如,
不过在许多情况下(例如缺省共享标准流对象),使对象专用于某线程是不可能的,这就需要其他的策略了。
要独立对 iostream 类对象执行一序列操作,必须使用某种形式的锁定。锁定会增加一些开销,即使是在单线程应用程序中也是如此。是增加锁定还是使 iostream 对象成为线程的专用对象取决于为应用程序选择的线程模型: 线程是独立的还是协同操作的?
如果每个独立的线程都使用其自己的 iostream 对象来生成或使用数据,则这些 iostream 对象专用于各自的线程,因此不需要锁定。
如果多个线程协同操作(即,共享同一个 iostream 对象),则必须同步对共享对象的访问,而且必须使用某种形式的锁定使序列化操作独立化。
iostream 库提供了 stream_locker 类,用于锁定针对 iostream 对象的一系列操作。因此可以将动态启用或禁用 iostream 对象的锁定所造成的性能开销降到最低。
可以使用 stream_locker 类的对象使针对流对象的一序列操作独立化。例如,下例中所示代码尝试查找文件中的某一位置,并读取下一个数据块。
#include <fstream.h> #include <rlocks.h> void lock_example (fstream& fs) { const int len = 128; char buf[len]; int offset = 48; stream_locker s_lock(fs, stream_locker::lock_now); .....// open file fs.seekg(offset, ios::beg); fs.read(buf, len); } |
在此示例中,stream_locker 对象的构造函数定义了每次只能执行一个线程的互斥区域的开始位置。从函数返回后调用的析构函数定义了互斥区域的结束位置。stream_locker 对象确保了在文件中查找特定偏移和从文件中读取能够同时独立执行,并且在原来的 ThreadA 读取文件之前,ThreadB 不能更改文件偏移。
另一种使用 stream_locker 对象的方法是显式定义互斥区域。在以下示例中,为了使 I/O 操作和后续错误检查独立化,使用了 vbstream_locker 对象的成员函数 lock 和 unlock 调用。
{ ... stream_locker file_lck(openfile_stream, stream_locker::lock_defer); .... file_lck.lock(); // lock openfile_stream openfile_stream << "Value: " << int_value << "\n"; if(!openfile_stream) { file_error("Output of value failed\n"); return; } file_lck.unlock(); // unlock openfile_stream } |
有关更多信息,请参见 stream_locker(3CC4) 手册页。
可以通过派生新类来扩展 或专用化 iostream 类的功能。如果将在多线程环境中使用从派生类实例化的对象,则这些类必须是 MT 安全类。
通过保护对象的内部状态不会被多线程修改来使类对象成为 MT 安全对象。为此,应使用互斥锁序列化对受保护的公共成员函数中成员变量的访问。
通过在 stream_locker 对象定义的关键区中使用 streambuf 的成员函数 _unlocked 来避免锁定开销。
在应用程序直接调用函数情况下,锁定 streambuf 类的公共虚拟函数。这些函数包括: xsgetn、underflow、pbackfail、xsputn、overflow、seekoff 和 seekpos。
使用 ios 类中的成员函数 iword 和 pword 来扩展 ios 对象的格式化状态。但如果多个线程共享 iword 或 pword 函数的相同索引,将会出现问题。要使线程成为 MT 安全线程,请使用适当的锁定方案。
锁定返回的成员变量的值大于 char 的成员函数。
在删除多个线程共享的 iostream 对象之前,主线程必须核实子线程已完成了对共享对象的使用。下例说明了如何安全销毁共享对象。
#include <fstream.h> #include <thread.h> fstream* fp; void *process_rtn(void*) { // body of sub-threads which uses fp... } void multi_process(const char* filename, int numthreads) { fp = new fstream(filename, ios::in); // create fstream object // before creating threads. // create threads for (int i=0; i<numthreads; i++) thr_create(0, STACKSIZE, process_rtn, 0, 0, 0); ... // wait for threads to finish for (int i=0; i<numthreads; i++) thr_join(0, 0, 0); delete fp; // delete fstream object after fp = NULL; // all threads have completed. } |
以下代码是以 MT 安全方式使用来自 libC 的 iostream 对象的多线程应用程序示例。
该示例应用程序创建了多达 255 个线程。每个线程读取不同的输入文件,每次读取一行,并且使用标准输出流 cout 将行输出到输出文件。所有线程共享的输出文件用值来标记,该值表明了哪个线程执行输出操作。
// create tagged thread data // the output file is of the form: // <tag><string of data>\n // where tag is an integer value in a unsigned char. // Allows up to 255 threads to be run in this application // <string of data> is any printable characters // Because tag is an integer value written as char, // you need to use od to look at the output file, suggest: // od -c out.file |more #include <stdlib.h> #include <stdio.h> #include <iostream.h> #include <fstream.h> #include <thread.h> struct thread_args { char* filename; int thread_tag; }; const int thread_bufsize = 256; // entry routine for each thread void* ThreadDuties(void* v) { // obtain arguments for this thread thread_args* tt = (thread_args*)v; char ibuf[thread_bufsize]; // open thread input file ifstream instr(tt->filename); stream_locker lockout(cout, stream_locker::lock_defer); while(1) { // read a line at a time instr.getline(ibuf, thread_bufsize - 1, ’\n’); if(instr.eof()) break; // lock cout stream so the i/o operation is atomic lockout.lock(); // tag line and send to cout cout << (unsigned char)tt->thread_tag << ibuf << "\n"; lockout.unlock(); } return 0; } int main(int argc, char** argv) { // argv: 1+ list of filenames per thread if(argc < 2) { cout << “usage: " << argv[0] << " <files..>\n"; exit(1); } int num_threads = argc - 1; int total_tags = 0; // array of thread_ids thread_t created_threads[thread_bufsize]; // array of arguments to thread entry routine thread_args thr_args[thread_bufsize]; int i; for(i = 0; i < num_threads; i++) { thr_args[i].filename = argv[1 + i]; // assign a tag to a thread - a value less than 256 thr_args[i].thread_tag = total_tags++; // create threads thr_create(0, 0, ThreadDuties, &thr_args[i], THR_SUSPENDED, &created_threads[i]); } for(i = 0; i < num_threads; i++) { thr_continue(created_threads[i]); } for(i = 0; i < num_threads; i++) { thr_join(created_threads[i], 0, 0); } return 0; } |