转载~kxcfzyk:Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解

时间:2022-05-16 23:20:56

Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解

(本文的读者定位是了解Pthread常用多线程API和Pthread互斥锁,但是对条件变量完全不知道或者不完全了解的人群。如果您对这些都没什么概念,可能需要先了解一些基础知识)

关于条件变量典型的实际应用,可以参考非常精简的Linux线程池实现(一)——使用互斥锁和条件变量,但如果对条件变量不熟悉最好先看完本文。

Pthread库的条件变量机制的主要API有三个:

  • int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
  • int pthread_cond_broadcast(pthread_cond_t *cond);
  • int pthread_cond_signal(pthread_cond_t *cond);

注:还有一个没说的API是pthread_cond_timewait,它跟pthread_cond_wait(作用见下面)的唯一不同就是可以指定一个等待的超时时间,这里不对它作额外讨论。

它们和其它几个Pthread API一起用于处理一种特定情形的线程同步问题:

  1. 若干个线程在某个条件没满足时不能继续往下面走,于是纷纷调用pthread_cond_wait使自己在这个条件上陷入等待(休眠);
  2. 当条件满足以后,另外有个活跃着的线程调用pthread_cond_broadcast通知(唤醒)刚才那些等待在这个条件上的所有线程,让它们继续往下运行。

这种情形是非常通用、非常基础的,很多更加具体的线程同步问题都是这种情形的扩展,比如说经典的消费者/生产者问题,读者/写者问题等等。明显“条件”是这种情形的核心,所以Pthread的这套线程同步机制叫做“条件变量”。可以看出条件变量机制跟Java的wait/notify机制非常类似。

上面这种情形也可以用POSIX定义的另外一套线程/进程同步机制来实现——信号量(semaphore),而且信号量机制在实际场景中用起来比条件变量机制还简单一些,但是信号量的性能不如Pthread库中的条件变量。

条件变量通过pthread_cond_t数据类型来声明,而且使用之前必须先要初始化:

  1. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

上面这种是通过预定义的初始化宏来静态初始化,也可以用函数动态初始化:

  1. pthread_cond_t cond;
  2. pthread_cond_init(&cond, NULL);
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);

注意条件变量应该声明为全局可见的,因为条件变量会在多个线程(的函数)中被访问。条件变量最后不再使用了的时候应该销毁:

  1. pthread_cond_destroy(&cond);
pthread_cond_destroy(&cond);

在初始化后到销毁前这段时间内就是条件变量的正常生命周期了,可以按需要对它调用pthread_cond_wait、pthread_cond_signal和pthread_cond_broadcast。

pthread_cond_signal的作用跟pthread_cond_broadcast相似,但不同的是pthread_cond_signal会通知所有等待线程中的至少一个,让它(们)继续往下运行,而所有其它没被通知的等待线程则继续等待(休眠)。之所以pthread_cond_signal并不是严格地只唤醒一个等待线程,是因为在多处理器或多核系统中,可能无法实现只唤醒一个等待线程,就算能强行做到只唤醒一个等待线程,也会带来很大的性能损失,这对一个通用的基础线程同步API来说并不合适。

但实际应用场景中我们通常希望每调用一次pthread_cond_signal就唤醒一个等待线程,比如说下面这种情况:

某个线程专门负责从网络接收数据包,其它若干线程专门负责处理数据包。当没有任何数据包时,处理线程全部调用pthread_cond_wait陷入等待。当一个数据包到达时,接收线程调用phtread_cond_signal唤醒一个处理线程,处理线程拿走这个数据包去处理。当又一个数据包到达时,接收线程再次调用pthread_cond_signal唤醒一个线程……

这个问题对信号量机制来说很容易,因为信号量中的sem_post函数只会唤醒一个等待的进程或线程。虽然pthread_cond_signal本身不保证只唤醒一个等待线程,但是POSIX标准在定义这套API时考虑过了这个问题,它留了一个“后门”,让我们在应用程序中可以通过额外的代码来解决这个问题。

先不考虑那个所谓的“后门”,一个粗略看上去可行的解决办法是,除了条件变量以外,再额外设置一个全局的普通计数变量表示允许唤醒多少个等待线程:

  1. int global_count=0;
int global_count=0;

那么当通知线程需要调用pthread_cond_signal唤醒别的等待线程之前,应该先增加全局变量的计数,表示允许唤醒的线程数目又增加了一个:

  1. global_count++;
  2. pthread_cond_signal(&cond);
global_count++;
pthread_cond_signal(&cond);

pthread_cond_signal一调用,那些调用pthread_cond_wait等待在cond的线程可能会有好几个都唤醒了,索性假设全部都被唤醒了。但其实我们只想让其中一个继续往下走,其它的不应该往下走,那么其它那些等待线程就都应该再次调用pthread_cond_wait继续等待(这里明显该有个循环)。

