文章目录
一、引入
举个例子,比方说我们想买方便面,假如现在没有超市,我们只能去供货商那里买东西,我们要一件供货商生产一件。但是对于供货商来说成本太大了。所以现在有了超市这个媒介。
消费者和生产者通过超市间接进行交易。这样当生产者不需要的时候供货商可能还在生产,当供货商不生产的时候消费者还能买到。这样就把消费和消费进行解耦。我们把超市叫做缓冲区。
那么什么叫做解耦呢?我们举个反例:
当我们main调用函数的时候,main函数会生产数据交给函数,函数可以把数据暂时保存,而函数也消费了数据,符合生产者消费者模型。
但是当我们开始调用的时候main函数就什么也不干,在那里阻塞等待函数的返回,我们把main函数和调用函数之间的关系称为强耦合关系。
二、生产者消费者模型
首先要知道生产者消费者都要看到“超市”,所以“超市”是一块共享资源。而既然是共享资源就会涉及到多线程访问,那么这块共享资源就要被保护起来。
2.1 三者关系
生产者和生产者之间是互斥关系。
消费者和消费者之间是互斥关系。
生产者和消费者之间是互斥+同步。
这里的互斥是为了保证共享资源的安全性,同步是为了提高访问效率。
2.2 生产者消费者模型基本原则
我们只需要记住“321”原则:
3: 三种关系。
2: 两种角色,生产者线程、消费者线程。
1: 一个交易场所(特定结构的缓冲区)。
2.3 生产者消费者模型的好处
1️⃣ 把生产线程和消费线程进行解耦。
2️⃣ 支持消费和生产一段时间的忙闲不均问题。
3️⃣ 让消费者专注消费,生产者专注生产,提高效率。
但是这里不一定能保证高效。因为可能超市满了,那么生产者只能等待了,或者超市为空,消费者进行等待。
三、基于阻塞队列的生产者消费者模型
3.1 原理
当队列为空的时候,从队列中获取元素的线程将被阻塞,直到队列被放入元素。
当队列已满的时候,往队列放入元素的线程将被阻塞,直到有元素被取出。
3.2 代码实现
// BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
template <class T>
class BlockQueue
{
public:
BlockQueue(const int& maxcap = 5)
: _max(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T &in)
{
// 保护安全
pthread_mutex_lock(&_mutex);
if(_q.size() == _max)
{
// 如果满了就等待
pthread_cond_wait(&_pcond, &_mutex);
}
_q.push(in);
// 有数据了,唤醒消费者线程
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
if(_q.empty())
{
// 如果空了就等待
pthread_cond_wait(&_ccond, &_mutex);
}
*out = _q.front();
_q.pop();
// 有空位置,唤醒生产者线程
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
std::queue<T> _q;
int _max;// 元素上限
pthread_mutex_t _mutex;// 保护共享资源
pthread_cond_t _pcond;// 生产者条件变量
pthread_cond_t _ccond;// 消费者条件变量
};
// Main.cc
#include "BlockQueue.hpp"
using std::cout;
using std::endl;
// 消费者
void* consumer(void *_pbq)
{
BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
while(true)
{
int val;
pbq->pop(&val);
cout << "消费数据: " << val << endl;
}
}
// 生产者
void* productor(void *_pbq)
{
BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
while(true)
{
int val = rand() % 100 + 1;
pbq->push(val);
cout << "生产数据: " << val << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
BlockQueue<int> *pbq = new BlockQueue<int>();
pthread_t con, pro;
pthread_create(&con, nullptr, consumer, pbq);
pthread_create(&pro, nullptr, productor, pbq);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
}
当生产者生产的慢的时候,因为消费者一直在读取数据,会出现生产一个消费一个的情况。
当消费者慢的时候,生产者会先把阻塞队列填满,生产者开始等待,当消费者开始消费的时候,就会出现消费一个,生产一个的情况,消费者按顺序读取阻塞队列中的值。
3.3 pthread_cond_wait的第二个参数
这里的第二个参数必须是当前正在使用的互斥锁。
因为我们满了就会进行等待,如果像之前一样把锁拿走,那么其他线程就无法访问共享资源,也就是消费者无法拿到数据。
在pthread_cond_wait
调用的时候会自动把锁释放,并把自己挂起。
而被唤醒返回的时候会自动的重新获取传入的锁。
3.4 pthread_cond_wait伪唤醒
还有一种情况,我们只有一个消费线程,但有十个生产线程,而我们可能使用的是pthread_cond_broadcast
唤醒了一批线程。
所以这十个线程被唤醒了后就要直接全部push数据,这样就出现了问题。
所以这里不应该用if,应该用while,当被唤醒以后继续进行判断是否为满,消费者线程同理。
四、总结
我们不仅可以往阻塞队列中放入数据,也可以放入任务(函数)。我们直接把任务传递给阻塞队列,然后就不用管了,让消费者拿到任务进行处理。
对于生产消费者模型:
当生产者在准备数据来生产任务的时候,这个时间段是耗时的,那么消费者线程就可以获得抢夺CPU资源去执行消费者的事情;同时当消费者在拿到任务去处理任务时候,也是耗时的,那么生产者就可以在这段时间抢夺CPU资源去生产任务,大大提升了效率。