linux 环境有提供好的pthread_cond_wait() 和 phread_signal()、pthread_broadcast()
windows需要自己封装,利用semophore控制线程等待和释放,先简单谈一下设计好后api该
如何使用。
假设我们封装好条件变量等待函数名字叫做wait(Mutex& mutex),Mutex是之前我们封装的
条件变量,文章最下边会给出这些文件的下载地址,在这里读者当做linux 的mutex即可。我们
封装的释放函数为signal(),广播函数为broadcast。
判断等待条件变量和逻辑处理如下:
Lock(mutex);
while(条件不满足)
{
wait(mutex);
}
todo...;
UnLock(mutex);
激活条件变量如下:
Lock(mutex);
todo ...;
if(条件满足)
{
signal();/broadcast();
}
signal();
UnLock(mutex);
Condition 是我们封装的条件变量类
这是封装好api后调用规则,那么先考虑wait内部的基本形式
void Condition::wait(Mutex &mutex)
{
//1 Condition 类中表示阻塞线程数
mblocked ++;
//2 解锁,释放互斥量
UnLock(mutex);
//3 阻塞等待 mQueue为信号量
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);
//4 做一些判断和逻辑处理
//5 加锁
Lock(mutex);
}
wait内部记录一个阻塞的线程数mblocked,mblocked 是我们封装Condition类的成员变量,
然后释放外部的互斥量,然后调用阻塞函数,等待signal唤醒。
当WaitForSingleObject获取信号后会继续执行,做一些逻辑判断,最后将mutex锁住。
这里用到的mQueue是一个信号量,用信号量可以接受多个唤醒和控制线程唤醒数量。
下面是条件变量释放的函数,我们先做只是放一个条件变量的api
void Condition::signal()
{
//1阻塞的线程减少
mblocked --;
//2将激活的信号个数设置为1
signals = 1;
//3
if (signals)
{
//释放信号量
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0);
ASSERT(res);
}
}
先不要着急往下写,考虑下这么做真的合适么?
首先之前设计过外部调用
if(条件满足)
{
signal();/broadcast();
}
这个只要条件满足就可以激活,所以我们只用mblocked表示阻塞线程数是不够的,当信号量被激活很多没有被消耗的情况下
就需要统计当前可用的资源数,那么就在Condition类添加mWait表示当前可用的信号量个数。除此之外,考虑这样一种情况,
当条件不满足的时候 线程A调用void wait(Mutex &mutex)函数,wait函数先解锁再阻塞,对应wait中第2,3步骤。而另一个
线程B当条件满足时调用 signal函数激活之前阻塞的线程A,对应signal函数中第3步 。原阻塞线程A因为捕获到信号量,所以
一次走到wait中第4、5步。由于第4和第5步之间没有加锁保护,所以这一阶段用到的类的成员变量都是不安全的。所以在第3
和第4之间加一个互斥锁,第5步之后释放这个互斥锁。同样的道理,为了避免此时signal内部调用类的成员变量造成数据不一致
所以signal内部也需要加锁,在signal内部第1步之前加锁,第3步之后解锁,或者第3步之前解锁都可以。我觉得在第三步之前
释放会好一些,在释放信号量之前解锁,避免死锁。所以添加一个成员变量mMutex
用于部分代码互斥。
那么改良后我们的函数如下:
void
Condition::wait(Mutex& mutex)
{
#ifndef WIN32
int ret = pthread_cond_wait(&mId, mutex.getId());
ASSERT(ret == 0);
#else
//1
mBlocked++;
//2
mutex.unlock();
int res = 0;
//3
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);
ASSERT(res == WAIT_OBJECT_0);
//用于暂时存储mWaiting的数值
unsigned wasWaiting = 0;
//4
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE);
ASSERT(res == WAIT_OBJECT_0);
wasWaiting = mWaiting;
//5
res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex));
ASSERT(res);
//6
mutex.lock();
#endif
}
步骤也做了相应的调整。
void
Condition::signal ()
{
#ifndef WIN32
int ret = pthread_cond_signal(&mId);
ASSERT(ret == 0);
#else
unsigned signals = 0;
int res = 0;
//1
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE);
ASSERT(res == WAIT_OBJECT_0);
//2
if (mWaiting != 0)
{
if (mBlocked == 0)
{
res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex));
ASSERT(res);
return;
}
++mWaiting;
--mBlocked;
signals = 1;
}
else
{
signals = mWaiting = 1;
--mBlocked;
}
//3
res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex));
ASSERT(res);
//4
if (signals)
{
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0);
ASSERT(res);
}
#endif
}
改良后更新了步骤,注释的就是步骤,方便接下来讨论这两段代码的隐患,因为仅仅这些还不够。目前现总结下mMutex作用:
1mMutex用于signal函数内部和wait函数 获取信号量之后的代码互斥,保护类的常用变量。
2当不同的线程调用wait等待后获得激活时,mMutex保证获得信号量之后的操作是互斥的,安全的。
由于调用wait函数之前需要加外部的互斥锁,所以不同的线程调用wai函数时第一步的mBlocked++是互斥的,不会出错。
唯一有可能出错的是那种情况呢?
就是当signal发出信号后,当前有一个因为调用wait阻塞的线程A捕获到该信号,进入第四步,修改或者访问mBlocked变量的值,
与此同时有线程A调用wait函数,此时会进入wait内部第一步mBlocked++,多线程修改和读取mBlocked会造成数据混乱,
所以此时需要在第一步之前加锁,第2步之前解锁,因此添加单个信号量mGate,用于控制当有线程处于解锁状态处理mBlocked等
类成员时,其他线程进入wait修改mBlocked值。
这个res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);可以放在wait函数第4步之后,当第4步获得互斥
资源后,阻塞等待获取mGate信号,如果没获得需要等待别的线程释放mGate,如果此时mGate不被释放造成mMutex死锁。所以
别的线程中先调用 WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);后调用WaitForSingleObject mMutex会造成
死锁。需要特别注意。如果规避了这一点,那么就可以避免死锁。所有情况都对mGate互斥访问并不友好,,出现之前讨论的情况只有一种:
就是当前应用程序中至少有一个线程处于等待,而signal释放信号后,某一个等待的线程继续执行4后面的操作,外界有新的线程调用wait时
修改mBlocked会出错。所以只需要在signal函数中判断当mWaiting数量为0时对mGate加锁,mWait根据不同情况进行对mGate进行释放。
修改后的代码如下:
先封装一个小函数: