生产者消费者模型:有一群生产者在生产产品,并将这些产品提供给消费者去消费。为使生产者进程和消费者进程能并发执行,在两者之间设置缓冲区,生产者进程将其所生产的产品放入缓冲区中;消费者进程可从缓冲区中取走产品进行消费。尽管所有的生产者进程和消费者进程都是以异步方式运行的,但它们之间必须保持同步,既不允许消费者进程到空缓冲区去取产品,也不允许生产者进程想一个已装满产品的缓冲区中投放产品。
在实际的开发中,经常会碰到如下场景:某个模块负责生产数据,这些数据由另一个模块来负责处理。产生数据的模块就形象的称为生产者,而处理数据的模块就称为消费者。只有生产者和消费者还不够,这个模型还必须要有一个缓冲区处于生产者和消费者之间,作为中介。生产者把数据放入缓冲区,而消费者从缓冲区中取出数据。
生产者生产一个结构体串在链表的表头上,消费者从表头取一个结构体。
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>
typedef struct Node
{
int data;
struct Node * next;
}Node,*Node_p,**Node_pp;
Node_p CreatNode(int data)
{
Node_p _n=(Node_p)malloc(sizeof(Node));
if(_n==NULL)
{
return NULL;
}
_n->data=data;
_n->next=NULL;
return _n;
}
void Init(Node_pp list)
{
*list=CreatNode(0);
}
void PushFront(Node_p list ,int data)
{
assert(list);
Node_p _n=CreatNode(data);
if(_n==NULL)
{
perror("Push");
return;
}
_n->next=list->next;
list->next=_n;
}
void del_Node(Node_p del)
{
assert(del);
free(del);
}
int isEmpty(Node_p list)
{
assert(list);
if(list->next==NULL)
return 1;
else
return 0;
}
void PopFront(Node_p list,int *data)
{
if(!isEmpty(list))
{
Node_p del=list->next;
list->next=del->next;
*data=del->data;
del_Node(del);
}
else
{
printf("list Empty\n");
}
}
void destroy(Node_p list)
{
int data;
assert(list);
while(!isEmpty(list))
{
PopFront(list,&data);
}
del_Node(list);
}
void ShowList(Node_p list)
{
assert(list);
Node_p cur=list->next;
while(cur->next)
{
printf("%d->",cur->data);
cur=cur->next;
}
printf("\n");
}
Node_p list=NULL;
pthread_mutex_t mylock= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mycond=PTHREAD_COND_INITIALIZER;
void * consum(void *arg)
{
int data=0;
while(1)
{
pthread_mutex_lock(&mylock);
while(isEmpty(list))
{
pthread_cond_wait(&mycond,&mylock);
}
PopFront(list,&data);
pthread_mutex_unlock(&mylock);
printf("consumer:%d\n",data);
}
return NULL;
}
void * produce(void *arg)
{
int data=0;
while(1)
{
usleep(123456);
data=rand()%1000;
pthread_mutex_lock(&mylock);
PushFront(list,data);
pthread_mutex_unlock(&mylock);
pthread_cond_signal(&mycond);
printf("Producer:%d\n",data);
}
return NULL;
}
int main()
{
Init(&list);
pthread_t consumer,producer;
pthread_create(&consumer,NULL,consum,NULL);
pthread_create(&producer,NULL,produce,NULL);
pthread_join(consumer,NULL);
pthread_join(producer,NULL);
destroy(list);
pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
return 0;
}
用环形缓冲区实现生产者消费者模型
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>
int arr[20];
static int in=0;
static int out=0;
void Pop(int *arr,int* data)
{
if(in==out) //empty
printf("empty");
else
{
*data=arr[out];
out++;
}
if(out==20)
out=0;
}
void Push(int* arr,int data)
{
if((in+1)%20==out)//full
printf("full");
else
{
arr[in]=data;
in++;
if(in==20)
in=0;
}
}
pthread_mutex_t mylock= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mycond=PTHREAD_COND_INITIALIZER;
void * Consumer(void *arg)
{
int data=0;
while(1)
{
pthread_mutex_lock(&mylock);
while(in==out)
{
pthread_cond_wait(&mycond,&mylock);
}
Pop(arr,&data);
pthread_mutex_unlock(&mylock);
printf("consumer:%d\n",data);
}
return NULL;
}
void * produce(void *arg)
{
int data=0;
while(1)
{
usleep(123456);
data=rand()%1000;
pthread_mutex_lock(&mylock);
Push(arr,data);
pthread_mutex_unlock(&mylock);
pthread_cond_signal(&mycond);
printf("Producer:%d\n",data);
}
return NULL;
}
int main()
{
pthread_t consumer,producer;
pthread_create(&consumer,NULL,consum,NULL);
pthread_create(&producer,NULL,produce,NULL);
pthread_join(consumer,NULL);
pthread_join(producer,NULL);
pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
return 0;
}