c++11中信号量(semaphore)的实现

时间:2021-05-26 15:16:33

c++11中有 mutex (互斥量),有 condition_variable (条件变量),并没有 semaphore (信号量)。信号量,操作系统中一般都有提,后来 google 说可以使用 mutex+condition_variable 实现一个,后来写来写去,都死锁 (deadlock) ——,O__O"…,后来 google 了一个,整理了一下思路。

信号量

神马是信号量?信号量是一个整数 count,提供两个原子(atom,不可分割)操作:P 操作和 V 操作,或是说 wait 和 signal 操作。

  • P操作 (wait操作):count 减1;如果 count < 0 那么挂起执行线程;
  • V操作 (signal操作):count 加1;如果 count <= 0 那么唤醒一个执行线程;

如何理解这个信号量?为嘛信号量是这个东西?想想互斥量 mutex,相当于一把锁,如果一个人来了 lock 一下,其他人进不去了;最初的人 unlock 了,又可以进一个人了,进去一个又 lock 住。如果 mutex 锁在 unlock 状态下叫做 1 的话,lock 状态叫 0;1 实际反映的是锁的数量 !现在可以有多把锁,数量 count 最初为 n (可以设定);

  • 来一个人取一把锁(count减1),如果发现锁的数量(count)小于0,岂不言外之意,没有锁了 ? 这样的情况下就要等(wait),或是说悬挂(suspend),或是说阻塞(block);直到神马时候呢?有人释放一把锁给 me 为止。当然了,如果最初就有锁的话,直接拿一把进去就可以了,O__O"…。
  • me 的事情办完了,要出去了,还回一把锁(count加1),如果发现 count <=0,言外之意是神马呢?有人在等丫,O__O"…;好吧,me 把自己的锁给某一个人,唤醒一个等待的线程。当然如果最初就没有人等,me 就走 me 的,不用唤醒谁了。

下面的解释就对应上面的 wait 操作和 signal 操作,也算是它的真实意图了。wait 和 signal 有神马用呢?最典型的用法可能就是 count = 1 时候相当于一把互斥锁丫!当然很多时候,共享的资源有多个,比如有 n 个坑,每个线程占一个坑,这个时候使用信号量这个工具要比 mutex 更合适。算了,问题还集中在 wait 和 signal 的实现上。

问题和思路

  • 一个 int 或是 long 变量 count,很好设定;因为 wait 和 signal 都是原子操作,所以至少要一个 mutex 来保证互斥
  • 有时候需要 suspend 一个线程,有时候要 wakeup 一个线程,条件变量是合适人选,所以需要一个 condition_variable;suspend 和 wakeup 都是在 condition_variable 上(这是载体);
  • wakeup 一个线程,肿么保证一定会 wakeup 一个 condition_variable 上的一个呢?me 们借助一个辅助变量 wakeups —— 要唤醒线程名额(初值为 0),也就是在 signal 的时候,发现有人在等,就 wakeups++(要唤醒人数+1) 然后通知一下条件变量可以唤醒一个。而 wait 操作如果发现 count 数量不够,就阻塞,直到 wakeups 有名额(大于0)为止(条件变量的 wait 就是等待该条件发生),当然唤醒一个之后 wakeups 减1。(条件变量设置的条件是 wakeups > 0。)

程序代码

  1. /*
  2. * modified by: ilovers
  3. */
  4.  
  5. #include <mutex>
  6. #include <condition_variable>
  7.  
  8. namespace ilovers {
  9.      class semaphore  {
  10.      public :
  11.         semaphore ( int value = 1 ) : count {value }, wakeups { 0 }  { }
  12.        
  13.          void wait ( ) {
  14.             std :: unique_lock <std :: mutex > lock {mutex } ;
  15.              if  ( --count < 0 )  {  // count is not enough ?
  16.                 condition. wait (lock,  [ & ] ( ) - > bool {  return wakeups > 0 ; } ) ;  // suspend and wait ...
  17.                  --wakeups ;   // ok, me wakeup !
  18.              }
  19.          }
  20.          void  signal ( ) {
  21.             std :: lock_guard <std :: mutex > lock {mutex } ;
  22.              if ( ++count <= 0 )  {  // have some thread suspended ?
  23.                  ++wakeups ;
  24.                 condition. notify_one ( ) ;  // notify one !
  25.              }
  26.          }
  27.        
  28.      private :
  29.          int count ;
  30.          int wakeups ;
  31.         std :: mutex mutex ;
  32.         std :: condition_variable condition ;
  33.      } ;
  34. } ;

几点说明:

  • semaphore 组织在 ilovers 命名空间下,使用的时候包含头文件,然后 ilovers::semaphore sem; 方式使用;默认 count = 1,也就是一个 mutex 的效果;
  • sem.wait(); 是 P 操作;sem.signal(); 是 V 操作;
  • 程序中构造对象统一使用 c++11 的 {} 方式,虽然 () 依然使用,比如 count{value} 和 count(value),lock{mutex} 和 lock(mutex) 都是可以的;

信号量的用处

