生产者与消费者模型

时间:2022-01-03 17:38:22

生产者消费者模型如图

生产者与消费者模型
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产⽣数据的模块,就形象地称为生产者;⽽而处理数据的模块,就称为消费者。

单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

缓冲区的优点
解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

支持并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万⼀消费者处理数据很慢,⽣产者就会白糟蹋大好时光。

使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下⼀一个数据。基本上不用依赖消费者的处理速度。

支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据 暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

生产者与消费者 要满足321原则
3种关系:生产者之间(互斥)、消费者之间(互斥)、生产者与费者(同步、互斥)
2个角色:生产者与消费者
1个场所:缓冲区

基于单链表实现的生产者与消费者模型

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct Listnode{
int data;
struct Listnode *next;
}node,*pnode,**ppnode;

pnode head = NULL;
pthread_mutex_t lock =PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond =PTHREAD_COND_INITIALIZER;

static pnode AllocListnode(int data)
{
pnode tmp = (pnode)malloc(sizeof(node));
if(tmp ==NULL)
{
perror("malloc");
exit(1);
}
tmp->data = data;
tmp->next = NULL;
}

static void DeleteListnode(pnode n)
{
if(n){
free(n);
}
}

void InitList(ppnode node)
{
*node = AllocListnode(0);
}

void PushFront(pnode node,int data)
{
pnode tmp = AllocListnode(data);
tmp->next = node->next;
node->next = tmp;
}

void PopFront(pnode node,int *out)
{
if(!Empty(node))
{
pnode tmp = node->next;
node->next = tmp->next;
*out = tmp->data;
DeleteListnode(tmp);
}
}

void ShowList(pnode node)
{
pnode start = node->next;
while(start)
{
printf("%d",start->data);
start = start->next;
}
printf("\n");
}

void Destory(pnode node)
{
int tmp = 0;
while(!Empty(node))
{
PopFront(node,&tmp);
}
DeleteListnode(node);
}

int Empty(pnode node)
{
return node->next ==NULL? 1:0;
}

//消费者
void *Consum(void *arg)
{
int c;
while(1)
{
c = -1;
//加锁
pthread_mutex_lock(&lock);
//如果为空等待
while(Empty(head))
{
printf("consum begin waiting...\n");
//条件变量等待
pthread_cond_wait(&cond,&lock);
}
PopFront(head,&c);
pthread_mutex_unlock(&lock);
printf("consum done: %d \n",c);
// sleep(3);
}
}

//生产者
void *Product(void *arg)
{
int p;
while(1)
{
p = rand()%1234;
pthread_mutex_lock(&lock);
PushFront(head,p);
pthread_mutex_unlock(&lock);
//生产完了给消费者信号
pthread_cond_signal(&cond);
printf("product done: %d\n",p);
sleep(3);
}
}

int main()
{
//初始化链表
InitList(&head);
//创建生产者和消费者线程
pthread_t P,C;
pthread_create(&P,NULL,Product,NULL);
pthread_create(&C,NULL,Consum,NULL);

pthread_join(C,NULL);
pthread_join(P,NULL);

pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);

//销毁链表
Destory(head);
return 0;
}

结果
消费者比生产者快:
生产者与消费者模型
生产者比消费者快:
生产者与消费者模型

基于环形队列的生产者消费者模型

生产者与消费者模型

环形队列是一个固定大小的临界区,如何模拟一个环呢,我们可以%上一个数组的大小。

基于环形队列的生产者与消费者模型等同于进程间通信使用信号量的方法,如上图所示生产者生产的时候需要P操作(申请格子资源),释放的时候即生产了数据之后V操作。而消费者消费的时候进行P操作(申请数据资源),释放的时候即拿走数据之后V操作。

使用的函数

#include <semaphore.h>

int sem_init(sem_t *sem,int pshared,unsigned);
int sem_wait(sem_t *sem); //相当于P操作 -1
int sem_post(sem_t *sem);//相当于V操作 +1

必须满足 的条件
消费者必须紧跟在生产者的后面(不能超过)
生产者不能给 消费者套圈
生产者和消费者不能在同一个格子
如果为满 : 消费者先跑
如果为空: 生产者先跑

基于环形队列的生产者与消费者模型:

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define SIZE 64
int ring[SIZE];
sem_t blank_sem;
sem_t data_sem;

void *Product(void *arg)
{
int step = 0;
int data = 0;
while(1)
{
sem_wait(&blank_sem);
ring[step++] = data;
sem_post(&data_sem);
step %= SIZE;
printf("Product done: %d\n",data++);
sleep(3);
}
}
void *Comsum(void *arg)
{
int step = 0;
while(1)
{
sleep(1);
sem_wait(&data_sem);
int data = ring[step++];
sem_post(&blank_sem);
//环形
step %= SIZE;
printf("Comsum done :%d\n",data);
}
}

int main()
{
sem_init(&blank_sem,0,SIZE);
sem_init(&data_sem,0,0);

pthread_t P, C;
pthread_create(&P,NULL,Product,NULL);
pthread_create(&C,NULL,Comsum,NULL);

pthread_join(P,NULL);
pthread_join(C,NULL);

sem_destroy(&blank_sem);
sem_destroy(&data_sem);
return 0;
}

结果
生产者与消费者模型
生产一条消费一条!

多线程的环形队列生产消费模型

由于生产者与生产者、消费者与消费者之间均存在互斥关系,所以我们需要加上互斥锁!

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define SIZE 64
int ring[SIZE];
sem_t blank_sem;
sem_t data_sem;

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

void *Product1(void *arg)
{
int step = 0;
int data = 0;
while(1)
{
pthread_mutex_lock(&lock1);
sem_wait(&blank_sem);
ring[step++] = data;
sem_post(&data_sem);
pthread_mutex_unlock(&lock1);
step %= SIZE;
printf("Product1 done: %d\n",data++);
sleep(3);
}
}

void *Product2(void *arg)
{
int step = 0;
int data = 0;
while(1)
{
pthread_mutex_lock(&lock1);
sem_wait(&blank_sem);
ring[step++] = data;
sem_post(&data_sem);
pthread_mutex_unlock(&lock1);
step %= SIZE;
printf("Product2 done: %d\n",data++);
sleep(3);
}
}


void *Comsum1(void *arg)
{
int step = 0;
while(1)
{
sleep(1);
pthread_mutex_lock(&lock2);
sem_wait(&data_sem);
int data = ring[step++];
sem_post(&blank_sem);
pthread_mutex_unlock(&lock2);
//环形
step %= SIZE;
printf("Comsum1 done :%d\n",data);

}
}

void *Comsum2(void *arg)
{
int step = 0;
while(1)
{
sleep(1);
pthread_mutex_lock(&lock2);
sem_wait(&data_sem);
int data = ring[step++];
sem_post(&blank_sem);
pthread_mutex_unlock(&lock2);
//环形
step %= SIZE;
printf("Comsum2 done :%d\n",data);
}
}

int main()
{
sem_init(&blank_sem,0,SIZE);
sem_init(&data_sem,0,0);

pthread_t P1,P2, C1,C2;
pthread_create(&P1,NULL,Product1,NULL);
pthread_create(&P2,NULL,Product2,NULL);
pthread_create(&C1,NULL,Comsum1,NULL);
pthread_create(&C2,NULL,Comsum2,NULL);

pthread_join(P1,NULL);
pthread_join(P2,NULL);
pthread_join(C1,NULL);
pthread_join(C2,NULL);

sem_destroy(&blank_sem);
sem_destroy(&data_sem);
return 0;
}

结果:生产和消费匹配进行
生产者与消费者模型