C++多线程编程(8)
同步对象
1.应用面向对象技术区同步化变量。互斥量或条件变量及所有相关系统服务都被封装在一个接口类中。对互斥量的唯一访问途径是通过类成员函数。接口类与即将通过继承或复合同步的数据或设备直接关联。同步变量可用于调节访问同一个临界区的多个线程或多个进程。
2.同步变量:Mutex类。
某一时刻只能由一个线程占有,线程试图锁定该互斥量。如果互斥量没有被占有,线程将获得互斥量的所有权。如果互斥量被占有或锁定,请求互斥量的线程将被阻塞,直到取消锁定请求的互斥量前一直处于阻塞状态。
互斥量操作:创建、锁定(被占有)、取消锁定(被释放)、销毁。
Mutex类:
#include
#include
class mutex
{ protected:
pthread_mutex_t Mutex;
general_exception SemException;
public:
mutex(void);
~mutext(void);
void lock(void);
void unlock(void);
};
构造函数:
mutext::mutex(void)
{ if(pthread_mutex_init(&Mutex,NULL))
{ SemException.message("Could Not Create Mutex");
throw SemException;
}
}
class general_exception{
protected:
char Operation[2];
char Message[81];
public:
general_exception(void);
general_exception(char *Msg);
general_exception(const general_exception &N);
general_exception &operator=(const general_exception &N);
void operation(char *Op);
char *operation(void);
void message(char *Msg);
char *message(void);
};
【注意】C++构造函数不支持返回值,应该使用异常处理。对象的用户不必担心同步化对象的使用,因为对象同步化自身。同步临界区的责任由对象的生产者承担(在并行环境中保护数据组件的责任落在类身上)。
3.封装了互斥量类型的对象类:mt_rational
#include
#include
class mt_rational:public rational{
protected:
mutex Mutex; //互斥量对象,用于保护mt_rational的临界区
public:
mt_rational(long Num=0,long Den=1);
mt_rational(const mt_rational &X);
mt_rational &operator=(const mt_rational &X);
void assign(long X,long Y);
};
void mt_rational::assign(long X,long Y)
{ if(Y==0){
Exception.message("Zero Is A Invalid Denominator");
throw Exception;}
try{ Mutex.lock();
Numerator=X; //临界区:调用此函数给mt_rational类型对象赋值时发生
Denominator=Y;
Mutex.unlock();
}catch(general_exception &M)
{ Mutex.unlock();
throw M;
}
}
mt_rational &mt_rational::operator=(const mt_rational &X)
{ try{
Mutex.lock();
Numerator=X.Numerator; //临界区:赋值
Denominator=X.Denominator;
Mutex.unlock();
}
catch(general_exception &M)
{ Mutex.unlock();
throw M;
}
return *this;
}
【程序演示】两个线程更新的可锁定全局变量Number的使用(嵌入互斥量的mt_rational类)
#include
#include
#include
#include
mt_rational *Number;
void *threadA(void *X)
{ long int Q;
try{
for(Q=2;Q<500;Q+=4)
{ Number->assign((Q/2),Q); //Number指针地址指向该函数(互斥量与更新Number对象的操作捆绑在一起)
cout<<pthread_self()<<"Thread A: " <<*Number<< endl;
}
}catch(general_exception &M)
{
cout<<pthread_self()<<M.message()<<endl;
}
}
void *threadB(void *X)
{ long int Y;
try{
for(Y=5;Y<500;Y+=5){
Number->assign((Y/5),Y); //Number指针指向该函数(互斥量与更新Number对象的操作捆绑在一起)
cout<<pthread_self()<<"Thread B"<<*Number<<endl;
}
}
catch(general_exception &M){
cout<<pthread_self()<<M.message()<<endl;
}
void main(void)
{ pthread_t ThreadA;
pthread_t ThreadB;
try{
Number=new mt_rational(1,1); //创建一个带互斥量的有理数
pthread_create(&ThreadA,NULL,threadA,NULL);
pthread_create(&ThreadB,NULL,threadB,NULL);
pthread_join(ThreadA,NULL);
pthread_join(ThreadB,NULL);
delete Number; //释放Number对象内存。
}
catch(general_exception &X)
{ cout<<"From main()"<< X.message() <<endl;
}
}
4.命名互斥量类:named mutex
可被同一进程内的线程以及不同进程间的线程共享。匿名互斥量只用于执行同一进程中线程间的同步。匿名互斥量不与文件名字关联,但命名互斥量与某文件名关联。可用继承来创建一个可用于多进程间的命名互斥量。
class named_mutex:public mutex{
protected:
char MutexName[81]; //互斥量名称
int initiallyOwned; //是否互斥量在初始化创建时就被占有
public:
named_mutex(void);
named_mutex(char *MName,int Owned=0);
unsigned long lockDuration(void);
void lockDuration(unsigned long Dur);//指定多长时间后阻塞对互斥量的锁定尝试(ms)
};
如:named_mutex MyMutex("MutexName");
MyMutex.lockDuration(10000); //如果互斥量已被锁定,线程将阻塞不超过10秒。如果互斥量在时间耗尽前可用,则互斥量被锁定。
MyMutex.lock(); //锁定
...
5.事件互斥量的双重状态变量:set/unset、 true/false、 posted/not posted
调用线程想等待某互斥量变量,如互斥量变量为unset时,线程阻塞,一直等到另一个线程设置了此互斥量为止,反之设置了set,则不会被阻塞。
event_mutex类的2个构造函数:
event_mutex::event_mutex(char *MName,unsigned int Initial=0,long Reset,LSECURITY_ATTRIBUTES Secure) //由创建初始事件互斥量或条件变量的线程使用,当前进程中的线程都可以利用该事件互斥量
{ EventMutex=CreateEvent(Secure,Reset,Initial,MName);
if(!EventMutex)
{ EventException.message("Could Not Create Event Semaphore");
throw EventException;
}
if(MName!=NULL){
strcpy(MutexName,MName);
}
EventWait=INFINITE;
}
event_mutex::event_mutex(char *MName,int Inherit,long Accesss) //允许其他进程访问现有事件互斥量,打开事件互斥量只是第一步,进程必须接着调用wait(),broadCast()或reset()成员函数来实际应用事件互斥量
{ EventMutex=OpenEvent(Access.Inherit,MName);
if(!EventMutex){
EventException.message("Could Not Create Event Semaphore");
throw EventException;
}
if(MName!=NULL){
strcpy(MutexName,MName);
}
EventWait=INFINITE;
}
6.等待多个事件或互斥量:
例如:线程1在队列中插入文件名,线程2从队列中删除文件名。为确保线程1在队列中放置文件名时,线程2不会删除文件名。需要线程1在队列包含文件名时,广播通知线程2,该条件命名为NotEmpty.线程1负责决定与每个文件名关联的是哪种类型的文件。为了让线程2知道何时处理某种类型的文件,当线程1插入该类型文件的文件名时,必须广播此消息通知线程2,该条件命名为SomeTypeFile.因此线程2等待包含两个条件的事件的公式如下:
wait(NotEmpty,SomeTypeFile);
线程2将阻塞于wait(),直到线程1广播通知了这两个条件,多个条件或多个事件互斥量可以用于防止导致死锁(并发执行的线程竞争同一个资源时引发)的情形。
对于多个事件互斥量,要么都获得资源锁定,要么都不能获得资源锁定。在线程可以继续前,所有的条件必须都被设置。
【程序演示】多个事件互斥量服务的接口类声明(OS/2)——用于一个数据类或设备类的组件:
class multiple_event_mutex{
protected:
char MutexName[81];
SEMRECORD *ConditionList; //条件变量列表
int EventNumber; //事件号
long int WaitSelection;//等待选择序号
HEV EventMutex; //单个事件互斥量
MultipleEventMutex; 多个事件互斥量
general_exception EventException; //事件互斥量产生的异常
int Duration; //等待持续时间
public:
multiple_event_mutex(char * MName,int ENum,long int WaitType,int Initial);//多事件互斥量创建初始化
multiple_event_mutex(char * MName);//使用现有的事件互斥量
~ multiple_event_mutex(void);
int duration(void);
void duration(int X);
void postEvent(int X); //发送事件
void waitEventa(void); //等待多个事件
};
【程序演示】multipe_event_mutex类的定义:
multiple_event_mutex::multiple_event_mutex(char *MName,int ENum,Long int WaitType,Int Initial)
{ int N;
EventNumber=ENum; //事件个数
ConditionList=new SEMRECORD[ENum]; //分配一个条件变量列表数组
For(N=0;N<ENum;N++)
{
if(DosCreateEventSem(NULL,&EventMutex,0,Initial)){
EventException.message("Could Not Create Event Semaphore");
Throw EventException;
}
ConditionList[N].hsemCur=&EventMutex;
ConditionList[N].ulUser=N;
}
if(DosCreateMuxWaitSem(MName,&MultipleEventMutex,sizeof(ConditionList),ConditionList,WaitType){
EventException.message("Could Not Create Multipe Event Semaphore");
throw EventException;
}
}
multiple_event_mutex::multiple_event_mutex(char *MName)
{
if(DosOpenMuxWaitSem(MName,&MultipleEventMutex)){
EventException.message("Could Not Open Event Semaphore");
throw EventException;
}
}
multiple_event_mutex::~multiple_event_mutex(void)
{ if(DosCloseMuxWaitSem(MultipleEventMutex))
{ EventException.message("Could Not Close Event Semaphore");
throw EventException;
}
delete ConditionList;
}
int multiple_event_mutex::duration(void)
{ return(Duration);
}
void multiple_event_mutex::duration(int X)
{ Duration=X;
}
void multiple_event_mutex::postEvent(int X)
{ EventMutex=(HEV)ConditionList[x].hsemCur;
DosPostEventSem(EventMutex);
}
void multiple_event_mutex::waitEvents(void)
{ unsigned long User;
DosWaitMuxWaitSem(MultipleEventMutex,SEM_INDEFINITE_WAIT,&User);
}
7.对ACID使用多个事件互斥量:
数据库处理必须更新所有资源或不应更新任何资源,这种复杂更新称为事务。事务中不能容忍竞争条件和死锁。ACID测试:原子性(atomicity)、一致性(consistency)、孤立性(isolation)、持久性(durability)。
通过了ACID测试的事务在多线程、并行处理、或多用户环境中是安全的。原子性:事务完整执行或根本不执行。一致性:处理事务中数据的完整性,保证所有的数据断言、前置条件和后置条件在更新中能得到维护。孤立性:确保数据不丢失,对数据元素一次只进行一个修改来防止竞争条件。持久性:一种恢复方案,在硬件或软件崩溃时,保证数据库可以恢复到一致性的状态。
多个事件互斥量可以实现ACID,因为多个事件互斥量可能导致某线程阻塞到所有资源列表中的资源都可用锁定为止,因此多个事件互斥量可用于保护关键性任务的更新。
8.通过类成员函数锁定和取消锁定。
4个同步组件:mutex类、named_mutex类、event_mutex类、multiple_event_mutex类。
一般来说,这些类型的类倾向于用作其他类的组件,显示调用lock()\unlock()\post()\wait()函数。使用locking()函数是类中嵌入互斥量的用法之一。
通过互斥使用构造函数和析构函数:
将lock()\unlock()\post()\wait()类型成员函数放置在宿主类的构造函数和析构函数中,可以被自动调用,无需程序员进一步干预就可以发生互斥处理。
class A:private mutex{
public:
A(void){lock();}
~A(void){unlock();}
};
互斥量透明调用方法:
void myFunc(void)
{ int Count;
for(Count=0;Count<N;Count++)
{ A Value; //value的互斥量自动被锁定。如value不能锁定它的互斥量,它将阻塞到能锁定为止。
Value++;
}//for虚幻完成后,互斥量通过value的析构函数自动取消锁定。
}
void myFunc(void)
{ try{
A Count; //互斥量被锁定
//其他执行代码
Count++;
}
catch(..){
//抛出异常,Count的析构函数自动调用,自动释放被Count锁定的互斥量。
}
}
【注意】如果需要锁定和取消锁定嵌入互斥量或条件变量,或在C++块内多次设置set或重新设置reset,则在调用的常规函数内使用互斥量操作。如果互斥量操作只需要一次性激活或禁止,则可以考虑使用构造函数和析构函数。
9.在某个类中包含同步类或者使用复合、包容或继承来实现。建议只让类中的直接成员访问同步类,嵌入同步类的成员函数和数据成员应该为私有。可用多个类构造函数将宿主类配置成客户或服务器。通过将locking()和unlocking()放入构造函数和析构函数,可让同步对象更安全更易使用。在try()块中放入声明对象的优点是,抛出异常时自动调用它们的析构函数。避免*漂浮的同步对象。使用类聚集来连接同步类与数据和设备类。