基于信号量的环形队列的生成消费模型(万字长文详解)

时间:2024-01-23 21:03:37

linux线程之信号量

POSIX信号量

阻塞队列的缺陷

==这是一个我们自己的实现阻塞队列!==

class BlockQueue
{
public:
       BlockQueue(const int &maxcap = gmaxcap)
           : maxcap_(maxcap)
           {
               pthread_mutex_init(&mutex_,nullptr);
               pthread_cond_init(&pcond_,nullptr);
               pthread_cond_init(&ccond_,nullptr);
           }
       void push(const T& in)//生产
       {
           pthread_mutex_lock(&mutex_);
           while(is_full())//判断是不是满的
           {
               pthread_cond_wait(&pcond_,&mutex_);
           }
           q_.push(in);
           pthread_cond_signal(&ccond_);
           pthread_mutex_unlock(&mutex_);
       }
       void pop(T* out)//消费
       {
           pthread_mutex_lock(&mutex_);
           while(is_empty())//判断是不是空的
           {
               pthread_cond_wait(&ccond_,&mutex_);
           }
           *out = q_.front();
           q_.pop();
           pthread_cond_signal(&pcond_);
           pthread_mutex_unlock(&mutex_);
       }
       ~BlockQueue()
       {
           pthread_mutex_destroy(&mutex_);
           pthread_cond_destroy(&pcond_);
           pthread_cond_destroy(&ccond_);
       }
private:
       bool is_empty()
       {
           return q_.empty();
       }
       bool is_full()
       {
           return q_.size() == maxcap_;
       }
private:
       std::queue<T> q_;
       int maxcap_;
       pthread_mutex_t mutex_;
       pthread_cond_t pcond_;
       pthread_cond_t ccond_;
};

==我们自己实现的这个阻塞队列有什么“不足”的地方呢?==

一个线程在,操作临界资源的时候,临界资源必须是条件的!——所以我们要在阻塞队列里操作之前进行判断!不满足则挂起到条件变量下面!

==可是,公共资源是否满足生成或者消费条件!我们是无法直接得知的——即我们不能事前得知(访问临界资源之前)==我们其实在进入锁之前!我们是不能得知这个队列能不能进行生产!——所以我们在阻塞队列里面只能==先加锁,再检测,然后操作,最后解锁!==

==而我们进行检测这个行为!本质也是在访问临界资源!==

==我们在操作临界资源的时候有可能不就绪!但是我们无法提前得知!所以只能先加锁!再检测!根据检测结果在决定下一步怎么走!==

==所以我们能不能有手段能提前得知呢?——可以!就是使用信号量!==

什么是信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步,而system V版本是用于线程互斥

了解信号量之前我们要明白

只要我们对资源整体进行加锁!就默认了,我们对这个资源整体进行使用!即使我们真实情况下只使用了一部分!

void push(const T& in)//生产
{
    pthread_mutex_lock(&mutex_);
    while(is_full())
    {
        pthread_cond_wait(&pcond_,&mutex_);
    }
    q_.push(in);//我们就对这个队列进行了整体的加锁!
    //任意线程访问这个队列的时候,都会将队列当成一个整体!
    pthread_cond_signal(&ccond_);
    pthread_mutex_unlock(&mutex_);
}

==实际情况可能存在——一份公共资源!整体是共有的!但是允许同时访问不同的区域!==

竞争关系是存在于在访问同一份的公共资源同一块区域里面,如果不是在同一块区域!那么即使是在同一份资源里面!其实也是不会产生竞争关系的!

==上面的将整个资源加锁!就是意味着将整个公共资源当做是一块大区域!==

那么什么是信号量呢?

==信号量的本质就是一个计数器!==——衡量临界资源中资源数量多少的计数器!可以用来保证!访问这个公共资源的线程,不会超过这个资源的数量!

==申请信号量的本质就是对临界资源中特定小块资源的预定机制!==——只要拥有信号量!就在未来一定能够拥有临界资源的一部分!

通过申请信号量去预定一部分的公共资源!只有持有信号量才能去访问这个公共资源!然后通过程序员编码的方式来保证不同线程可以并发的访问公共资源的不同区域!