下面的问题就是,怎么决定哪一个等待线程继续走呢?可以这样,当大家都被唤醒的时候,大家都判断一下global_count是不是大于0,也就是当前允不允许唤醒线程。如果某个等待线程检测到的global_count是大于0的,就赶紧把global_count减掉一个,然后自己往下走。这时候global_count少了一个,可能就是0了,表示不允许再唤醒线程,其它几个等待线程发现这一状况以后就不往下走,再次调用pthread_cond_wait继续等待:

  1. while(global_count<=0) {
  2. pthread_cond_wait(&cond, ...);
  3. }
  4. global_count--;
while(global_count<=0) {
pthread_cond_wait(&cond, ...);
}
global_count--;

到现在为止问题基本解决了,但是引出了一个新的问题:多个线程同时访问global_count变量会造成竞态条件。问题看上去很容易解决,使用互斥锁保护就好了。

对于通知线程线程:

  1. pthread_mutex_lock(&mutex);
  2. global_count++;
  3. pthread_cond_signal(&cond);
  4. pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
global_count++;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

对于等待线程:

  1. pthread_mutex_lock(&mutex);
  2. while(global_count<=0) {
  3. pthread_cond_wait(&cond, ...);
  4. }
  5. global_count--;
  6. pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while(global_count<=0) {
pthread_cond_wait(&cond, ...);
}
global_count--;
pthread_mutex_unlock(&mutex);

新的问题又出来了:等待线程调用pthread_cond_wait陷入等待时,还占有着mutex互斥锁,下次通知线程想要唤醒线程时就无法获取mutex互斥锁了,于是出现了死锁。所以在调用pthread_cond_wait将当前线程陷入等待之前,我们应该解开mutex互斥锁,当线程被唤醒,从pthread_cond_wait函数返回时,我们应该重新获取mutex互斥锁。

比如像这样:

  1. pthread_mutex_lock(&mutex);
  2. while(global_count<=0) {
  3. pthread_mutex_unlock(&mutex);
  4. pthread_cond_wait(&cond, ...);
  5. pthread_mutex_lock(&mutex);
  6. }
  7. global_count--;
  8. pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while(global_count<=0) {
pthread_mutex_unlock(&mutex);
pthread_cond_wait(&cond, ...);
pthread_mutex_lock(&mutex);
}
global_count--;
pthread_mutex_unlock(&mutex);

这段代码还是有问题的,在pthread_cond_wait函数调用的前后当前线程都有一段看上去“很短”的不拥有mutex互斥锁的真空期,但是对于CPU来说这段真空期并不算太短。

假设某个等待线程检测到global_count==0,于是解开mutex互斥锁,进入真空期,即将调用pthread_cond_wait。就在这时候,通知线程增加了一下global_count的计数值然后调用了pthread_cond_signal。接下来,刚才那个等待线程调用pthread_cond_wait陷入等待,由于pthread_cond_wait的调用发生在pthread_cond_signal之后,所以pthread_cond_wait并不会返回。如果程序里的等待线程就这一个,这个通知就丢失了。 问题到了这里似乎没路可走了,但是别忘了还有个“后门”没用上,那就是前面一直没提的pthread_cond_wait的第二个参数了。看看本文最开始列出的函数声明,第二个参数赫然是mutex!这下猜也能猜到这第二个参数是干嘛的了,明显就是专门帮我们解开mutex锁啊,然后在pthread_cond_wait返回之前再自动获取mutex锁。这里顺道澄清一下,和条件变量关联的mutex,不是像网上部分人说的那样是用来保护条件变量的,条件变量在实现的时候是能够做到线程安全的,因为它内部还有一个自己的互斥锁。

所以正确的做法是:

  1. pthread_mutex_lock(&mutex);
  2. while(global_count<=0) {
  3. pthread_cond_wait(&cond, &mutex);
  4. }
  5. global_count--;
  6. pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while(global_count<=0) {
pthread_cond_wait(&cond, &mutex);
}
global_count--;
pthread_mutex_unlock(&mutex);

到这里问题是不是完全解决了?很遗憾还差一点。pthread_cond_wait是线程撤销点(cancellation points)之一,这意味着当某个线程因为调用pthread_cond_wait而陷入休眠等待时,别的线程可以通过这个线程的ID调用pthread_cancel让这个线程强制从pthread_cond_wait返回并开始执行一些清理工作,最后结束退出。

问题就出在pthread_cond_wait返回上,上面标红的地方已经强调了,pthread_cond_wait返回之前会先自动获取mutex,也就是说返回以后已经占有了mutex互斥锁。这种情况下线程直接退出会导致互斥锁一直被占用,其它线程就无法获取这个互斥锁了,再次出现死锁。 这个问题有两种解决办法,第一个办法是在线程退出前的清理工作中加入解开互斥锁的代码,这个并不难办到,因为POSIX定义了两个API:

  1. void pthread_cleanup_pop(int execute);
  2. void pthread_cleanup_push(void (*routine)(void*), void *arg);
