线程同步主要为了协调线程间工作,尤其是数据的使用,这里主要是用信号量来同步。
第一个例子——生产者消费者:
设置:
#库存最多1,有库存消费者才消费,没则等待;没库存生产者才生产,没则等待。
#并没有用semA本身做计数,只用来传达消息给消费者线程。
#为了体现阻塞效果,生产者三秒一次。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
static pthread_t thread_producerA_id;
static pthread_t thread_consumerA_id;
static sem_t semA;
static int i = 0;
void *consumerA(void *pA)
{
int *p = pA;
while(1)
{
printf("before sem_wait():semA:%ld\n",semA);
sem_wait(&semA);//return value?
printf("after sem_wait():semA:%ld\n",semA);
if(0 == *p)
{
printf("Error! wait for the stock!\n");//不该有的
sleep(1);
}else
{
*p = 0;//consume
printf("consume done!the stock is %d\n",*p);
}
// printf("consumer sleep to increase the stock!\n");//暂时没加,下边用
}
return NULL;
}
void *producerA(void *pA)
{
int *p = pA;
while(1){
printf("before sem_post:semA:%ld\n",semA);
sem_post(&semA);//return value?
printf("after sem_post:semA:%ld\n",semA);
if(*p != 0)
{
printf("don not produce !the stock is %d\n",*p);
}else
{
*p = *p + 1;//produce
printf("produce done!the stock is %d\n",*p);
}
printf("producer sleep !!!!\n");//先用生产者睡眠
sleep(3);
}
return NULL;
}
int main()
{
int a;
a = 0;//初始化
int iRet = 0;
//semaphore initialization
sem_init(&semA,0,0);
iRet = pthread_create(&thread_consumerA_id,NULL,consumerA,&a);
if(iRet != 0)
{
printf("create failed!\n");
}
iRet = pthread_create(&thread_producerA_id,NULL,producerA,&a);
if(iRet != 0)
{
printf("create failed!\n");
}
printf("before join a\n");
pthread_join(thread_producerA_id,NULL);
printf("after join a\n");
printf("before join b\n");
pthread_join(thread_consumerA_id,NULL);
printf("after join b\n");
}
期待的结果:因为使用了阻塞的sem_wait()所以消费者不会判断出库存为零的情况。
# ./a.out问题:
before join a
before sem_wait():semA:0
before sem_post:semA:0
after sem_wait():semA:0
Error! wait for the stock!
after sem_post:semA:0
produce done!the stock is 1
producer sleep !!!!
还是出现了消费者接收到信号量,但是实际产品没到位的情况,因为生产者通知行为(sem_post())先于生产行为了。信号量的更改放到生产行为后边!
semaphore信号量只保证本身的操作是原子的,外部代码不保证。
void *producerA(void *pA)
{
int *p = pA;
while(1){
if(*p != 0)
{
printf("don not produce !the stock is %d\n",*p);
}else
{
*p = *p + 1;//produce
printf("before sem_post:semA:%ld\n",semA);
sem_post(&semA);//return value?
printf("after sem_post:semA:%ld\n",semA);
printf("produce done!the stock is %d\n",*p);
}
printf("producer sleep !!!!\n");
sleep(3);
}
return NULL;
}
结果看起来正常:信号量经生产者从0变1,信号量经消费者从1变0
./a.out逻辑顺序,生产之后sem_post(),消费之前sem_wait()。
before join a
before sem_post:semA:0
after sem_post:semA:1
produce done!the stock is 1
producer sleep !!!!
before sem_wait():semA:1
after sem_wait():semA:0
consume done!the stock is 0
before sem_wait():semA:0
before sem_post:semA:0
after sem_post:semA:1
produce done!the stock is 1
producer sleep !!!!
after sem_wait():semA:0
consume done!the stock is 0
before sem_wait():semA:0
因为semaphore信号量相关操作只保证本身的操作是原子的,外部代码不保证。所以这里还是不严谨的!
sem_wait()之后还未消费(a = a - 1),生产者已经又可以生产了(a = a + 1),然后刚好再执行到消费(a = a - 1),就会导致生产者以为生产完成,而消费者得不到产品。
下面做一个干扰演示:其他不变,改生产者的打印,生产之后再判断*p,如果有0值,说明被消费过程插入:
void *producerA(void *pA)
{
int *p = pA;
while(1){
if(*p != 0)
{
}else
{
*p = *p + 1;//produce
sem_post(&semA);//return value?
if(*p == 0)
printf("consumer disturb!the stock is %d\n",*p);
}
}
return NULL;
}
大量的
consumer disturb!the stock is 0
consumer disturb!the stock is 0
consumer disturb!the stock is 0
consumer disturb!the stock is 0
consumer disturb!the stock is 0
consumer disturb!the stock is 0
虽然有了信号量传递消息,如果生产者和消费者还想操作同一个共享数据。除了额外加逻辑判断,就剩使用信号量互斥了。
从逻辑上解决问题的话,方式很多,比如不使用置零和置一操作,而是使用加法和减法等等,但是具体方法和可行性待验证,毕竟,小到一个赋值操作也不保证是原子的。
这里主要分析一下信号量互斥:如果使用多线程互斥将信号量的增加和生产行为绑定成一个原子操作,因为sem_wait()是阻塞操作,会产生死锁:
void *consumerA(void *pA)
{
int *p = pA;
while(1)
{
pthread_mutex_lock(&(sMux));
sem_wait(&semA);//return value?
if(0 == *p)
{
printf("Error! wait for the stock!\n");
sleep(1);
}else
{
*p = 0;//consume
}
pthread_mutex_unlock(&(sMux));
}
return NULL;
}
void *producerA(void *pA){ int *p = pA; while(1){ if(*p != 0) { }else { pthread_mutex_lock(&(sMux)); *p = *p + 1;//produce printf("before sem_post:semA:%ld\n",semA); sem_post(&semA);//return value? printf("after sem_post:semA:%ld\n",semA); if(*p == 0) printf("consumer disturb!the stock is %d\n",*p); pthread_mutex_unlock(&(sMux)); } } return NULL;}运行结果:直接死锁!!!
所以,改成不忙等的sem_trywait()能解决问题!
消费者:注意判断返回值,-1是接收失败,如果愿意,也可以看看errno应该是EAGAIN
void *consumerA(void *pA)生产者:
{
int *p = pA;
int iRet;
while(1)
{
pthread_mutex_lock(&(sMux));
iRet = sem_trywait(&semA);//return value?
if(iRet == 0)
{
if(0 == *p)
{
printf("Error! wait for the stock!\n");
sleep(1);
}else
{
*p = 0;//consume
<span style="white-space:pre"> </span>printf("consume done!the stock is %d\n",*p);
}
}else
{
// printf("sem_trywait() returns -1\n");
}
pthread_mutex_unlock(&(sMux));
}
return NULL;
}
void *producerA(void *pA)为了主线清晰,打印语句都删了,根据需要自己加吧。。。。
{
int *p = pA;
while(1){
if(*p != 0)
{
}else
{
pthread_mutex_lock(&(sMux));
*p = *p + 1;//produce
sem_post(&semA);//return value?
if(*p == 0)
printf("consumer disturb!the stock is %d\n",*p);
pthread_mutex_unlock(&(sMux));
}
}
return NULL;
}
总之,改到这一步,现在程序能够准确运行了!!!!!!!
但是这个还有个问题,信号量不用阻塞的sem_wait()而用sem_trywait(),资源消耗就高了,还是忙等,用信号量的优势没有体现出来。
想不提升处理器资源,还是得用阻塞方法。
互斥锁只保护变量操作的原子性,而对大流程不做限制:
生产者:
mutex_lock()
*p = *p + 1;
mutex_unlock();
sem_post;
消费者:
sem_wait();
mutex_lock();
*p = *p - 1;
mutex_unlock();
这样做,虽然还是不能让生产过程和信号量增操作绑成原子操作,但是最少能保证有几次sem_post()就有几个增量给*p,有几次sem_wait()就有几个减量给*p。如果有多个消费者,也是谁wait()成功谁去处理消息,wait()不到的还在阻塞(这个特性的优势又一次体现出来了),这样保证不会出现“A拿到信号量,但B把消息拿走了”的现象产生。而多个生产者呢,也是先生产完再产生的信号量。
代码如下:
消费者
void *consumerA(void *pA)生产者
{
int *p = pA;
int iRet;
while(1)
{
iRet = sem_wait(&semA);//return value?
if(iRet == 0)
{
if(0 == *p)
{
printf("Error! wait for the stock!\n");
sleep(1);
}else
{
pthread_mutex_lock(&(sMux));
*p = *p - 1;//consume
printf("consume done!the stock is %d\n",*p);
pthread_mutex_unlock(&(sMux));
}
}else
{
printf("sem_wait() returns -1\n");
}
printf("consumer sleep!\n");
sleep(3);
}
return NULL;
}
void *producerA(void *pA){ int *p = pA; int iRet; while(1){ pthread_mutex_lock(&(sMux)); *p = *p + 1;//produce pthread_mutex_unlock(&(sMux)); iRet = sem_post(&semA);//return value? if(iRet == 0) { sleep(1); if(*p == 0) printf("consumer disturb!the stock is %d\n",*p); } else { printf("sem_post() failed!\n"); sleep(10); } } return NULL;}运行结果:
按时间间隔,生产3个,消费1个。
如果把时间间隔去掉:
可能因为两个函数的语句执行时间问题,生产者还是快于消费者。时间长的话这个变量可能会爆掉。虽然实际使用时不太可能这么简单粗暴,一个while(1)没完没了的生产,应是有事件之类的才做改变,并给出信号量。但是毕竟还是要考虑这种情况的。
所谓爆掉,应该有两种东西,爆掉,一个是信号量,一个是库存,这里边直观打印出来的是库存(把sem的打印关了),其实两个都在爆。
关于库存,这就应该是人来解决了,数据上限也不一样,可能是一个指定大小的自定义消息队列,判断爆的方法肯定是看队列空间,化解的方法,其实也就是多弄几个消费者,多开几个线程吧。
关于信号量,不知道怎么办了。先测试一下爆的结果!关掉库存打印,改为打印信号量,同时让消费者睡眠,增加爆信号量效率。
多消费者实例:
//多个消费者,通过阻塞实现互斥,避免你接信号量我处理消息。运行结果,库存不小于0,证明两个消费者是严格按照生产者产生的信号来消费的。那个警告,是正常预期了,肯定要发生这种事,但是逻辑上已经没错误了,因为生产过程在信号量产生之前。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
static pthread_t thread_producerA_id;
static pthread_t thread_consumerA_id;
static pthread_t thread_consumerA2_id;
static pthread_mutex_t sMux;
static sem_t semA;
static int i = 0;
void *consumerA(void *pA)
{
int *p = pA;
int iRet;
while(1)
{
iRet = sem_wait(&semA);//return value?
if(iRet == 0)
{
printf("consume:sem is %d\n",semA);
if(0 == *p)
{
printf("Error! wait for the stock!\n");
sleep(1);
}else
{
pthread_mutex_lock(&(sMux));
printf("consumer id:%lld\n",pthread_self());
*p = *p - 1;//consume
printf("consume done!the stock is %d\n",*p);
pthread_mutex_unlock(&(sMux));
}
}else
{
printf("sem_wait() returns -1\n");
}
// printf("consumer sleep!\n");
// sleep(3);
}
return NULL;
}
void *producerA(void *pA)
{
int *p = pA;
int iRet;
while(1){
pthread_mutex_lock(&(sMux));
*p = *p + 1;//produce
printf("produce done!The stock is %d\n",*p);
pthread_mutex_unlock(&(sMux));
iRet = sem_post(&semA);//return value?
printf("produce:sem is %d\n",semA);
if(iRet == 0)
{
sleep(1);
if(*p == 0)
printf("WARN:consume done before produce loop done!\n",*p);
}
else
{
printf("sem_post() failed!\n");
// sleep(10);
}
}
return NULL;
}
int main()
{
int a;
a = 0;//初始化
int iRet = 0;
//semaphore initialization
sem_init(&semA,0,0);
iRet = pthread_create(&thread_consumerA_id,NULL,consumerA,&a);
if(iRet != 0)
{
printf("create failed!\n");
}
iRet = pthread_create(&thread_consumerA2_id,NULL,consumerA,&a);
if(iRet != 0)
{
printf("create failed!\n");
}
iRet = pthread_create(&thread_producerA_id,NULL,producerA,&a);
if(iRet != 0)
{
printf("create failed!\n");
}
pthread_join(thread_producerA_id,NULL);
pthread_join(thread_consumerA2_id,NULL);
pthread_join(thread_consumerA_id,NULL);
}
=========================================================================================================================
信号量的机制,应该还是让线程睡眠咯!只不过不是sleep()的那个睡眠,那个睡眠太被动了,必须睡到指定时间还能醒,而信号量的sem_wait()随时可以被唤醒,这当时是系统支持的了,不用管太多。
至于为什么这个信号量只是线程间同步,而进程不能用它同步,通过打印也看到了,其实就是个变量。进程是拥有资源的基本单位嘛,而线程间共享,所以这个变量当然只有进程内部可见。