==因为访问临界资源之前,要先申请信号量!而信号量的本质是一个计数器!——那么我们就可以在真正的访问临界资源之前,我们就可以知道临界资源的使用情况!(这就是为什么要有信号量)==

这样子就可以解决阻塞队列中要先访问公共资源,来知道公共资源状态的不足!任何线程都不用接触到临界资源!

只要申请信号量成功,就说明一定存在我们需要的资源!

只要申请信号量失败,就说明条件不就绪!只能等待!

==就不需要在临界资源里面进行判断了!直接去申请信号量!不管是申请成功还是不成功其实都没有访问对应的临界资源!==

==通过信号量提前将资源的数目暴露在外部!让我们通过申请信号量就能知道资源有没有就绪==

==但是其实上面我们的阻塞队列是不适合使用信号量的!因为阻塞队列里面虽然可以放很多节点,但是其实真正用的只有头和尾!我们只是说阻塞队列的不足而已!==

线程要访问临界资源中的某个区域——要申请信号量——所以所有的线程都必须先看到信号量!——所以信号量本身也必须是==公共资源==

公共资源的是一个计数器!那么就要进行递减和递增!

//伪代码
int sem = 10;//假设这就是一个计数器——信号量
sem--;//意味着可用资源变少了!说明有人拿走资源了!——这个操作就是申请资源!
sem++;//说明可用资源变多了!有线程释放资源了!——这个操作就归还资源!

==因为信号量本身是一个公共资源的!但是++/--这个操作本身并不是安全的!所以也必须保证操作的原子性==

PV操作
sem_t sem;//所以信号量不是简单的int类型!而是sem_t类型
sem--;//这种保证原子性的--的操作我们称之为P操作!
sem++;//这种保证原子性的++的操作我们称之为V操作!

==所以信号量的核心操作就是——PV原语==

信号量的基本使用接口

信号量头文件

include <semaphore.h>//信号量头文件!

初始化信号量

int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem_t 是信号量类型,由pthread库替我们进行维护!
pshared:表示是否共享,0表示线程间不共享,非零表示进程间共享——一般是设为0
value:信号量计数器的初始值——这个值的多少取决于这个临界资源的值是多少!
//返回值是成功返回0 失败返回 -1 并设置错误码!

销毁信号量

int sem_destroy(sem_t *sem);
//返回值是成功返回0 失败返回 -1 并设置错误码!

等待信号量——P操作,相当于对信号量计数器--

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()——p操作
//这一旦等待成功就会立刻返回!让我们的代码继续运行!
//如果申请失败!就会一直阻塞,直到信号量大于0,线程才会被唤醒! 
//返回值是成功返回0 失败返回 -1 并设置错误码!

发布信号量——v操作 ,相当于对信号量计数器++

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()——v操作
//返回值是成功返回0 失败返回 -1 并设置错误码!

基于环形队列的生产消费模型

==阻塞队列就是互斥锁的应用场景!==

==那么信号量有什么对应的应用场景呢?——环形队列==

那么什么是环形队列?

image-20230821213913029

==那么我们该如何使用这个环形队列!来实现生产消费模型呢?==

image-20230821221300768

==我们举个例子来理解基于环形队列的生产消费模型!==

image-20230821223557495

当盘子为空的时候!都站在同一个位置,A和B谁先行动呢?——肯定是A(生产者)

因为B没有东西可以拿,所以就不能行动!

当盘子全满的时候!又再次站在同一个位置!A和B谁先行动呢?——是B(消费者)

A没有地方可以放!——在计算机里面是因为数据存在覆盖问题所以不能放!

==除了上面两种情况!A和B都是可以各自行动的!==

==所以在环形队列中大部分情况下——单生产和单消费是可以并发执行的!只有在满或者空的时候才会有互斥和同步问题!==

我们对应到刚刚的环形队列中!A就是生产者线程,B就是消费者线程!

==那么为了完成环形队列的cp问题(create and product),我们的核心工作是什么呢?==

就是我们上面提到的三个规则!

  1. 消费者不能超过生产者
  2. 生产者不能把消费者套一个圈
  3. 解决在满和空的情况下,生产者和消费者的互斥与同步问题

==我们该如何保证这个三个性质呢?==

信号量是用来衡量临界资源中数量的多少——那么数量是指什么呢?

