Linux之生产消费者模型

时间:2024-03-10 09:34:50

(。・∀・)ノ゙嗨!你好这里是ky233的主页:这里是ky233的主页,欢迎光临~https://blog.csdn.net/ky233?type=blog

点个关注不迷路⌯'▾'⌯

我们在条件满足的时候,唤醒指定的线程,我怎么知道条件是否满足呢?

一、概念

1、例子说明,生产消费的基本组成概念

我们在超市买东西,都知道超市的东西从供货商里来,而我们是消费者,超市是交易场所!

我们为什么不跳过超市直接在供货商里买呢?

这是因为供货商不卖哈哈哈,那为什么我们可以在超市买呢?因为超市把所有的消费者集结在了一起,集中在生产者里面批发,这里面的意义在于提高效率!、

本质上超市就是一个商品的缓冲区

  • 有三种关系:生产者与生产者,消费者与消费者,生产者与消费者
  • 有两种角色:生产者与消费者
  • 有一种场所:超市
  • 我们只有处理好上面的三种规则才能避免混乱

生产者与生产者之间是竞争关系,说白了就是我要这个资源,我不想给你,也是互斥关系

消费者与消费者之间是竞争关系,这个本质上也是竞争和互斥关系,尤其是在资源有限的情况下

生产者与消费者的关系是互斥或者同步的关系!东西没了让生产,消费者等待,东西多了则反之!

所以生产消费模型要遵守以上原则!

2、用基本工程师思维,重新理解生产消费

生产者和消费者对应的就是我们的线程来承担,也就是给线程进行角色化,而交易场所呢?通常是某种数据结构表示的缓冲区!

也就是说我们的一部分线程生产对应的数据,另一部分线程消费对应的数据做处理,缓冲区存放这些数据!

那么超市里有没有东西谁最清楚呢?答案是消费者最清楚,

所以条件满足时,我们在唤醒指定的线程,我们怎么直到条件是否满足呢?这是因为我们的生产者生产完成之后就可以通知消费者来消费,消费者把数据拿走,也会通知我们的生产者

3.补充点

1.如果只有一个消费者和一个生产者,那是不是只需要维护生产和数据的安全和同步这样的策略呢?是不是就不要维护生产者和生产者,消费者与消费者之间的互斥与同步了!

答案是:是的!

2.生产和消费的过程是不是把数据放到仓库当中,另一个消费者把他拿走呢?

是的,但不仅仅如此

生产者生产的数据是从哪里来的呢?消费者如何使用发送过来的数据呢?

这两点现在我们都不清楚,但是我们知道要完成就需要花时间!!

二、基于BlockingQueue的生产者消费者模型

BlockingQueue 在多线程编程中阻塞队列(Blocking Queue)是一种常 用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会 被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取 出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

生产者消费模型主要的节省效率是在我们的生产者和消费者之间的并发动,因为处理数据和生产数据都需要花费时间!在生产的时候我们消费者可以继续除非处理我们拿到的数据而不用一直拿数据,这样就形成了并发!并发是指生产者与生产者,消费者与消费者之间的并发!!

1.BlockQueue.hpp 

#pragma once

#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include "lockGuard.hpp"

const int gDefaultCap = 5;

using namespace std;
template <class T>

class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return bq_.size() == 0;
    }

    bool isQueueFull()
    {
        return bq_.size() == capacity_;
    }

public:
    BlockQueue(int capacity = gDefaultCap) : capacity_(capacity)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&Empty_, nullptr);
        pthread_cond_init(&Full_, nullptr);
    }

    void push(const T &in)
    {
        // pthread_mutex_lock(&mtx_);
        // // 1.先检测当前的临界资源是否为满足访问的条件
        // // 在临界区中进行等待,所以也要释放锁,wait第二个参数传入的锁会被自动解锁
        // // 被唤醒时也会在临界资源里唤醒,从哪里阻塞也就会在哪里唤醒,并且也会自动的加锁
        // //可能会存在伪唤醒状态所以要用while再次判断下
        // while (isQueueFull())
        // {
        //     pthread_cond_wait(&Full_, &mtx_);
        // }
        // //走到这里就可以确定确实是就绪的
        // // 访问临界资源
        // bq_.push(in);
        // // 生产成功唤醒消费者

        // pthread_cond_signal(&Empty_);

        // pthread_mutex_unlock(&mtx_);

        lockGuard lockguard(&mtx_);
        while (isQueueFull())
        {
            pthread_cond_wait(&Full_, &mtx_);
        }
        // 访问临界资源
        bq_.push(in);
        // 生产成功唤醒消费者

        pthread_cond_signal(&Empty_);
        // 会自动调用析构,等价上面的写法!
    }

    void pop(T *out)
    {
        lockGuard lockguard(&mtx_);

        // pthread_mutex_lock(&mtx_);
        while (isQueueEmpty())
        {
            pthread_cond_wait(&Empty_, &mtx_);
        }
        *out = bq_.front();
        bq_.pop();

        // pthread_mutex_unlock(&mtx_);
        //  拿走之后唤醒生产者
        pthread_cond_signal(&Full_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&Empty_);
        pthread_cond_destroy(&Full_);
    }

private:
    queue<T> bq_;
    int capacity_;         // 容量上限
    pthread_mutex_t mtx_;  // 通过互斥锁保证线程安全
    pthread_cond_t Empty_; // 来表述阻塞队列是否空的条件
    pthread_cond_t Full_;  // 来表述阻塞队列是否满了的条件
};

2.ConProd.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>

int myadd(int x, int y)
{
    return x + y;
}
void *consumer(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while (1)
    {
        // 获取任务
        Task t;
        bqueue->pop(&t);
        // 完成任务
        cout << pthread_self()<<" 消费者" << t.x_ << "+" << t.y_ << "=" << t() << endl;
        sleep(1);
    }
    return nullptr;
}

void *productor(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;

    while (1)
    {
        // 制作任务
        int x = rand()%10 + 1;
        usleep(rand()%1000);
        int y = rand()%5 + 1;
        // int x,y;
        // cout<<"请输入x:";
        // cin>>x;
        // cout<<"请输入y";
        // cin>>y;
        Task t(x, y, myadd);
        // 生产任务
        bqueue->push(t);
        // 输出信息
        cout << pthread_self()<<" 生产者" << t.x_ << "+" << t.y_ << "=?" << endl;

    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x25415);
    BlockQueue<Task> *bqueue = new BlockQueue<Task>();

    pthread_t c[2], p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c+1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, consumer, bqueue);
    pthread_create(p+1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    delete bqueue;
    return 0;
}

3.lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>
using namespace std;

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx) : pmtx_(mtx)
    {
    }

    void lock()
    {
        cout << "加锁" << endl;
        pthread_mutex_lock(pmtx_);
    }

    void unlock()
    {
        cout << "解锁" << endl;
        pthread_mutex_unlock(pmtx_);
    }

    ~Mutex()
    {
    }

private:
    pthread_mutex_t *pmtx_;
};

// RAII的枷锁风格
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx) : mtx_(mtx)
    {
        mtx_.lock();
    }

    ~lockGuard()
    {
        mtx_.unlock();
    }

private:
    Mutex mtx_;
};

 4.Task.hpp

#pragma once

#include <iostream>
#include <functional>
using namespace std;
    typedef function<int(int, int)> func_t;

class Task
{

public:
Task()
{}
Task(int x,int y ,func_t func):x_(x),y_(y),func_(func)
{}
    int operator()()
    {
        return func_(x_, y_);
    }

public:
    int x_;
    int y_;
    func_t func_;
};