【linux】基于阻塞队列的生产者消费者模型(条件变量)

时间:2021-06-01 01:15:48

一、引入

举个例子,比方说我们想买方便面,假如现在没有超市,我们只能去供货商那里买东西,我们要一件供货商生产一件。但是对于供货商来说成本太大了。所以现在有了超市这个媒介。
【linux】基于阻塞队列的生产者消费者模型(条件变量)

消费者和生产者通过超市间接进行交易。这样当生产者不需要的时候供货商可能还在生产,当供货商不生产的时候消费者还能买到。这样就把消费和消费进行解耦。我们把超市叫做缓冲区

那么什么叫做解耦呢?我们举个反例:

当我们main调用函数的时候,main函数会生产数据交给函数,函数可以把数据暂时保存,而函数也消费了数据,符合生产者消费者模型。
但是当我们开始调用的时候main函数就什么也不干,在那里阻塞等待函数的返回,我们把main函数和调用函数之间的关系称为强耦合关系

二、生产者消费者模型

首先要知道生产者消费者都要看到“超市”,所以“超市”是一块共享资源。而既然是共享资源就会涉及到多线程访问,那么这块共享资源就要被保护起来。

2.1 三者关系

生产者和生产者之间是互斥关系
消费者和消费者之间是互斥关系
生产者和消费者之间是互斥+同步

这里的互斥是为了保证共享资源的安全性,同步是为了提高访问效率。

2.2 生产者消费者模型基本原则

我们只需要记住“321”原则:
3: 三种关系。
2: 两种角色,生产者线程、消费者线程。
1: 一个交易场所(特定结构的缓冲区)。

2.3 生产者消费者模型的好处

1️⃣ 把生产线程和消费线程进行解耦。
2️⃣ 支持消费和生产一段时间的忙闲不均问题。
3️⃣ 让消费者专注消费,生产者专注生产,提高效率。

但是这里不一定能保证高效。因为可能超市满了,那么生产者只能等待了,或者超市为空,消费者进行等待。

三、基于阻塞队列的生产者消费者模型

3.1 原理

【linux】基于阻塞队列的生产者消费者模型(条件变量)
当队列为空的时候,从队列中获取元素的线程将被阻塞,直到队列被放入元素。
当队列已满的时候,往队列放入元素的线程将被阻塞,直到有元素被取出。

3.2 代码实现

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap = 5)
        : _max(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    void push(const T &in)
    {
        // 保护安全
        pthread_mutex_lock(&_mutex);

        if(_q.size() == _max)
        {
            // 如果满了就等待
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(in);
        // 有数据了,唤醒消费者线程
        pthread_cond_signal(&_ccond);

        pthread_mutex_unlock(&_mutex);
    }

    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        
        if(_q.empty())
        {
            // 如果空了就等待
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        // 有空位置,唤醒生产者线程
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _max;// 元素上限
    pthread_mutex_t _mutex;// 保护共享资源
    pthread_cond_t _pcond;// 生产者条件变量
    pthread_cond_t _ccond;// 消费者条件变量
};

// Main.cc
#include "BlockQueue.hpp"

using std::cout;
using std::endl;

// 消费者
void* consumer(void *_pbq)
{
    BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
    while(true)
    {
        int val;
        pbq->pop(&val);
        cout << "消费数据: " << val << endl;
    }
}
// 生产者
void* productor(void *_pbq)
{
    BlockQueue<int> *pbq = static_cast<BlockQueue<int> *>(_pbq);
    while(true)
    {
        int val = rand() % 100 + 1;
        pbq->push(val);
        cout << "生产数据: " << val << endl;
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ (unsigned long)time(nullptr));
    BlockQueue<int> *pbq = new BlockQueue<int>();
    pthread_t con, pro;
    pthread_create(&con, nullptr, consumer, pbq);
    pthread_create(&pro, nullptr, productor, pbq);

    pthread_join(con, nullptr);
    pthread_join(pro, nullptr);
    return 0;
}

当生产者生产的慢的时候,因为消费者一直在读取数据,会出现生产一个消费一个的情况。
【linux】基于阻塞队列的生产者消费者模型(条件变量)
当消费者慢的时候,生产者会先把阻塞队列填满,生产者开始等待,当消费者开始消费的时候,就会出现消费一个,生产一个的情况,消费者按顺序读取阻塞队列中的值。
【linux】基于阻塞队列的生产者消费者模型(条件变量)

3.3 pthread_cond_wait的第二个参数

【linux】基于阻塞队列的生产者消费者模型(条件变量)
这里的第二个参数必须是当前正在使用的互斥锁
因为我们满了就会进行等待,如果像之前一样把锁拿走,那么其他线程就无法访问共享资源,也就是消费者无法拿到数据。
pthread_cond_wait调用的时候会自动把锁释放,并把自己挂起。
而被唤醒返回的时候会自动的重新获取传入的锁

3.4 pthread_cond_wait伪唤醒

还有一种情况,我们只有一个消费线程,但有十个生产线程,而我们可能使用的是pthread_cond_broadcast唤醒了一批线程。
【linux】基于阻塞队列的生产者消费者模型(条件变量)
所以这十个线程被唤醒了后就要直接全部push数据,这样就出现了问题
所以这里不应该用if,应该用while,当被唤醒以后继续进行判断是否为满,消费者线程同理。
【linux】基于阻塞队列的生产者消费者模型(条件变量)

四、总结

【linux】基于阻塞队列的生产者消费者模型(条件变量)
我们不仅可以往阻塞队列中放入数据,也可以放入任务(函数)。我们直接把任务传递给阻塞队列,然后就不用管了,让消费者拿到任务进行处理。

对于生产消费者模型:
当生产者在准备数据来生产任务的时候,这个时间段是耗时的,那么消费者线程就可以获得抢夺CPU资源去执行消费者的事情;同时当消费者在拿到任务去处理任务时候,也是耗时的,那么生产者就可以在这段时间抢夺CPU资源去生产任务,大大提升了效率