==对于生产者而言,看中的就是队列中的剩余空间!==

==对于消费者而言,看中的就是放入队列中的数据!==

所以为了更好的去衡量生产者和消费者的资源——我们该怎么做呢?

==给空间资源定义一个信号量!给数据资源也定义一个信号量!==

只有申请空间资源的计数器成功!那么就是说明还有队列里面有剩余空间!

只有申请数据资源的计数器成功!那么就说明队列里面还有数据!

==有了这两个信号量,我们皆可以维护这个三个规则!==

能不能只使用一个信号量来解决呢?——可以是可以,但是很麻烦

信号量不是计数器吗?我们可以用空间总量减去数据资源的计数器不就是空间剩下的资源么?——但是==首先这个计算过程就不是原子的!而且我们该如何去获取信号量计数器的值?拿总数减去信号量的值的过程是如何保证安全的?这样就很麻烦!==

==所以还不如直接使用两个信号量!==

image-20230822102113738

==当开始运行的时候,无论是生产者先运行也好,还是消费者先运行也好!——信号量可以保证!如果消费者没有资源!那么就会被阻塞!那么此时生产者就可以先去生成!(保证了规则一,消费者无法超过生产者)==

==反之也是一样!生产者如果生产满了!那么就会去阻塞,让消费者先去消费(保证了规则2,生产者无法套圈消费者)==

==**这样无论是空和满,就都可以保证只有一个线程在运行!(就保证了规则3,解决了在空或者满的情况下的同步与互斥问题!)**那么是非空非满的情况下就是两个线程并行运行!==

那个线程先运行是无关紧要,我们可以知道是那个线程会先挂起被阻塞!

举个例子:生产者生产了5个,然后就sleep了!

消费者于是不断地进行消费!consumer_sem不断的进行P操作,计数器不断的--,最后到0的时候!再进行P操作就会直接挂起!

这时候只有等待生产者进行生产!消费者才能继续运行了!——这就完成了一次互斥!

==对于生产和消费的位置我们要想清楚!==

位置其实本质就是队列中的下标!

一定是两个下标!——一个生产者一个是消费者

只有为空或者为满,下标才会相同!——此时一定会有一个线程拿不到资源!

==在编码中我们就可以通过下标这种方式,我们就能让生产和消费来访问不同的位置!也可以通过下标来让线程去指派要访问的资源!==

基于环形队列生产消费模型的实现

==这是单生产者,单消费者版本的环形队列!!==

//RingQueue.hpp
#pragma once

#include<iostream>
#include<vector>
#include<semaphore.h>//信号量的头文件!
#include<cassert>
static int gcap = 5;
template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        //等待信号量,等待成功,则就返回线程继续运行!
        //等待失败则就直接将自己挂起
        int n = sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }
    void V(sem_t& sem)
    {
        //发布信号量!
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }
public:
    RingQueue(const int &cap = gcap)
        : queue_(cap), cap_(cap)
    {
        int n = sem_init(&SpaceSem_,0,cap_);//0表示线程不共享!
        //对于空间资源一开始的值就是数组空间的大小!
        assert(n == 0);
        n = sem_init(&DataSem_,0,0);//刚开始乜没有数据所以信号量的值就是0
        assert(n == 0);

        productorStep_ = consumerStep_ = 0; //开始的下标都是0

    }
    //生产者调用push函数!
    void Push(const T &in)//向环形队列里面插入数据!
    {
        P(SpaceSem_);//生产者首先就是进行操作!申请空间资源的信号量!
        //申请信号量成功意味着一定可以进行正常的生产!
        queue_[productorStep_++] = in;
        //向可以生产的位置进行生产数据!
        productorStep_ %= cap_; 
        //等到最后的时候会自动回到最开始!
        V(DataSem_);//生产出来数据并放入空间之后!说明数据资源多了一份!
        //那么就要对数据资源的信号量进行V操作!
    }
    void Pop( T* out)//像环形队列里面拿数据!
    {
        P(DataSem_);//访问环形队列之前就要去申请信号量!
        //数据信号量如果申请成功说明里面有数据!
        *out = queue_[consumerStep_++];
        //从队列可以消费的地方开始进行消费! 
        consumerStep_ %= cap_;
        //到数组最后,越界了最返回最开始
        V(SpaceSem_);//已经消费了一个数据!说明此时一个空间已经空出来了!
        //那么我们就要对空间资源信号量进行V操作!
    }
    ~RingQueue()
    {
        //销毁信号量!
        sem_destroy(&SpaceSem_);
        sem_destroy(&DataSem_);
    }
