c++11中有 mutex (互斥量),有 condition_variable (条件变量),并没有 semaphore (信号量)。信号量,操作系统中一般都有提,后来 google 说可以使用 mutex+condition_variable 实现一个,后来写来写去,都死锁 (deadlock) ——,O__O"…,后来 google 了一个,整理了一下思路。
信号量
神马是信号量?信号量是一个整数 count,提供两个原子(atom,不可分割)操作:P 操作和 V 操作,或是说 wait 和 signal 操作。
- P操作 (wait操作):count 减1;如果 count < 0 那么挂起执行线程;
- V操作 (signal操作):count 加1;如果 count <= 0 那么唤醒一个执行线程;
如何理解这个信号量?为嘛信号量是这个东西?想想互斥量 mutex,相当于一把锁,如果一个人来了 lock 一下,其他人进不去了;最初的人 unlock 了,又可以进一个人了,进去一个又 lock 住。如果 mutex 锁在 unlock 状态下叫做 1 的话,lock 状态叫 0;1 实际反映的是锁的数量 !现在可以有多把锁,数量 count 最初为 n (可以设定);
- 来一个人取一把锁(count减1),如果发现锁的数量(count)小于0,岂不言外之意,没有锁了 ? 这样的情况下就要等(wait),或是说悬挂(suspend),或是说阻塞(block);直到神马时候呢?有人释放一把锁给 me 为止。当然了,如果最初就有锁的话,直接拿一把进去就可以了,O__O"…。
- me 的事情办完了,要出去了,还回一把锁(count加1),如果发现 count <=0,言外之意是神马呢?有人在等丫,O__O"…;好吧,me 把自己的锁给某一个人,唤醒一个等待的线程。当然如果最初就没有人等,me 就走 me 的,不用唤醒谁了。
下面的解释就对应上面的 wait 操作和 signal 操作,也算是它的真实意图了。wait 和 signal 有神马用呢?最典型的用法可能就是 count = 1 时候相当于一把互斥锁丫!当然很多时候,共享的资源有多个,比如有 n 个坑,每个线程占一个坑,这个时候使用信号量这个工具要比 mutex 更合适。算了,问题还集中在 wait 和 signal 的实现上。
问题和思路
- 一个 int 或是 long 变量 count,很好设定;因为 wait 和 signal 都是原子操作,所以至少要一个 mutex 来保证互斥;
- 有时候需要 suspend 一个线程,有时候要 wakeup 一个线程,条件变量是合适人选,所以需要一个 condition_variable;suspend 和 wakeup 都是在 condition_variable 上(这是载体);
- wakeup 一个线程,肿么保证一定会 wakeup 一个 condition_variable 上的一个呢?me 们借助一个辅助变量 wakeups —— 要唤醒线程名额(初值为 0),也就是在 signal 的时候,发现有人在等,就 wakeups++(要唤醒人数+1) 然后通知一下条件变量可以唤醒一个。而 wait 操作如果发现 count 数量不够,就阻塞,直到 wakeups 有名额(大于0)为止(条件变量的 wait 就是等待该条件发生),当然唤醒一个之后 wakeups 减1。(条件变量设置的条件是 wakeups > 0。)
程序代码
-
/*
-
* modified by: ilovers
-
*/
-
-
#include <mutex>
-
#include <condition_variable>
-
-
namespace ilovers {
-
class semaphore {
-
public :
-
semaphore ( int value = 1 ) : count {value }, wakeups { 0 } { }
-
-
void wait ( ) {
-
std :: unique_lock <std :: mutex > lock {mutex } ;
-
if ( --count < 0 ) { // count is not enough ?
-
condition. wait (lock, [ & ] ( ) - > bool { return wakeups > 0 ; } ) ; // suspend and wait ...
-
--wakeups ; // ok, me wakeup !
-
}
-
}
-
void signal ( ) {
-
std :: lock_guard <std :: mutex > lock {mutex } ;
-
if ( ++count <= 0 ) { // have some thread suspended ?
-
++wakeups ;
-
condition. notify_one ( ) ; // notify one !
-
}
-
}
-
-
private :
-
int count ;
-
int wakeups ;
-
std :: mutex mutex ;
-
std :: condition_variable condition ;
-
} ;
-
} ;
几点说明:
- semaphore 组织在 ilovers 命名空间下,使用的时候包含头文件,然后 ilovers::semaphore sem; 方式使用;默认 count = 1,也就是一个 mutex 的效果;
- sem.wait(); 是 P 操作;sem.signal(); 是 V 操作;
- 程序中构造对象统一使用 c++11 的 {} 方式,虽然 () 依然使用,比如 count{value} 和 count(value),lock{mutex} 和 lock(mutex) 都是可以的;
信号量的用处
信号量的用法大体来说和 mutex m 一样;m.lock(); 和 m.unlock(); 将需要保护的代码上下围起来;mutex 的特点就是 count = 1,典型的(二分)互斥量;而 sem(n) 则是数量有 n 个,比如 n 个坑,都占完的情况下,阻塞当前线程,否则可以进去(占个坑,O__O"…)。
同步两个线程的顺序
如果 semaphore::count = 0 的情况下,一个线程 A 首先 wait 的话,就会被阻塞掉;而一个线程 B 如果不 wait 就直接干活,等干完 signal 一下的话,过一会 A 就会执行了(count = 1,已经有数量了)。这样就能保证 B 在 A 的前面执行了,类似的方法可以将多个线程 A、B、C、D 按照 D、C、B、A 排个顺序执行出来。
-
#include <iostream>
-
#include <thread>
-
#include "ilovers/semaphore"
-
-
std :: mutex m ;
-
ilovers :: semaphore ba ( 0 ), cb ( 0 ), dc ( 0 ) ;
-
-
void a ( )
-
{
-
ba. wait ( ) ; // b --> a
-
std :: lock_guard <std :: mutex > lock {m } ;
-
std :: cout << "thread a" << '\n' ;
-
}
-
void b ( )
-
{
-
cb. wait ( ) ; // c --> b
-
std :: lock_guard <std :: mutex > lock {m } ;
-
std :: cout << "thread b" << '\n' ;
-
ba. signal ( ) ; // b --> a
-
}
-
void c ( )
-
{
-
dc. wait ( ) ; // d --> c
-
std :: lock_guard <std :: mutex > lock {m } ;
-
std :: cout << "thread c" << '\n' ;
-
cb. signal ( ) ; // c --> b
-
}
-
void d ( )
-
{
-
std :: lock_guard <std :: mutex > lock {m } ;
-
std :: cout << "thread d" << '\n' ;
-
dc. signal ( ) ; // d --> c
-
}
-
-
int main ( )
-
{
-
-
std :: thread th1 {a }, th2 {b }, th3 {c }, th4 {d } ;
-
-
th1. join ( ) ;
-
th2. join ( ) ;
-
th3. join ( ) ;
-
th4. join ( ) ;
-
-
std :: cout << "thread main" << std :: endl ;
-
-
return 0 ;
-
}
去掉上面的同步的信号量 ba、cb、dc 之后,程序的输出可能是 a b c d、a c d b、a c b d 等,几乎不可能是 d c b a 的顺序,然后加上三个控制顺序的信号量后,输出的顺序就是 d c b a main。对了 std::lock_guard 和 mutex 是为了互斥访问 std::cout 对象,这点要注意些(cout 一个共享对象/变量)。
后话
如果 u 没有注意到这个真相,me 这里提一下:count = -5,表明当前有 5 个线程阻塞了(really?),count = 10,表明当前有 10 个位置可以直接用(不用等)。如果 count = N0(初值),而当前 count = 0 表明 N0 个进程在干活,没有人在等;count = -10,表明 N0 个干活,N0+10 执行了 wait 操作,10 个在等待。
当初 me 认为不需要 wakeups 辅助变量就可以 wait 和 notify,而是直接使用 count,后来发现不对,notify_one 的时候 count 一定是负的,而不是 count > 0(有空闲位置,分配给一个挂起的线程?no !此时的已经没有挂起的线程了!),根据负的 count 则不能断定要不要真的从条件变量上卸掉一个线程,O__O"…。比如,假设当前有 5 个挂起来的线程,count = -5,此时有一个线程工作完了,signal/V 操作了一下,count = -4,这个时候 conditon_variable 怎么知道要不要唤醒一个线程呢?但是通过一个 wakeups 就可以:一个线程工作完了,V 操作一下,同时 wakeups++,这个时候 condition_variable 感知到 wakeups 有名额了,然后唤醒一个线程。唤醒的逻辑就是如此。
总赶脚有点撇脚
思来想去,wakeups 这个辅助变量真的有用么?或是说一定需要借助它吗?wakeups me 们说它的意义是:有一个 V 操作然后去唤醒一个挂起的线程。这里面为嘛要借助一个辅助变量呢?(虽然借助于 wakeups 然后设置条件变量的条件 wakeups > 0 可以。) V 操作的时候直接 notify_one 而 P 操作的时候只是简单的挂起线程(不设置条件),不是更自然么?好吧,至少 me 目前是这么想的,貌似也是行的通的:
简单版本
-
/*
-
* author: ilovers
-
*/
-
-
#include <mutex>
-
#include <condition_variable>
-
-
namespace ilovers {
-
class semaphore {
-
public :
-
semaphore ( int value = 1 ) : count {value } { }
-
-
void wait ( ) {
-
std :: unique_lock <std :: mutex > lock {mutex } ;
-
if ( --count < 0 ) // count is not enough ?
-
condition. wait (lock ) ; // suspend and wait...
-
}
-
void signal ( ) {
-
std :: lock_guard <std :: mutex > lock {mutex } ;
-
if ( ++count <= 0 ) // have some thread suspended ?
-
condition. notify_one ( ) ; // notify one !
-
}
-
-
private :
-
int count ;
-
std :: mutex mutex ;
-
std :: condition_variable condition ;
-
} ;
-
} ;
赶脚这个才是更自然的 semaphore 的实现,O__O"…。me 测试了一下 semaphore 作为 mutex 使用(默认),去同步四个线程,每个线程 ++sum 100000 次,发现不带 wakeups 的简单版本,速度很慢, 20秒左右;而上面带 wakeups 的版本则快一些,2 秒左右(当然结果是一致的),O__O"…可能是 condition_variable::wait 操作,带有条件则执行效率更高的缘故?现在只能这么猜测了。
http://ilovers.sinaapp.com/article/c11%E4%B8%AD%E4%BF%A1%E5%8F%B7%E9%87%8Fsemaphore%E7%9A%84%E5%AE%9E%E7%8E%B0