linux之线程同步(生产消费模型)
为什么有同步
面对共享资源被多个线程访问,有个解决办法就是使用互斥锁!
那么是不是使用互斥锁就可以了呢?就可以解决所有的问题?例如数据不一致问题,线程安全问题——不是!
加锁也有自己的应用场景,不是所有的场景都是时候加锁的!例如下面的代码
#include<iostream> #include<pthread.h> //这是一个模拟抢票的逻辑 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; int tickets = 1000;//充当有1000张票 void* getTicket(void* args) { std::string username = static_cast<const char*>(args); while(true) { pthread_mutex_lock(&lock); if (tickets > 0) { usleep(1245); std::cout << username << " 正在抢票" << tickets << std::endl; tickets--; pthread_mutex_unlock(&lock); } else { break; pthread_mutex_unlock(&lock); } } return nullptr; } int main() { pthread_t t1,t2,t3,t4; pthread_create(&t1,nullptr,getTicket,(void*)"thread 1"); pthread_create(&t2,nullptr,getTicket,(void*)"thread 2"); pthread_create(&t3,nullptr,getTicket,(void*)"thread 3"); pthread_create(&t4,nullptr,getTicket,(void*)"thread 4"); pthread_join(t1,nullptr); pthread_join(t2,nullptr); pthread_join(t3,nullptr); pthread_join(t4,nullptr); }
==虽然这个互斥锁解决了数据不一致问题!但是它很不合理,因为有一个线程对于锁的竞争力比另外几个线程更强!所以导致了经常只有一个线程在抢票!==
那么如何安全且合理的抢票呢?
我们举个例子:假设现在在学校里面有一个自习室,叫做学霸VIP自习室!自习室的环境特别好但是只有一张座椅板凳,只允许一个同学进去自习!——拥有这个自习室的前提条件是,谁先拿到钥匙谁就能先进去,先到先得,自习室的钥匙放在门口!
有一个同学是个卷王是最早来的!他拿下钥匙,将门反锁!后面人就进不来了!——那么这为同学就互斥式的访问了这个自习室!(这种互斥式的访问就看谁的竞争能力强(谁先早到)! 谁就先拿到钥匙,那么就能将门反锁)
在这个同学自习了两个小时,想去上厕所,但是打开门一看,门口站满了人,都准备抢这个钥匙,但是这个同学只是想上个厕所,**于是将钥匙放进口袋里面!**然后出来的时候将门反锁!
这样子在同学上厕所,就没有能进这个自习室了!——这叫==当线程执行流被切换走的时候,将钥匙也一起带走了!==这样子等回来后,又开门,反锁,继续自学。
等过了2小时,这位同学有点不好意思了,所以决定让出自习室,于是开锁,出门,反锁,挂上钥匙,等准备走的时候,又觉得自己今天一大早就出来,就自习这么一会有点亏,所以想在自习一会,因为这位同学离这个钥匙最近最近,所以他又反手拿起钥匙,开门,反锁,继续自习!
==这叫什么!——这叫因为离资源最近!所以竞争能力最强!所以又申请到了这个钥匙==
然后这位同学终于肚子饿了,出门,反锁,挂上钥匙,但是一看门口人那么多,想到下次又不知道什么时候能轮到自己,于是凭借着离钥匙最近于是又拿起钥匙,开门,反锁,自习,但是只呆了几分钟就撑不住了,又出门了,又不甘心。
因为其他人离钥匙都没有这个同学近,所以其他同学,只能看着这位同学疯狂开门,放钥匙,拿钥匙,每一次待又不到几分钟
==这样子周而复始下去,这个自习室有没有创造价值?——没有!因为大部分时间都在申请锁,释放锁这个动作了!,有没有让别人申请锁呢?——没有!因为这位同学的竞争能力太强,而导致别人无法申请到锁!==
==我们将其他在门口的人,长时间得不到锁资源,而无法访问公共资源(自习室)的这些同学处于饥饿状态!==
一个线程频繁的申请锁资源!而导致其他线程长期得不到资源的问题就是==饥饿问题!==
但是这个线程(同学)有错么?——没有就是因为就是怎么规定的!它的竞争能力就是比其他线程(同学)更强!==但是这样子不合理!==
在其他同学的投诉下!学校出了新的规定!——==所有在自习室等待的同学都要排队,然后从自习室出来的同学,不能立马申请锁!要先去当前队列的尾部!然后重新等到这位同学的时候才能申请!==——这样子就在保证一个人访问自习室的情况下(数据安全的情况下),让同学按照一定的顺序进行访问自习室(公共资源)
==我们将同学看做线程,自习室看做公共资源!——翻译过来就是,当线程访问公共资源完毕之后,不能立马申请锁!而是要到等待队列的最末端重新开始等待才能申请!的这种形式就是线程同步==
==线程同步的本质就是在临界资源访问安全的前提下,让多个线程按照一定的顺序进行资源访问!==——从而解决因为一个线程竞争能力特别强,导致其他资源饥饿的问题!
生产消费模型
什么是生产消费模型——专业的说法是一种多执行流协同的方式
那么为什么要协同呢?——因为在多线程访问的时候,总会访问到公共资源!这些公共资源在被线程无序访问的时候一定会导致数据不一致问题!虽然可以通过加锁来解决这个问题
但是又时候导致一些不合理的问题,例如:饥饿问题
所以我们==既要保证安全,又要保证顺序(这种顺序可以不是一种绝对的顺序,但是一定要有顺序),尽可能的保证一个线程能合适合理的访问某种资源!所以我们要有一种多线程在这种工作场景下的模式!——这种模式最常见的就是生产者消费者模型!==
接下来我们用生活中的一些例子来解释一下什么是生产者消费者模型!
我们望文生义一下,就是要有消费的人,也要有生成的人
生活中最典型的就是超市,我们线下买东西,一般喜欢去超市买
所以生产者和消费者是通过超市这个平台来间接进行交易的!
==那么当学生进行消费的时候!供应商在干什么呢?——可能在生产,也可能在放假==
==反过来想供应商在生成的时候,学生又在干什么呢?——可能就在消费,也可能没有在消费==
==正因为有超市的存在!生成与消费的过程分离开来了!生产了不一定要立刻消费!消费了也不一定要立刻生成!——在计算机的术语里面这就是将生成与消费进行解耦!==
当然,如果工厂一直不生产,也是不行的!消费者没得消费!反过来消费者一直不消费,工厂一直生成也是不行的!——所以==超市是一个临时保存产品的场所!==让消费者想要消费的时候能立刻消费!生产者能不用立刻生产,而是等到需要的时候再进行生产,从而完成一定程度的解耦
因为有超市的存在,所以生成者生成的东西能放在超市里!工消费者一段时间的消费!因为有消费者的消费的行为让超市能够临时提供空出位置,就可以让生成者进行一定程度的生成!==超市的存在让这两者的步调能够不怎么一致!==
==这个能临时保存产品的场所在计算机的术语里面——我们叫做缓冲区!==
在我们日常写代码的时候有么有这样的类似的模型呢?——==函数调用!==
函数调用,我们一般都是可以通过形参或者实参的方式将数据交给函数,然后函数在自己的函数体内对将我们传入的数据进行加工处理,然后将结果通过返回值的方式返回!
相当于消费者这一次直接去工厂要产品,工厂没有存货,所以开机器直接给他生成了——那么消费者在生成产品出来之前,那么只能一直等着!
==如果没有超市这样的角色!那么就可能会出现和工厂这样的强耦合的关系!进而导致消费者等待的问题!——而一等待效率就低了!==
==那么到底什么是生成消费模型呢?==
超市,可以被生产者和消费者访问,生产者将东西放在超市,消费者从超市拿东西!——==这是生产者和消费者都能看到的东西!即超市就是一个共享资源!==
那么有没有可能出现这样的情况,消费者来超市!但是发现展架上没有东西了!都被卖完了!然后恰好来了一个超市人员来补货,==正在==往展架上补货的时候,消费者就去拿货物拿能不能拿成功呢?——==不确定!==因为只要这个工作人员要么放,要么不放,==如果有中间状态,当消费者正在访问的时候,那么当前究竟有没有放是不确定的!==——==所以当生成和消费在并发访问的时候,因为超市的某一些资源都是共享的!如果访问到了同一块资源,那么就有可能出现同时访问的问题!——从而造成数据不一致问题!==(现实中,消费者还能与补货员沟通!==但是线程之间是做不到这一点的!==)
==那么所谓的消费者在代码当中——其实就是一个线程或者多个线程!==
==那么所谓的生产在代码当中——也是一个线程或者多个线程!==
==因为涉及到了多执行流访问!所以这份共享资源首先就要被保护起来!==
那么该如何被保护呢?——这我们就要先明白生产者和消费者之间的关系
首先是生产者和生产者之间是什么关系?
现实生活中,A品牌火腿和B品牌火腿,展架上只能摆一个品牌的火腿,那么这两个生产者是什么关系呢?——==竞争关系,有你没我,有我没你==
==在计算机术语中——生产者和生产者之间就是互斥关系!==
可以允许任意一个先来!但是不允许两个同时一起来!——即值允许一个生产
因为在计算机里面,数据是可以被覆盖的!一个生产者生产出来的数据放在缓冲区!,如果没有被消费者读取走!就被另一个生产者生产的数据给覆盖了!这样是不行的!
那么消费者和消费者之间的是还说呢么关系?
消费者和消费者也是典型的竞争关系!——日常生活中,我们也经常能看到所谓的限量销售!此时都是要互相抢这购买
==所以消费者和消费者之间也是互斥关系==
那么生产者和消费者之间是什么关系?
在现实的里面!商品是不会被覆盖的,在一个展架上,生产者哪怕继续生产业也没有关系!
但是在计算机里面!数据是可以被覆盖的!当缓冲区里面有数据!而生产者却继续生产!那么就会导致数据被覆盖!——所以当消费者在缓冲区拿取数据的时候!生产者是不能进行生产的!防止数据被覆盖!——例如:消费者在读取"hello world"这个字符串!只读取完“hello” ,“world”还没有读取!就被生产者生产的新数据“666”给覆盖了!结果就是消费者拿到了"hello 666"
==所以生成者和消费者之间首先就要有一种**互斥关系!**保证数据安全==(这种关系不一定要有!但是如果是访问同一份共享资源的时候!那么就要有!)
消费者去超市购买火腿肠!但是火腿肠没有货!所以只好返回,接下来的一个月每天都来问工作人员,火腿肠到货了没有——这是在干什么==在检查火腿肠的就绪状态==这种行为实际上在浪费消费者的时间!本来就没有货,还过来,而且还浪费了工作人员的时间,如果有1千个人来问,那么工作人员就将时间的都浪费在这个行为上了,本来还能干一些其他有意义的事情。生产者也是,如果生产者天天来询问缺货了没有,那么会浪费很多时间在询问这个行为上面——这样的行为==合理但是不高效!==
==最好的办法就是工作人员告诉消费者,等有货了就直接通知消费者,就不用麻烦消费者天天来询问,等快要缺货了!就告诉生产者,让它赶紧生产——生产一部分,消费一部分!这样子就不会让消费者来了但是空手而归!将生产和消费协调起来!==
==所以生成者和消费者之间还有一个同步关系!保证效率的高效!==
概念总结
生产消费模型的三种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥与同步)——==都是为了保证共享资源的安全性!==
生产消费模型的两种关系:生产者线程与消费者线程
生产消费模型的一个场所:一段特定结构的缓冲区!(存放产品——即数据)
上面可以总结为321原则——只要我们想写生成消费模型!本质就是维护这个321原则
生成消费模型的特点
- 将生产者线程和消费者线程进行解耦!
防止出现,消费者要数据,生产者才立刻生产,从而导致消费者进行等待的问题
- 支持生产和消费的一段时间的忙闲不均的问题
如果出现生产者生产快,消费者消费慢,或者生产者生产慢,消费者消费快的情况会不会出现问题呢?——==不会!因为缓冲区能够暂时缓存一部分数据==
就像是过年的时候,厂商放假,不生产,但是消费者又疯狂的进行消费,此时货品是哪里来的?——都是存再超市仓库里面!
又比如:工作日,厂商不断的生产,但是消费者都在工作!没有几个人来消费,那么生产出来的产品去哪里了?——都存在超市的仓库里面了!
==所以这样子就让生产者生产的快慢和消费者消费的快慢之间,彼此不产生太大影响!==
彼此哪怕速度不匹配也不用担心
- 提高效率
如果没有超市,那么生产者就得自己亲自找到消费者售卖产品或者消费者自己去找到生产者拿到商品!——这样子就效率不高了!
但是有了超市就可以让消费者专注消费,生产者专注生产!
==这就是生成消费模型的三个特点!==
以前写代码是main函数获取用户输入,调用fun函数,fun函数执行,返回结果!
但是这子会出现fun函数在调用的时候,main函数只能去等待,然后fun函数也只能去等待main函数去获取用户数据!——这种串行的执行方式效率很低!
==如果将其换成是生成消费模型的方式来执行会怎么样?==
让主线程去执行main函数,专门用来获取用户输入!然后将获取到到数据投入到缓冲区里面!
让线程去缓冲区里面拿数据,然后调用fun函数,专注处理数据!
有可能,此时缓冲区里面已经输入了大量的数据后,用户不再输入了!
那么此时会影响到fun函数的执行么?——不会!它会一直从缓冲区拿数据
==这样子从以前串行的执行的过程,变成了两个执行流并发的执行!——从而在一定程度上提高效率!==
但是这样也可能有一个问题——生产者和消费者如果是互斥关系,如果出现了这样一种情况
生产者刚刚生产一个数据,然后消费者在缓冲区里面拿数据!因为互斥,那么此时生产就得等消费者,同理消费者消费发现缓冲区里面没有数据,那么只能等生成者!——如果我们维持严格的互斥关系,那么是不是有可能退化成生产一个数据,消费一个数据这种情况!那样子==提高效率的特点==是不是又没有体现出来了呢?——那么生产消费模型的高效究竟体现到哪里呢?
条件变量
我们上面说过,一个线程频繁的申请锁资源!而导致其他线程长期得不到资源的问题就是==饥饿问题!==,要实现线程同步!首先就是要先解决一个线程频繁的申请锁资源(竞争能力过强),像是上面的生产消费模型,就有可能出现生产者线程疯狂的进行生产,导致不断申请锁资源,从而导致消费者无法进行消费!
所以就有了条件变量这个解决策略!——条件变量的存在就是为了满足多线程协同的需求所诞生了满足多线程协同的技术!
==什么是条件变量?==
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
这句话我们举个例子来理解:有抢票,相对的就是放票,有一家电影院里面有个电影很火,老板进行饥饿营销!初期只向市场投放1000张票,消费者或者黄牛使用抢票函数进行抢票,票一瞬间被抢完了!但是老板不进行补票!
但是黄牛或者消费者都不知道什么时候放票!——那么只能干一件事就是疯狂的进行刷票!
即抢票函数只能先申请锁,判断票数是否大于0,不大于0就释放锁,返回,一直循环!
==黄牛和消费者都在竞争式的访问这票数,在**放票的线程(其他线程)没有改变票数(变量)**之前,**黄牛或消费者(访问变量的线程)**什么都做不了!==因为获取票的条件不满足!
还有一种情况——例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
==在系统里面条件变量就是一个数据类型==
从相关接口我们就可以看出来有个参数类型就是pthread_cond_t这就是一个条件变量类型!——cond(condtion的缩写)
==那么条件变量如何解决我们面临的主要问题呢?==
当我们未来访问一份临界资源的时候,首先我们就要加锁——判断——然后解锁!
我们无从知道临界资源的状态是什么!所以进入锁后首先就是要判断生产和消费条件是否满足——例如:临界资源是一个队列!我们不知道这个队列是否为空,是否为满!
//就跟这份抢票的逻辑一样! void* getTicket(void* args) { std::string username = static_cast<const char*>(args); while(true) { pthread_mutex_lock(&lock);//先加锁 if (tickets > 0)//判断抢票的逻辑是否满足! { usleep(1245); std::cout << username << " 正在抢票" << tickets << std::endl; tickets--; pthread_mutex_unlock(&lock);//然后解锁! } else { break; pthread_mutex_unlock(&lock); } } return nullptr; }
==如果只维护互斥关系!就可能出现==
==但是现在我们要做的是==
==实现这种策略的就是使用条件变量来完成的!——使用phread_cond_wait函数来实现挂起等待!==
在某个条件变量下进行挂起等待!
==有等待自然就要有唤醒!——pthread_cond_siganl,这个函数就是用来唤醒挂起等待的线程!==
==理解条件变量==
我们用一个例子来理解条件变量!
有一家公司,包下了一整包间,用来进行面试!HR在包间里面!
能不能有很多的人进面试间和HR面试呢?——不能==所以HR就是一份临界资源!必须被互斥的访问!==所以一次只能进一个人!
但是这个公司组织的很不好!包间外面乱糟糟的!一有人出来,就有一堆人冲过去!想要面试!==这就是这一堆人在无序的状态下对临界资源进行并发式的竞争!==面试官只能随便挑一个,甚至出现一个人面试多次的情况,有的人一次也没有面试到的情况(饥饿问题)
后来公司为了解决这个问题——于是多配了一个人HR,并在面试放房间的外面设置了**一个等待区!**规定,面试的人只从等待区里面挑!并且要排队等待!
只有里面的面试官说,面试完了下一个,外面的HR才会让下一个人进入!
==未来所有应聘者(线程)去等待肯定是去这个条件变量下面等待!==
==面试官去叫某个人(线程去唤醒另一个线程)也一定是通过条件变量来去叫的(去唤醒指定线程!)==!
==当条件不满足的时候,线程只能去某些定义好的条件变量上进行等待!==
在内核的角度看条件变量
我们可以将条件变量看出一个简单的结构体!里面有一个队列!
条件变量接口介绍
条件变量初始化函数——
pthread_cond_init
该函数的第一个参数——==cond,就是要初始化的条件变量!==
第二个参数——==是一个输出型参数attr,用于获取条件变量的属性!==
返回值成功返回0,失败返回错误码!
==如果条件变量被定义为全局变量!==
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
可以以这种方式进行初始化!
条件变量释放函数——
pthread_cond_destroy
该函数的第一个参数cond,即要释放的条件变量!
返回值成功返回0,失败返回错误码!
条件变量等待条件满足的接口——
pthread_cond_wait
第一个参数——==cond要在那个条件变量下等待!==
第二个参数——==使用的互斥量!==
返回值成功返回0,失败返回错误码!
还有一个等待函数
pthread_cond_timewait
——这个函数可以设定在一个特定时间,在这个时间段内阻塞式的等待!等时间到了就自动返回!唤醒线程等待的函数——
pthread_cond_signal
——这个函数是用来==唤醒一个线程的!==该函数只有一个参数,即要唤醒那个条件变量下面的线程!
返回值成功返回0,失败返回错误码!
唤醒线程函数——
pthread_cond_broadcast
这个函数和上面的函数有什么不一样呢?——==这是用来唤醒一堆线程的!==在该条件变量下等待的==所有线程==都会被立刻唤醒!
==条件变量一定要配合锁使用!——因为条件变量本身是没有互斥功能的!==
#include<pthread.h> #include<iostream> #include<string> #include<unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int tickets = 1000; void* start_routine(void* args) { std::string name = static_cast<char *>(args); while(true) { pthread_mutex_lock(&mutex); if(tickets > 0) { pthread_cond_wait(&cond,&mutex);//进入锁后先进行等待! //为什么这里要有mutex呢? std::cout << name << " -> " << tickets << std::endl; tickets--; pthread_mutex_unlock(&mutex); } else { break; } } } int main() { //通过条件变量来控制线程的执行! #define NUM 5 #define SIZE 64 pthread_t t[NUM]; for (int i = 0; i < NUM; i++) { char* name = new char[SIZE]; snprintf(name,SIZE,"thread %d", i + 1); pthread_create(&t[i], nullptr, start_routine, (void *)name); } while(true) { sleep(1); pthread_cond_signal(&cond);//主线程用来控制唤醒的线程继续运行! //在该条件变量下的线程! std::cout << "main thread wakeup one thread ..." << std::endl; } for(int i = 0;i<NUM;i++) { pthread_join(t[i],nullptr); } return 0; }
==我们可以看到虽然我们没有在start_routine函数里加上sleep,但是线程还是按照一定的时间间隔进行抢票!==
int main() { //...... while(true) { sleep(1); pthread_cond_broadcast(&cond);//将唤醒线程改成这个 std::cout << "main thread wakeup threads ..." << std::endl; } //..... return 0; }
5个线程都同时被唤醒!
实现基于BlockingQueue的生成消费模型
**在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。**我们会给队列设置一个上限!
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作(进行消费)将会被阻塞,直到队列中被放入了元素(进行生产);当队列满时,往队列里存放元素的操作也会被阻塞(不能再再生产),直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
==肯定至少要有一个生产者线程(或者多个)向BlockingQueue放数据,一个消费者线程(或者多个)向BlockingQueue里面取数据!==
==这个BlockingQueue就是我们上面消费者模型说的“交易场所”——即一段特定结构的缓冲区!也是我们将要实现的!==
因为是队列所以内部可以缓存很多的数据!所以就能明显的实现一个线程在生产,另一个线程在消费的行为!
因为阻塞队列有明显的满和空的概念!所以一定会有条件满足与条件不满足的概念!
而且因为一些线程要从队列里面放,另一些线程要从队列里面拿!——这很明显是一个共享资源!——既然是共享的那么就要互斥的访问!一个线程访问了!另一个线程就不能访问!
当队列满了我们就得让生产者线程去等待!——在对应的条件变量下面等待!
同理当数据为空的时候!那么就要让消费者线程也去对应的条件变量下面等待!
==阻塞队列的模拟实现!==
//BlockingQueue.hpp #pragma once #include<iostream> #include<queue> #include<pthread.h> const int gmaxcap = 5; template<class T> class BlockQueue { public: BlockQueue(const int &maxcap = gmaxcap) : maxcap_(maxcap) { //首先进行内部成员初始化! pthread_mutex_init(&mutex_,nullptr); pthread_cond_init(&pcond_,nullptr); pthread_cond_init(&ccond_,nullptr); } //因为生产者和消费只关心拿数据和放数据! //所以阻塞队列只有两个对外接口最重要! void push(const T& in)//输入型参数,一般设计成const & { pthread_mutex_lock(&mutex_); //1.判断! if(is_full()) //判断是不是满的!但是这样的判断存在bug! { pthread_cond_wait(&pcond_,&mutex_);//生产条件不满足,无法进行生产!生产者进行等待! } //2.走到这里肯定是没有慢的! q_.push(in);//生产数据! //3.走到这里一定能保证阻塞队列里面有数据! //让消费者进行消费! pthread_cond_signal(&ccond_);//唤醒消费者!进行消费! pthread_mutex_unlock(&mutex_); } void pop(T* out)//输出型参数!一般设计成指针(*),如果是输入输出型那么就是& //通过这样的风格来区分参数的作用是什么! { pthread_mutex_lock(&mutex_); //1.判断是不是空的! if(is_empty())//这个也存在bug! { pthread_cond_wait(&ccond_,&mutex_);//是空的!放在条件变量下面进行等待! } // 2 .走到这里可以保证一定有数据!一定不为空! *out = q_.front();//获取数据! q_.pop();//弹出数据! //3. 可以保证阻塞队列里面至少有一个空的位置! pthread_cond_signal(&pcond_);//唤醒生产者 pthread_mutex_unlock(&mutex_); } ~BlockQueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&pcond_); pthread_cond_destroy(&ccond_); } private: bool is_empty() { return q_.empty(); } bool is_full() { return q_.size() == maxcap_; } private: std::queue<T> q_; int maxcap_;//表示队列元素的上限! pthread_mutex_t mutex_;//因为stl本事不是线程安全的!所以要有锁来进行保护! pthread_cond_t pcond_;//万一队列满了让生产者去对应的条件变量下面休眠! pthread_cond_t ccond_;//万一队列空了让消费者去对应的条件变量下面休眠! };
//mainCp.cc #include"BlockingQueue.hpp" #include<unistd.h> #include<sys/types.h> #include<ctime> //调用逻辑 void* consumer(void* bq_) { BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(bq_); while(true) { //从事消费活动! int data; bq->pop(&data); std::cout << "消费数据: " << data << std::endl; sleep(1);//可以用这个来控制消费速度! } return nullptr; } void* productor(void* bq_) { BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(bq_); while(true) { //从事生产活动 int data = rand()% 10 +1;//在这里先用随机数构建一个数据! bq->push(data);//这样子就完成生产了! std::cout << "生产数据! " << data << std::endl; //sleep(1);//控制生产速度! } return nullptr; } int main() { srand((unsigned long)time(nullptr)^getpid()); //要看到同一份资源! BlockQueue<int>* bq = new BlockQueue<int>; pthread_t c,p;//consum and product pthread_create(&c,nullptr,consumer,bq); pthread_create(&p,nullptr,productor,bq); //通过第四个参数来将同一份资源传给不同的线程! pthread_join(c,nullptr); pthread_join(p,nullptr); delete bq; return 0; }
==阻塞队列的典型表现就是!——无论是谁慢!另外一个也必须跟着慢下来!==
生产快!但是消费慢!
消费的是历史数据!
生产慢!但是消费快!
生产一个消费一个!
问题1——在阻塞队列的的push与pop里面!我们发现了一个问题!——即我们是在持有锁的是将自己挂起的!那么锁怎么办?挂起后谁来释放锁?
void push(const T& in) { pthread_mutex_lock(&mutex_); if(is_full()) { pthread_cond_wait(&pcond_,&mutex_); //我们被挂起的时候!我们还持有锁! //那么按理来说下面的pop就应该无法进入! //因为生产者线程和消费者线程使用的都是同一把锁! //但是事实却是下面的消费者线程仍然可以调用pop函数进行正常消费! } q_.push(in); pthread_cond_signal(&ccond_); pthread_mutex_unlock(&mutex_); } void pop(T* out) { //如果上面的生产者线程因为在持有锁的时候被挂起! //那么消费者线程是如何进入锁里面的? pthread_mutex_lock(&mutex_); if(is_empty()) { pthread_cond_wait(&ccond_,&mutex_); } *out = q_.front(); q_.pop(); pthread_cond_signal(&pcond_); pthread_mutex_unlock(&mutex_); }
这就是
pthread_cond_wait
的第二个参数的作用==第二个参数必须是我们正在使用的互斥锁!==
当我们调用这个函数的时候!
该函数,首先会以原子性的方式,将锁释放后!才会将自己这个线程挂起!——所以不用担心锁不释放的问题!
那么当这个线程被重新唤醒的时候,我们不是还在临界区里面吗?
所以该函数在唤醒返回的时候!会自动重新获取我们传入的互斥锁!
问题2——假如生产者线程有10个,都因为队列满了进入了挂起状态!而消费者线程只有一个!且pop里面使用的不是pthread_cond_signal而是pthread_cond_broadcast这个函数,额能一次性唤醒10个生产者线程!==但是此时队列空出来的位置只有一个!==——那就出现问题了!
这时候就出现了伪唤醒的情况!
void push(const T& in) { pthread_mutex_lock(&mutex_); //为了防止上面的情况 while(is_full())//充当条件判断的语法要用while!而不是if //唤醒后要重新判断一次!防止出现异常或者伪唤醒的情况! { pthread_cond_wait(&pcond_,&mutex_); } q_.push(in); pthread_cond_signal(&ccond_); pthread_mutex_unlock(&mutex_); } void pop(T* out) { pthread_mutex_lock(&mutex_); while(is_empty())//唤醒后在判断! { pthread_cond_wait(&ccond_,&mutex_); } *out = q_.front(); q_.pop(); pthread_cond_signal(&pcond_); pthread_mutex_unlock(&mutex_); }
问题3——
pthread_cond_signal
这个函数能放在临界区的外部吗?==可以!既可以放在解锁之前!也可以放在解锁之后!==
不过一般都是建议放在里面! 我们以push为例:因为当我们将消费者线程唤醒!那么此时这个消费者线程就处于竞争锁比较靠前的位置!那么我们生存者线程再释放锁!那么消费者线程就能立马拿到这个锁了!然后开始运行!
不过后面解锁也可以!只是可能存在不是被我们唤醒的消费者线程拿走锁而已!但是无所谓!因为目的就是要让消费者线程拿走锁!是哪个无所谓!
阻塞队列的应用
==阻塞队列里面不仅仅可以放整形!还可以放其他的东西!——例如:任务!==
//Block_queues #pragma once #include<iostream> #include<queue> #include<pthread.h> const int gmaxcap = 5; template<class T> class BlockQueue { public: BlockQueue(const int &maxcap = gmaxcap) : maxcap_(maxcap) { //首先进行内部成员初始化! pthread_mutex_init(&mutex_,nullptr); pthread_cond_init(&pcond_,nullptr); pthread_cond_init(&ccond_,nullptr); } //因为生产者和消费只关心拿数据和放数据! //所以阻塞队列只有两个对外接口最重要! void push(const T& in)//输入型参数,一般设计成const & { pthread_mutex_lock(&mutex_); //1.判断! while(is_full()) //判断是不是满的! { pthread_cond_wait(&pcond_,&mutex_);//生产条件不满足,无法进行生产!生产者进行等待! } //2.走到这里肯定是没有慢的! q_.push(in);//生产数据! //3.走到这里一定能保证阻塞队列里面有数据! //让消费者进行消费! pthread_cond_signal(&ccond_);//唤醒消费者!进行消费! pthread_mutex_unlock(&mutex_); } void pop(T* out)//输出型参数!一般设计成指针(*),如果是输入输出型那么就是& //通过这样的风格来区分参数的作用是什么! { pthread_mutex_lock(&mutex_); //1.判断是不是空的! while(is_empty())//循环判断! { pthread_cond_wait(&ccond_,&mutex_);//是空的!放在条件变量下面进行等待! } // 2 .走到这里可以保证一定有数据!一定不为空! *out = q_.front();//获取数据! q_.pop();//弹出数据! //3. 可以保证阻塞队列里面至少有一个空的位置! pthread_cond_signal(&pcond_);//唤醒生产者 pthread_mutex_unlock(&mutex_); } ~BlockQueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&pcond_); pthread_cond_destroy(&ccond_); } private: bool is_empty() { return q_.empty(); } bool is_full() { return q_.size() == maxcap_; } private: std::queue<T> q_; int maxcap_;//表示队列元素的上限! pthread_mutex_t mutex_;//因为stl本事不是线程安全的!所以要有锁来进行保护! pthread_cond_t pcond_;//万一队列满了让生产者去对应的条件变量下面休眠! pthread_cond_t ccond_;//万一队列空了让消费者去对应的条件变量下面休眠! };
#pragma once #include<iostream> #include<functional> #include<cstdio> class Task { using func_t = std::function<int(int,int,const std::string&)>; public: Task() {} Task(int x, int y, const std::string& op, func_t func) : x_(x), y_(y), op_(op), callback_(func) {} std::string operator()() { int result = callback_(x_, y_, op_); char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %s %d = %d", x_, op_.c_str(), y_, result); return buffer; } std::string toTaskString() { char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %s %d = ?", x_, op_.c_str(), y_); return buffer; } private: int x_; int y_; std::string op_; func_t callback_; };
#include"BlockingQueue.hpp" #include"Task.hpp" #include<unistd.h> #include<sys/types.h> #include<ctime> #include<string> #include<map> const std:: string oper = "+-*/%"; int mymath(int x,int y,const std::string& op) { using func_t = std::function<int(int,int)>; std::map<std::string,func_t> opfuncmap = { {"/",[](int x,int y) { if(y == 0) { std::cout << "div zero error!" << std::endl; return -1; } else return x/y; }}, {"%",[](int x,int y) { if(y == 0) { std::cout << "mod zero error!" << std::endl; return -1; } else return x%y; }}, {"*",[](int x,int y){return x*y;}}, {"+",[](int x,int y){return x+y;}}, {"-",[](int x,int y){return x-y;}} }; return opfuncmap[op](x,y); } //调用逻辑 void* consumer(void* bq_) { BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_); while(true) { //从事消费活动! Task t; bq->pop(&t); std::cout << "消费任务: " << t() << std::endl; // sleep(1); } return nullptr; } void* productor(void* bq_) { BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_); while(true) { //从事生产活动 int x = rand()% 10 +1;//在这里先用随机数构建一个数据! int y = rand()% 5;//在这里先用随机数构建一个数据! int operCode = rand()%oper.size(); std::string op(1,oper[operCode]); Task t(x,y,op,mymath); bq->push(t);//这样子就完成生产了! std::cout << "生产任务! " <<t.toTaskString()<<std::endl; sleep(1); } return nullptr; } int main() { srand((unsigned long)time(nullptr)^getpid()); //要看到同一份资源! BlockQueue<Task>* bq = new BlockQueue<Task>; pthread_t c,p;//consum and product pthread_create(&c,nullptr,consumer,bq); pthread_create(&p,nullptr,productor,bq); //通过第四个参数来将同一份资源传给不同的线程! pthread_join(c,nullptr); pthread_join(p,nullptr); delete bq; return 0; }
==但是无论是什么,我们最底层的阻塞队列都是不变的!!==
如果我们现在多一个新的需求!
一个线程派发任务!一个线程执行任务!**还有一个线程将结果记录在文件中!**我们该如何实现?——多一个阻塞队列即可!
//Task.hpp #pragma once #include<iostream> #include<functional> #include<cstdio> #include<ctime> #include<string> #include<map> #include<fstream> class CalTask { using func_t = std::function<int(int,int,const std::string&)>; public: CalTask() {} CalTask(int x, int y, const std::string& op, func_t func) : x_(x), y_(y), op_(op), callback_(func) {} std::string operator()() { int result = callback_(x_, y_, op_); char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %s %d = %d", x_, op_.c_str(), y_, result); return buffer; } std::string toTaskString() { char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %s %d = ?", x_, op_.c_str(), y_); return buffer; } private: int x_; int y_; std::string op_; func_t callback_; }; const std:: string oper = "+-*/%"; int mymath(int x,int y,const std::string& op) { using func_t = std::function<int(int,int)>; std::map<std::string,func_t> opfuncmap = { {"/",[](int x,int y) { if(y == 0) { std::cout << "div zero error!" << std::endl; return -1; } else return x/y; }}, {"%",[](int x,int y) { if(y == 0) { std::cout << "mod zero error!" << std::endl; return -1; } else return x%y; }}, {"*",[](int x,int y){return x*y;}}, {"+",[](int x,int y){return x+y;}}, {"-",[](int x,int y){return x-y;}} }; return opfuncmap[op](x,y); } class SaveTask//多一个保存任务类! { using func_t = std::function<void(const std::string&)>; public: SaveTask() {} SaveTask(const std::string &message, func_t func) : message_(message), func_(func) {} void operator()() { func_(message_); } private: std::string message_; func_t func_; }; void Save(const std::string& message)//多一个保存任务! { const std::string target = "./log.txt"; std::ofstream ofs(target,std::ofstream::app); ofs << message << std::endl; }
//mainCP.cc #include"BlockingQueue.hpp" #include"Task.hpp" #include<unistd.h> #include<sys/types.h> template<class C,class S>//C,计算,S:存储 class BlockQueueS { public: BlockQueue<C> *c_bq; BlockQueue<S> *s_bq; }; //调用逻辑 void* consumer(void* bqs_) { BlockQueueS<CalTask, SaveTask>* bqs = static_cast<BlockQueueS<CalTask, SaveTask>*>(bqs_); BlockQueue<CalTask> *bq =bqs->c_bq; BlockQueue<SaveTask> *save_bq =bqs->s_bq; while(true) { //从事消费活动! CalTask t; bq->pop(&t); std::string result = t();//拿出结果 std::cout << "计算任务: " << t() <<"... done!" <<std::endl; SaveTask save(result,Save);//构建一个存储任务! save_bq->push(save); std::cout << "推送保存任务完成...." << std::endl; // sleep(1); } return nullptr; } void* productor(void* bqs_) { BlockQueueS<CalTask, SaveTask>* bqs = static_cast<BlockQueueS<CalTask, SaveTask>*>(bqs_); BlockQueue<CalTask>* bq = bqs->c_bq; while(true) { //从事生产活动 int x = rand()% 1000 +1;//在这里先用随机数构建一个数据! int y = rand()% 89;//在这里先用随机数构建一个数据! int operCode = rand()%oper.size(); std::string op(1,oper[operCode]); CalTask t(x,y,op,mymath); bq->push(t);//这样子就完成生产了! std::cout << "生产任务! " <<t.toTaskString()<<std::endl; sleep(1); } return nullptr; } void* Saver(void* bqs_)///多一个生产任务线程! { BlockQueueS<CalTask, SaveTask> *bqs = static_cast<BlockQueueS<CalTask, SaveTask> *>(bqs_); BlockQueue<SaveTask>* save_bq = bqs->s_bq; while(true) { SaveTask t; save_bq->pop(&t);//消费存储任务! t(); std::cout << "保存任务完成...." << std::endl; } return nullptr; } int main() { srand((unsigned long)time(nullptr)^getpid()); //要看到同一份资源! BlockQueueS<CalTask,SaveTask> bqs; bqs.c_bq = new BlockQueue<CalTask>; bqs.s_bq = new BlockQueue<SaveTask>; pthread_t c,p,s;//consum and product pthread_create(&c,nullptr,consumer,&bqs); pthread_create(&p,nullptr,productor,&bqs); //通过第四个参数来将同一份资源传给不同的线程! pthread_create(&s,nullptr,Saver,&bqs); pthread_join(c,nullptr); pthread_join(p,nullptr); pthread_join(s,nullptr); delete bqs.c_bq; delete bqs.s_bq; return 0; }
==我们也可以看到一共四个线程在跑——一个主线程!三个新线程!(product,consumer,Saver)其中consumer线程既充当了消费者,也充当了生产者!==
上面的代码我们也很容易的就修改成多生产,多消费!
int main() { srand((unsigned long)time(nullptr)^getpid()); //要看到同一份资源! BlockQueueS<CalTask,SaveTask> bqs; bqs.c_bq = new BlockQueue<CalTask>; bqs.s_bq = new BlockQueue<SaveTask>; pthread_t c[2],p[3],s;//consum and product,Saver pthread_create(c,nullptr,consumer,&bqs); pthread_create(c+1,nullptr,consumer,&bqs); pthread_create(p,nullptr,productor,&bqs); pthread_create(p+1,nullptr,productor,&bqs); pthread_create(p+2,nullptr,productor,&bqs); pthread_create(&s,nullptr,Saver,&bqs); pthread_join(c[0],nullptr); pthread_join(c[1],nullptr); pthread_join(p[0],nullptr); pthread_join(p[1],nullptr); pthread_join(p[2],nullptr); pthread_join(s,nullptr); delete bqs.c_bq; delete bqs.s_bq; return 0; }
这样子就完成了!——不管是几个生产者还是几个消费者!进入阻塞队列里面都是要先加锁!生产者线程们都会去抢锁!无论外面有多少个线程!然后进入阻塞队列里面的永远只有一个生产者线程!消费者也是同理!
==那么创建多线程生产和消费的意义是什么呢?——永远都只有一个执行流在阻塞队列里面执行!(都是串行执行)==
==生成消费模型的高效体现在哪里呢?==
对于生产者而言,向blockqueue里面放置任务,对于消费者而言,向blockqueue里面拿去任务,放任务和拿任务的过程都是串行的!谈何高效呢?
对于生产者,他的任务是从哪里来的?——他获取任务和构建任务要不要时间呢?
对于消费者,难道它把任务从任务队列里面拿出来就完了么?消费者拿到任务后,后续还有什么要干呢?
我们的生产者线程,现在只随机的生成几个数据!——但是在为了真正的项目中,难道数据是怎么简单的就产生了?——不是!==这个数据肯定从数据库或者网络,或者外设==拿来的用户数据!然后才去构建任务!——==这是要花很多时间的!==
对于我们的消费者!用pop从阻塞队列拿出来任务!但是还要去执行任务!——在真正的项目中!==这个任务可能是非常的耗时间——拿出任务反倒是最简单的一步!==
==所以高效体现在哪里呢?——对于消费者线程虽然线程拿任务的时候是串行的!但是拿到任务后!执行任务却是并行的!在有可能最浪费时间的过程里面!最大的节省了时间!并发的执行任务才是生成消费模型最高效的地方!且还能不影响其他线程继续的从队列里面拿去任务,执行任务!==
==生产者也是同理!有时候构建任务才是最浪费时间的!但是它可以让多个线程都是一起构建任务!最后放任务虽然是串行的!但是也是最快的!还能不影响其他线程继续构建任务!==
==在生产之前和消费之后,让线程并行执行这才是这个模型的最高效的地方!生产和消费的时候反倒不是高效的!且该模型完成了生成和消费过程的解耦!减少出现消费者等待生产者生产的情况!==