void pthread_cleanup_pop(int execute);
void pthread_cleanup_push(void (*routine)(void*), void *arg);

pthread_cleanup_push用于向一个特殊栈压入一个函数指针,当线程退出时,这个特殊栈中的所有函数会被一个个从栈顶弹出并执行(return退出的情况除外)。phtread_cleanup_pop用于从这个特殊栈的栈顶手动弹出函数指针,execute参数非0时,弹出的函数会被自动执行。

需要注意的是,POSIX标准允许这两个API被实现为带未闭合花括号的宏,所以这两个API一定(最好)要配套使用:它们必须一前一后(push在前),而且在同一个函数的同一个嵌套层次内。

比如说这两个API的实现有可能会是类似于这样:

  1. #define pthread_cleanup_pop(execute)        XXXX { XXXX
  2. #define pthread_cleanup_push(routine, arg)  XXXX } XXXX
#define pthread_cleanup_pop(execute)		XXXX { XXXX
#define pthread_cleanup_push(routine, arg) XXXX } XXXX

所以这就是为什么它们的调用要求如此奇怪了。 有了这两个API,想要解决刚才的问题,首先要定义一个清理回调函数:

  1. void mutex_clean(void *mutex) {
  2. pthread_mutex_unlock((pthread_mutex_t*)mutex);
  3. }
void mutex_clean(void *mutex) {
pthread_mutex_unlock((pthread_mutex_t*)mutex);
}

然后在等待线程里调用那两个API:

  1. pthread_mutex_lock(&mutex);
  2. pthread_cleanup_push(mutex_clean, &mutex);
  3. while(global_count<=0) {
  4. pthread_cond_wait(&cond, &mutex);
  5. }
  6. global_count--;
  7. pthread_cleanup_pop(0);
  8. pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
pthread_cleanup_push(mutex_clean, &mutex);
while(global_count<=0) {
pthread_cond_wait(&cond, &mutex);
}
global_count--;
pthread_cleanup_pop(0);
pthread_mutex_unlock(&mutex);

或者一个稍微简洁一些的写法:

  1. pthread_mutex_lock(&mutex);
  2. pthread_cleanup_push(mutex_clean, &mutex);
  3. while(global_count<=0) {
  4. pthread_cond_wait(&cond, &mutex);
  5. }
  6. global_count--;
  7. pthread_cleanup_pop(1);
pthread_mutex_lock(&mutex);
pthread_cleanup_push(mutex_clean, &mutex);
while(global_count<=0) {
pthread_cond_wait(&cond, &mutex);
}
global_count--;
pthread_cleanup_pop(1);

另外一种解决方案比这个麻烦一些,那就是设置mutex互斥锁的robust属性值为PTHREAD_MUTEX_ROBUST。

对于robust互斥锁,当持有它的线程没解锁就退出以后,别的线程再去调用pthread_mutex_lock,函数会回一个EOWNERDEAD错误,线程检测到这这个错误后可以调用pthread_mutex_consistent使robust互斥锁恢复一致性,紧接着就可以调用phtread_mutex_unlock解锁了(尽管这个锁并不是当前这个线程加持的)。解锁完毕就可以重新调用pthread_mutex_lock了。 采用这种用方案的时候,首先要声明一个全局可见的mutex属性变量:

  1. pthread_mutexattr_t mutexattr;
pthread_mutexattr_t mutexattr;

然后初始化并设置属性值:

  1. pthread_mutexattr_init(&mutexattr);
  2. pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);

有了mutex属性,接下来就是在初始化mutex的地方作修改了,通常我们对mutex的初始化都是pthread_mutex_init(&mutex, NULL),现在改成:

  1. pthread_mutex_init(&mutex, &mutexattr);
pthread_mutex_init(&mutex, &mutexattr);

现在准备工作已经完毕,开始干正事了。对于通知线程:

  1. while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
  2. pthread_mutex_consistent(&mutex);
  3. pthread_mutex_unlock(&mutex);
  4. }
  5. global_count++;
  6. pthread_cond_signal(&cond);
  7. pthread_mutex_unlock(&mutex);
while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
pthread_mutex_consistent(&mutex);
pthread_mutex_unlock(&mutex);
}
global_count++;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

对于等待线程:

  1. while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
  2. pthread_mutex_consistent(&mutex);
  3. pthread_mutex_unlock(&mutex);
  4. }
  5. while(global_count<=0) {
  6. pthread_cond_wait(&cond, &mutex);
  7. }
  8. global_count--;
  9. pthread_mutex_unlock(&mutex);
while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
pthread_mutex_consistent(&mutex);
pthread_mutex_unlock(&mutex);
}
while(global_count<=0) {
pthread_cond_wait(&cond, &mutex);
}
global_count--;
pthread_mutex_unlock(&mutex);

到这里,所有的事情终于完成了!