private:
    std::vector<T> queue_;
    int cap_;//队列的容量
    sem_t SpaceSem_;//生产者信号量!——也就是生产者想要的空间资源!
    sem_t DataSem_;//消费者信号量——也就是消费者想要的数据资源!
    //sem_t这个类型是由pthread库进行维护的!
    int productorStep_;//生产者在哪里进行生产!
    int consumerStep_;//消费者在哪里进行消费!
    //生产消费位置的本质就是数组下标!
};
//main.cc主程序
#include"RingQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
void* ProductorRoutine(void* rq_)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(rq_);
    while(true)
    {
        int data = rand()%100 +1;
        rq->Push(data);
        std::cout << "生产完成!生产的数据是: "<< data << std::endl;
        sleep(1);//让生产者慢一点
        //就会出现生产一个消费一个!
        //因为生产者如果不生产,消费者就无法消费!
        //这就是典型的同步关系!
    }
}
void *ConsumerRoutine(void *rq_)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(rq_);
    while(true)
    {
        int data;
        rq->Pop(&data);
        std::cout << "消费完成!消费的数据是: "<< data << std::endl;
        //sleep(1);
        //让消费者慢一点!就会出现,消费者都是一直消费的是历史的数据
    }
}
int main()
{
    srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,ProductorRoutine,rq);
    pthread_create(&p,nullptr,ConsumerRoutine,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    return 0;
}

生产者慢,消费者快!

image-20230822160425653

就是消费一个生产一个

生成者快,消费者慢

image-20230822161012501

消费者一直在消费历史数据

如果什么都不进行限制

image-20230822161539490

那么有大部分情况都是在并发的运行——只有在为空或者为满的时候,才会有互斥

消费生成任务

//Task.hpp
#pragma once
#include<iostream>
#include<functional>
#include<cstdio>
#include<ctime>
#include<string>
#include<map>
#include<fstream>
class CalTask
{
    using func_t = std::function<int(int,int,const std::string&)>;
public:
    CalTask()
    {}

    CalTask(int x, int y, const std::string& op, func_t func)
        : x_(x), y_(y), op_(op), callback_(func)
        {}

    std::string operator()()
    {
        int result = callback_(x_, y_, op_);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %s %d = %d", x_, op_.c_str(), y_, result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %s %d = ?", x_, op_.c_str(), y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    std::string op_;
    func_t callback_;
};

const std:: string oper = "+-*/%";
int mymath(int x,int y,const std::string& op)
{
    using func_t = std::function<int(int,int)>;
    std::map<std::string,func_t> opfuncmap = 
    {
        {"/",[](int x,int y)
        {
            if(y == 0)
            {
                std::cout << "div zero error!" << std::endl;
                return -1;
            }
            else return x/y;
        }},
        {"%",[](int x,int y)
        {
            if(y == 0)
            {
                std::cout << "mod zero error!" << std::endl;
                return -1;
            }
            else return x%y;
        }},
        {"*",[](int x,int y){return x*y;}},
        {"+",[](int x,int y){return x+y;}},
        {"-",[](int x,int y){return x-y;}}
    };
    return opfuncmap[op](x,y);
}
//main.cc
#include"RingQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
void* ProductorRoutine(void* rq_)
{
    RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
    while(true)
    {
        int x = rand() % 100 +1;
        int y = rand() % 123 +1;
        int operindex = rand()%oper.size();
        std::string op(1,oper[operindex]);
        CalTask t(x,y,op,mymath);
        rq->Push(t);
        std::cout << "生产完成!生产的任务是 :" << t.toTaskString() <<std::endl; 
        sleep(1);
    }
}

void *ConsumerRoutine(void *rq_)
{
    RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
    while(true)
    {
        CalTask t;
        rq->Pop(&t);
        std::cout << "消费任务完成!消费的任务是: "<< t.toTaskString() << " 计算结果为: "<<t()<<std::endl;
    }
}
//我们可以看到其实消费者不在乎任务到底是什么!
//它只要不断的运行就可以!
//这就是底层设计和业务做解耦!
//这个任务可以是任何任务!
//我们未来只要改变任务内容!这个里不进行任何的修改也是可以运行的!
int main()
{
    srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t c,p;
    pthread_create(&c,nullptr,ProductorRoutine,rq);
    pthread_create(&p,nullptr,ConsumerRoutine,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    return 0;
}

image-20230822170023453

多生产多消费的循环队列

多生产线程和多消费线程如果是在阻塞队列里面!任意一个时刻都是只有一个线程在阻塞队列里面访问临界资源,线程之间都是阻塞关系!资源都是被整体使用的

==在环形队列里面生产者和生产者之间都是互斥关系!消费者和消费者之间也都是互斥关系!——无论有多少个生产者或者多少个消费者,在任意时刻!都是只允许一个生产者和一个消费者先进到我们临界区里面!==

==所以我们要将那个代码改成多生产多消费其实是很简单的!只要保证最终进入临界区是一个生成一个消费即可!——怎么做呢?加上两个互斥锁皆可以了!==

#pragma once

#include<iostream>
#include<vector>
#include<cassert>
#include<semaphore.h>//信号量的头文件!
#include<pthread.h>
static int gcap = 5;
template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n = sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }
    void V(sem_t& sem)
    {
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }
public:
    RingQueue(const int &cap = gcap)
        : queue_(cap), cap_(cap)
    {
        int n = sem_init(&SpaceSem_,0,cap_);
        assert(n == 0);
        n = sem_init(&DataSem_,0,0);
        assert(n == 0);

        productorStep_ = consumerStep_ = 0; 

        //将互斥锁进行初始化!
        pthread_mutex_init(&pmutex_,nullptr);
        pthread_mutex_init(&cmutex_,nullptr);
    }
    
    void Push(const T &in)
    {
        pthread_mutex_lock(&pmutex_);//先对生产者的互斥锁进行加锁!

        P(SpaceSem_);//生产者首先就是进行操作!申请空间资源的信号量!
        //申请信号量成功意味着一定可以进行正常的生产!

        //如果此时持有锁的时候,因为没有信号量而被挂起有问题么?
        //没有!因为这个线程本身就是所有生产者线程中的竞争胜利者!
        //这个生产者被挂起之后!等到有信号量的时候!再被唤醒
        //然后继续向下执行!这个是没有问题的!

        queue_[productorStep_++] = in;
        productorStep_ %= cap_; 
        V(DataSem_);
        
        pthread_mutex_unlock(&pmutex_);//对生产者互斥锁解锁!
    }
    void Pop( T* out)//像环形队列里面拿数据!
    {
        pthread_mutex_lock(&cmutex_);//先对消费者的互斥锁进行加锁!

        P(DataSem_);
        *out = queue_[consumerStep_++];
        consumerStep_ %= cap_;
        V(SpaceSem_);

        pthread_mutex_unlock(&cmutex_);//对消费者互斥锁解锁!
    }
    ~RingQueue()
    {
        sem_destroy(&SpaceSem_);
        sem_destroy(&DataSem_);
        //销毁互斥锁!
        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);
    }
private:
    std::vector<T> queue_;
    int cap_;
    sem_t SpaceSem_;
    sem_t DataSem_;
    int productorStep_;
    int consumerStep_;

    pthread_mutex_t pmutex_;//生产者的互斥锁!
    pthread_mutex_t cmutex_;//消费者的互斥锁!
    // 要有两个互斥锁!一个管理生产者,一个管理消费者!
    //先让生产者之间进行竞争,然后再让消费者之间进行竞争!
    //最后决胜出一个生产者和一个消费者!
    //让最后的这个生产者和消费者进入临界区!
};
#include"RingQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>