信号量的用法大体来说和 mutex m 一样;m.lock(); 和 m.unlock(); 将需要保护的代码上下围起来;mutex 的特点就是 count = 1,典型的(二分)互斥量;而 sem(n) 则是数量有 n 个,比如 n 个坑,都占完的情况下,阻塞当前线程,否则可以进去(占个坑,O__O"…)。

同步两个线程的顺序

如果 semaphore::count = 0 的情况下,一个线程 A 首先 wait 的话,就会被阻塞掉;而一个线程 B 如果不 wait 就直接干活,等干完 signal 一下的话,过一会 A 就会执行了(count = 1,已经有数量了)。这样就能保证 B 在 A 的前面执行了,类似的方法可以将多个线程 A、B、C、D 按照 D、C、B、A 排个顺序执行出来。

  1. #include <iostream>
  2. #include <thread>
  3. #include "ilovers/semaphore"
  4.  
  5. std :: mutex m ;
  6. ilovers :: semaphore ba ( 0 ), cb ( 0 ), dc ( 0 ) ;
  7.  
  8. void a ( )
  9. {
  10.     ba. wait ( ) ;   // b --> a
  11.     std :: lock_guard <std :: mutex > lock {m } ;
  12.     std :: cout  <<  "thread a"  <<  '\n' ;
  13. }
  14. void b ( )
  15. {
  16.     cb. wait ( ) ;   // c --> b
  17.     std :: lock_guard <std :: mutex > lock {m } ;
  18.     std :: cout  <<  "thread b"  <<  '\n' ;
  19.     ba. signal ( ) ;   // b --> a
  20. }
  21. void c ( )
  22. {
  23.     dc. wait ( ) ;   // d --> c
  24.     std :: lock_guard <std :: mutex > lock {m } ;
  25.     std :: cout  <<  "thread c"  <<  '\n' ;
  26.     cb. signal ( ) ;   // c --> b
  27. }
  28. void d ( )
  29. {
  30.     std :: lock_guard <std :: mutex > lock {m } ;
  31.     std :: cout  <<  "thread d"  <<  '\n' ;
  32.     dc. signal ( ) ;   // d --> c
  33. }
  34.  
  35. int main ( )
  36. {
  37.    
  38.     std :: thread th1 {a }, th2 {b }, th3 {c }, th4 {d } ;
  39.    
  40.     th1. join ( ) ;
  41.     th2. join ( ) ;
  42.     th3. join ( ) ;
  43.     th4. join ( ) ;
  44.    
  45.     std :: cout  <<  "thread main"  << std :: endl ;
  46.    
  47.      return  0 ;
  48. }

去掉上面的同步的信号量 ba、cb、dc 之后,程序的输出可能是 a b c d、a c d b、a c b d 等,几乎不可能是 d c b a 的顺序,然后加上三个控制顺序的信号量后,输出的顺序就是 d c b a main。对了 std::lock_guard 和 mutex 是为了互斥访问 std::cout 对象,这点要注意些(cout 一个共享对象/变量)。

后话

如果 u 没有注意到这个真相,me 这里提一下:count = -5,表明当前有 5 个线程阻塞了(really?),count = 10,表明当前有 10 个位置可以直接用(不用等)。如果 count = N0(初值),而当前 count = 0 表明 N0 个进程在干活,没有人在等;count = -10,表明 N0 个干活,N0+10 执行了 wait 操作,10 个在等待。

当初 me 认为不需要 wakeups 辅助变量就可以 wait 和 notify,而是直接使用 count,后来发现不对,notify_one 的时候 count 一定是负的,而不是 count > 0(有空闲位置,分配给一个挂起的线程?no !此时的已经没有挂起的线程了!),根据负的 count 则不能断定要不要真的从条件变量上卸掉一个线程,O__O"…。比如,假设当前有 5 个挂起来的线程,count = -5,此时有一个线程工作完了,signal/V 操作了一下,count = -4,这个时候 conditon_variable 怎么知道要不要唤醒一个线程呢?但是通过一个 wakeups 就可以:一个线程工作完了,V 操作一下,同时 wakeups++,这个时候 condition_variable 感知到 wakeups 有名额了,然后唤醒一个线程。唤醒的逻辑就是如此。

总赶脚有点撇脚

思来想去,wakeups 这个辅助变量真的有用么?或是说一定需要借助它吗?wakeups me 们说它的意义是:有一个 V 操作然后去唤醒一个挂起的线程。这里面为嘛要借助一个辅助变量呢?(虽然借助于 wakeups 然后设置条件变量的条件 wakeups > 0 可以。) V 操作的时候直接 notify_one 而 P 操作的时候只是简单的挂起线程(不设置条件),不是更自然么?好吧,至少 me 目前是这么想的,貌似也是行的通的:

简单版本

  1. /*
  2. * author: ilovers
  3. */
  4.  
  5. #include <mutex>
  6. #include <condition_variable>
  7.  
  8. namespace ilovers {
  9.      class semaphore  {
  10.      public :
  11.         semaphore ( int value = 1 ) : count {value } { }
  12.        
  13.          void wait ( ) {
  14.             std :: unique_lock <std :: mutex > lock {mutex } ;
  15.              if  ( --count < 0 )  // count is not enough ?
  16.                 condition. wait (lock ) ;  // suspend and wait...
  17.          }
  18.          void  signal ( ) {
  19.             std :: lock_guard <std :: mutex > lock {mutex } ;
  20.              if ( ++count <= 0 )  // have some thread suspended ?
  21.                 condition. notify_one ( ) ;  // notify one !
  22.          }
  23.        
  24.      private :
  25.          int count ;
  26.         std :: mutex mutex ;
  27.         std :: condition_variable condition ;
  28.      } ;
  29. } ;

赶脚这个才是更自然的 semaphore 的实现,O__O"…。me 测试了一下 semaphore 作为 mutex 使用(默认),去同步四个线程,每个线程 ++sum 100000 次,发现不带 wakeups 的简单版本,速度很慢, 20秒左右;而上面带 wakeups 的版本则快一些,2 秒左右(当然结果是一致的),O__O"…可能是 condition_variable::wait 操作,带有条件则执行效率更高的缘故?现在只能这么猜测了。

http://ilovers.sinaapp.com/article/c11%E4%B8%AD%E4%BF%A1%E5%8F%B7%E9%87%8Fsemaphore%E7%9A%84%E5%AE%9E%E7%8E%B0