生产者消费者模型

时间:2022-07-13 17:38:27

对于生产者消费者模型,必须满足以下三点:

①3种关系:生产者与生产者之间互斥,消费者与消费者之间互斥,生产者与消费者之间互斥且同步 ②2种角色:生产者,消费者 ③1种交易场所
本文根据交易场所,讨论两种生产者消费者模型:基于单链表和基于环形队列
1.基于单链表的单生产者单消费者模型: 背景知识:互斥锁和条件变量(对于互斥锁这里不加赘述,前面文章已经详细讨论过http://blog.csdn.net/chenkaixin_1024/article/details/73294394,这里重点讨论条件变量)
条件变量用于描述临界资源的内部状态,实现线程间同步机制,与互斥锁搭配使用(在互斥锁的保护下使用),实现线程的同步与互斥
条件变量的创建,初始化,销毁与互斥锁类似,条件变量类型:pthread_cond_t
初始化,销毁接口如下:初始化:int pthread_cond_init(pthread_cond_t* cond,const pthread_condattr_t* attr);             当条件变量为全局变量或者是静态变量时,可以直接用PTHEAD_COND_INITIALIZER进行初始化(相当于初始化函数第二个参数为NULL的情况)销毁:int pthread_cond_destory(pthread_cond_t* cond);返回值:若成功返回0,否则返回错误码
大多数情况下,我们将条件变量与互斥锁组合起来使用,而我们的条件变量则是用来阻塞当前线程的,直到达到某个条件时,解除阻塞所以,对于条件变量,最主要的操作就是等待和解除阻塞,接口如下:int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
上述两个函数用于等待,均为阻塞式等待,它们均做以下几步:1. 释放互斥锁2. 阻塞等待3. 当被唤醒时,重新获得互斥锁并返回对于后者的第三个参数,可以设定等待超时,如果到达了abstime所指定的 时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT注意:pthread_cond_wait很有可能调用失败,从而导致无法阻塞,由此出错,所以对于pthread_cond_wait要放在循环当中
int pthread_cond_signal(pthread_cond_t *cond);int pthread_cond_broadcast(pthread_cond_t *cond);上述两个函数用于唤醒线程,前者用于唤醒另外一个线程,后者用于唤醒在这个条件变量下所有阻塞的线程
下面是具体对于基于单链表的单生产者单消费者模型的实现:
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>

typedef struct list{
struct list* _next;
int _val;
}Node,*pNode;

pNode pHead=NULL;
pthread_mutex_t Mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t needproduct=PTHREAD_COND_INITIALIZER;

void* product(void* arg)
{
pNode pcur=NULL;
while(1)
{
sleep(2);
int value=rand()%100;
pthread_mutex_lock(&Mutex);
if(pHead==NULL)
{
// printf("3\n");
pHead=(pNode)malloc(sizeof(Node));
pHead->_next=NULL;
pHead->_val=value;
pcur=pHead;
}
else
{
// printf("1\n");
pNode p=(pNode)malloc(sizeof(Node));
pcur->_next=p;
p->_val=value;
p->_next=NULL;
pcur=p;
// printf("...");
}
printf("producter:%d\n",value);
// printf("pHead->%d\n",pHead->_val);
pthread_mutex_unlock(&Mutex);
pthread_cond_signal(&needproduct);
}
}

void* consum(void* arg)
{
while(1)
{
// printf("2\n");
sleep(1);
pthread_mutex_lock(&Mutex);
// printf("consumer\n");
while(pHead==NULL)
{
pthread_cond_wait(&needproduct,&Mutex);//pthread_cond_wait调用失败
}
// printf("4\n");
printf("consumer:%d\n",pHead->_val);
pNode pcur=pHead;
pHead=pcur->_next;
free(pcur);

pthread_mutex_unlock(&Mutex);
}
}


int main()
{
pthread_t tid1,tid2;
int err=pthread_create(&tid1,NULL,product,NULL);
if(err!=0)
{
printf("pthread_create product:%s\n",strerror(err));
return -1;
}

err=pthread_create(&tid2,NULL,consum,NULL);
if(err!=0)
{
printf("pthread_create consum:%s\n",strerror(err));
return -1;
}


pthread_join(tid1,NULL);
pthread_join(tid2,NULL);

return 0;
}

2.基于环形队列的单生产者单消费者模型:
背景知识:信号量 严格来说,互斥锁用0,1表示当前资源是否被占用,从而限制线程对资源的申请,实现加锁的目的,但是在某种程度上,我们也可以这么来看,0,1来表示当前资源的可用数目,当可用数目为0时,所有线程在对该资源进行申请时就会失败,当为1时,这个资源就可以被申请成功。而对于我们的信号量而言,也就是用来描述临界资源的可用数量,信号量变量类型为sem_t,其具体操作如下:①信号量的初始化与销毁int sem_init(sem_t *sem,int pshared,unsigned int value);函数功能:用于信号量的初始化参数:第一个参数表示指向信号量的指针,第二个参数表示当它为0时表示实现线程间的同步,非0表示实现进程间的同步,第三个参数表示可用资源的数目返回值:若成功返回0,否则返回-1且设置errno
int sem_destroy(sem_t* sem);函数功能:用于释放信号量
②信号量的P操作和V操作P操作:int sem_wait(sem_t* sem);函数功能:用于获得资源,使信号量对应描述的资源数目减1,当调用sem_wait时资源数目已经为0时,挂起等待注意:若不希望挂起等待的话,可以调用sem_trywait
V操作:int sem_post(sem_t* sem);函数功能:可以进行释放资源,且对信号量所描述的资源数目加1,同时唤醒由于资源数目先前为0而挂起等待的线程
对于基于环形队列的生产者消费者模型,生产者与消费者之间得满足以下几点:
1.生产者不能将消费者套一个圈(即数据不能被覆盖)2.消费者不能超过生产者(即只有生产者生产了,消费者才能消费)3.生产者与消费者不能访问同一位置(即不能这里生产者在放数据,那里消费者在消费该位置的数据)
下面是对基于环形队列的单生产者单消费者模型的实现:
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#define MAX_PRODUCT 10

sem_t data_sem;
sem_t blank_sem;
int queue[MAX_PRODUCT];

void init_queue()
{
int i=0;
for(;i<MAX_PRODUCT;i++)
{
queue[i]=0;
}
}

void P(sem_t* sem)
{
sem_wait(sem);
}

void V(sem_t* sem)
{
sem_post(sem);
}

void* product(void* arg)
{
int idx=0;
while(1)
{
sleep(2);
P(&blank_sem);
int value=rand()%100;
queue[idx%MAX_PRODUCT]=value;
idx++;
printf("product:%d\n",value);
V(&data_sem);
}
}

void* consum(void* arg)
{
int idx=0;
while(1)
{
sleep(1);
P(&data_sem);
int value=queue[idx%MAX_PRODUCT];
idx++;
printf("consum:%d\n",value);
V(&blank_sem);
}
}


int main()
{
sem_init(&data_sem,0,0);
sem_init(&blank_sem,0,MAX_PRODUCT);
init_queue();
pthread_t tid1,tid2;
int err=pthread_create(&tid1,NULL,product,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}

err=pthread_create(&tid2,NULL,consum,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}


pthread_join(tid1,NULL);
pthread_join(tid2,NULL);

sem_destroy(&blank_sem);
sem_destroy(&data_sem);
return 0;
}

而对于多生产者多消费者模型而言,则要考虑生产者与生产者之间的互斥,消费者与消费者之间的互斥,所以我们加入互斥锁来实现:

#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#define MAX_PRODUCT 10

sem_t data_sem;
sem_t blank_sem;
int queue[MAX_PRODUCT];

void init_queue()
{
int i=0;
for(;i<MAX_PRODUCT;i++)
{
queue[i]=0;
}
}

void P(sem_t* sem)
{
sem_wait(sem);
}

void V(sem_t* sem)
{
sem_post(sem);
}

pthread_mutex_t P_Mutex=PTHREAD_MUTEX_INITIALIZER;
int p=0;

void* product1(void* arg)
{
int idx=0;
while(1)
{
sleep(2);
pthread_mutex_lock(&P_Mutex);
P(&blank_sem);
int value=rand()%100;
queue[p%MAX_PRODUCT]=value;
p++;
printf("product1:%d\n",value);
V(&data_sem);
pthread_mutex_unlock(&P_Mutex);
}
}


void* product2(void* arg)
{
int idx=0;
while(1)
{
sleep(2);
pthread_mutex_lock(&P_Mutex);
P(&blank_sem);
int value=rand()%100;
queue[p%MAX_PRODUCT]=value;
p++;
printf("product2:%d\n",value);
V(&data_sem);
pthread_mutex_unlock(&P_Mutex);
}
}

pthread_mutex_t C_Mutex=PTHREAD_MUTEX_INITIALIZER;
int c=0;

void* consum1(void* arg)
{
while(1)
{
sleep(1);
pthread_mutex_lock(&C_Mutex);
P(&data_sem);
int value=queue[c%MAX_PRODUCT];
c++;
printf("consum1:%d\n",value);
V(&blank_sem);
pthread_mutex_unlock(&C_Mutex);
}
}

void* consum2(void* arg)
{
while(1)
{
sleep(1);
pthread_mutex_lock(&C_Mutex);
P(&data_sem);
int value=queue[c%MAX_PRODUCT];
c++;
printf("consum2:%d\n",value);
V(&blank_sem);
pthread_mutex_unlock(&C_Mutex);
}
}

int main()
{
sem_init(&data_sem,0,0);
sem_init(&blank_sem,0,MAX_PRODUCT);
init_queue();
pthread_t tid1,tid2,tid3,tid4;
int err=pthread_create(&tid1,NULL,product1,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}

err=pthread_create(&tid2,NULL,product2,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}

err=pthread_create(&tid3,NULL,consum1,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}


err=pthread_create(&tid4,NULL,consum2,NULL);
if(err!=0)
{
printf("pthread_create:%s\n",strerror(err));
return -1;
}


pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_join(tid3,NULL);
pthread_join(tid4,NULL);

sem_destroy(&blank_sem);
sem_destroy(&data_sem);
return 0;
}