std::string SelfName()
{
    char name[128];
    snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
    return name;
}
void* ProductorRoutine(void* rq_)
{
    RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
    while(true)
    {
        int x = rand() % 100 +1;
        int y = rand() % 123 +1;
        int operindex = rand()%oper.size();
        std::string op(1,oper[operindex]);
        CalTask t(x,y,op,mymath);
        rq->Push(t);
        std::cout << SelfName()<< "  生产完成!生产的任务是 :" << t.toTaskString() <<std::endl; 
        sleep(1);
    }
}
void *ConsumerRoutine(void *rq_)
{
    RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
    while(true)
    {
        CalTask t;
        rq->Pop(&t);
        std::cout << SelfName()<< " 消费任务完成!消费的任务是: "<< t.toTaskString() << " 计算结果为: "<<t()<<std::endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t p[4],c[8];

    for(int i = 0;i<4;++i) pthread_create(p+i,nullptr,ConsumerRoutine,rq);
    for(int i = 0;i<8;++i) pthread_create(c+i,nullptr,ProductorRoutine,rq);

    for(int i =0;i< 4;++i) pthread_join(p[i],nullptr);
    for(int i = 0;i<8;++i) pthread_join(c[i],nullptr);

    delete rq;
    return 0;
}

image-20230822214905815

==此时这就是一个多线程的多生产多消费模型!==

==上面的代码有什么可以优化的地方呢?——是先申请信号量,再加锁!还是先加锁再申请信号量呢比较好呢?==

其实是先申请信号量再加锁!——我们维护互斥关系==不是为了保护信号量!==因为信号量的申请本身就是原子的!==我们是为了保护临界资源的!例如:循环队列和队列下标!==

如果我们先加锁,那么意味着一个线程走完了释放了锁,另一个线程也只能先去申请锁!在来申请信号量!

在上一个线程持有锁期间!我们**不能提前把信号量提前指派只给不同的线程!**这样子就让当前的大部分线程其实都是串行的!==信号量的作用就没有了!==

我们为什么要有信号量?就是为了让线程能够提前知道,共享资源的状态!然后方便去快速的访问!==我们先加锁,然后申请信号量这和先加锁,然后判断有什么区别吗?线程只能在锁上等待!这时候线程什么都做不了!只能等==这就和我们使用信号量的初衷相违背了!

==如果我们反过来!先申请信号量再加锁,就相当于先给线程派发任务!然后让拿到任务的线程一个个排队,互斥的访问共享资源!==

**当一个线程在访问共享资源的时候,在信号量上等待的其他线程,不是干等,这些线程可以不断的进行P操作将信号量不断的进行申请分配,直到没有信号量,那么其他没有信号量的线程才会被挂起,然后拿着信号量进入锁里面,依次访问不同的下标!**、

==先申请信号量,那么信号量就可以线程并行的进行访问!然后一次性分配完,然后申请锁锁再串行的执行下面的代码!==

==先申请锁,那么就只能一个个串行的申请信号量!然后串行的继续执行下面的代码!==

void Push(const T &in)
{
    P(SpaceSem_);//先申请
    pthread_mutex_lock(&pmutex_);//先对生产者的互斥锁进行加锁!
    queue_[productorStep_++] = in;
    productorStep_ %= cap_; 
    V(DataSem_);

    pthread_mutex_unlock(&pmutex_);//对生产者互斥锁解锁!
}
void Pop( T* out)
{
    P(DataSem_);//先申请
    pthread_mutex_lock(&cmutex_);//先对消费者的互斥锁进行加锁!
    *out = queue_[consumerStep_++];
    consumerStep_ %= cap_;
    pthread_mutex_unlock(&cmutex_);//对消费者互斥锁解锁!
    V(SpaceSem_);
}

什么情况下要用到多生产多消费?

==我们知道,无论多少个生成线程,消费线程进入共享资源的永远只有一个生成,一个消费==那么多生产多消费的意义是什么呢?

==因为获取和构建任务——是要花时间的!==

==我们今天使用的只是简单的随机数来构建任务!任务也比较简单!但是未来我们可能会从文件里面,数据库里面,网络里面去读取数据,这样子效率一定是比较低的!==

==消费任务也是如此!以后执行任务的可能会花费很长的时间!——从队列里面拿出任务反倒可能是最快的==

==创建任务和执行任务的时候多线程情况下是可以并行执行的!只是放任务和拿任务是串行的而已——而且放任务和拿任务在未来可能划花费的时间可能是最少的!==

==如果构建任务和处理任务本身是比较花费时间的而且我们可以通过创建多线程来解决!那么我们才采用多生产多消费!==