windows环境下封装条件wait和signal

时间:2021-09-25 01:05:56

linux 环境有提供好的pthread_cond_wait() 和 phread_signal()、pthread_broadcast()

windows需要自己封装,利用semophore控制线程等待和释放,先简单谈一下设计好后api该

如何使用。

假设我们封装好条件变量等待函数名字叫做wait(Mutex& mutex),Mutex是之前我们封装的

条件变量,文章最下边会给出这些文件的下载地址,在这里读者当做linux 的mutex即可。我们

封装的释放函数为signal(),广播函数为broadcast。

判断等待条件变量和逻辑处理如下:

Lock(mutex);

while(条件不满足)

{

   wait(mutex);

}

todo...;

UnLock(mutex);

激活条件变量如下:

Lock(mutex);

  todo ...; 

  if(条件满足)

  {

    signal();/broadcast();

  } 

signal();

UnLock(mutex);

Condition 是我们封装的条件变量类

这是封装好api后调用规则,那么先考虑wait内部的基本形式

void Condition::wait(Mutex &mutex)

{

  //1 Condition 类中表示阻塞线程数 

  mblocked ++;

  //2 解锁,释放互斥量

  UnLock(mutex);

  //3 阻塞等待 mQueue为信号量

   res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);

   //4 做一些判断和逻辑处理

  //5 加锁

 Lock(mutex);

}

wait内部记录一个阻塞的线程数mblocked,mblocked 是我们封装Condition类的成员变量,

然后释放外部的互斥量,然后调用阻塞函数,等待signal唤醒。

当WaitForSingleObject获取信号后会继续执行,做一些逻辑判断,最后将mutex锁住。

这里用到的mQueue是一个信号量,用信号量可以接受多个唤醒和控制线程唤醒数量。

下面是条件变量释放的函数,我们先做只是放一个条件变量的api

void Condition::signal()

{

  //1阻塞的线程减少

  mblocked --;

  //2将激活的信号个数设置为1

  signals = 1;

  //3

  if (signals)

  {

    //释放信号量
    res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0);
    ASSERT(res);
  }

}

先不要着急往下写,考虑下这么做真的合适么?

首先之前设计过外部调用

  if(条件满足)

  {

    signal();/broadcast();

  } 

这个只要条件满足就可以激活,所以我们只用mblocked表示阻塞线程数是不够的,当信号量被激活很多没有被消耗的情况下

就需要统计当前可用的资源数,那么就在Condition类添加mWait表示当前可用的信号量个数。除此之外,考虑这样一种情况,

当条件不满足的时候 线程A调用void wait(Mutex &mutex)函数,wait函数先解锁再阻塞,对应wait中第2,3步骤。而另一个

线程B当条件满足时调用 signal函数激活之前阻塞的线程A,对应signal函数中第3步 。原阻塞线程A因为捕获到信号量,所以

一次走到wait中第4、5步。由于第4和第5步之间没有加锁保护,所以这一阶段用到的类的成员变量都是不安全的。所以在第3

和第4之间加一个互斥锁,第5步之后释放这个互斥锁。同样的道理,为了避免此时signal内部调用类的成员变量造成数据不一致

所以signal内部也需要加锁,在signal内部第1步之前加锁,第3步之后解锁,或者第3步之前解锁都可以。我觉得在第三步之前

释放会好一些,在释放信号量之前解锁,避免死锁。所以添加一个成员变量mMutex

用于部分代码互斥。

那么改良后我们的函数如下:

void Condition::wait(Mutex& mutex) {   #ifndef WIN32   int ret = pthread_cond_wait(&mId, mutex.getId());   ASSERT(ret == 0);   #else //1 mBlocked++;
//2   mutex.unlock();   int res = 0;
     //3    res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);   ASSERT(res == WAIT_OBJECT_0); //用于暂时存储mWaiting的数值    unsigned wasWaiting = 0;    //4   res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE);   ASSERT(res == WAIT_OBJECT_0);   wasWaiting = mWaiting;    //5    res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex));   ASSERT(res);     //6    mutex.lock(); #endif }

  步骤也做了相应的调整。

void Condition::signal () { #ifndef WIN32 int ret = pthread_cond_signal(&mId); ASSERT(ret == 0); #else unsigned signals = 0; int res = 0;
  //1 res
= WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0);   //2 if (mWaiting != 0) { if (mBlocked == 0) { res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); return; } ++mWaiting; --mBlocked; signals = 1; } else { signals = mWaiting = 1; --mBlocked; }   //3 res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res);   //4 if (signals) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0); ASSERT(res); } #endif }

改良后更新了步骤,注释的就是步骤,方便接下来讨论这两段代码的隐患,因为仅仅这些还不够。目前现总结下mMutex作用:

1mMutex用于signal函数内部和wait函数 获取信号量之后的代码互斥,保护类的常用变量。

2当不同的线程调用wait等待后获得激活时,mMutex保证获得信号量之后的操作是互斥的,安全的。

由于调用wait函数之前需要加外部的互斥锁,所以不同的线程调用wai函数时第一步的mBlocked++是互斥的,不会出错。

唯一有可能出错的是那种情况呢?

就是当signal发出信号后,当前有一个因为调用wait阻塞的线程A捕获到该信号,进入第四步,修改或者访问mBlocked变量的值,

与此同时有线程A调用wait函数,此时会进入wait内部第一步mBlocked++,多线程修改和读取mBlocked会造成数据混乱,

所以此时需要在第一步之前加锁,第2步之前解锁,因此添加单个信号量mGate,用于控制当有线程处于解锁状态处理mBlocked等

类成员时,其他线程进入wait修改mBlocked值。

这个res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);可以放在wait函数第4步之后,当第4步获得互斥

资源后,阻塞等待获取mGate信号,如果没获得需要等待别的线程释放mGate,如果此时mGate不被释放造成mMutex死锁。所以

别的线程中先调用 WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);后调用WaitForSingleObject mMutex会造成

死锁。需要特别注意。如果规避了这一点,那么就可以避免死锁。所有情况都对mGate互斥访问并不友好,,出现之前讨论的情况只有一种:

就是当前应用程序中至少有一个线程处于等待,而signal释放信号后,某一个等待的线程继续执行4后面的操作,外界有新的线程调用wait时

修改mBlocked会出错。所以只需要在signal函数中判断当mWaiting数量为0时对mGate加锁,mWait根据不同情况进行对mGate进行释放。

修改后的代码如下:

先封装一个小函数: