生产者与消费者模型

时间:2022-08-18 17:38:49

1、【什么是生产者与消费者模型呢?】

一种重要的模型,基于等待/通知机制。生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点:
1、生产者与消费者不能同时进行工作,形成的是互斥关系; 2、生产者与生产者之间不能同时生产,处于互斥关系; 3、消费者与消费者之间不能同时工作,处于互斥关系; 4、当缓冲区之内的资源满时,生产者不能生产; 5、当缓冲区之内的资源空时,消费者不能消费; 6、消费者消费的速度不能超过生产者; 形象图显示:                                                         生产者与消费者模型
2、【生产者与消费者之间的关系】 基于消费者与生产者模型的概念;在形成了三种关系 、两个对象、一种机制我们称之为是321原则 、、、三种关系指的是: 生产者与消费者关系、生产者与生产者关系、消费者与消费者的关系(三种关系都是互斥关系); 、、、两个对象指的是: 生产者对象、消费者对象 、、、一种机制指的是: 互斥机制

3、【实现单生产者与单消费者模型】

基于单链表实现:
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>

//mutex设置锁,来实现生产者与消费者的互斥关系
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
//cond设置条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

//定义一个 链表
typedef struct Node
{
int _val;
struct Node * _next;
}Node_t,*Node_p,**Node_pp;

Node_p head = NULL;
/////////////////////////////////链表的实现/////////////////////////////////////
Node_p allocNode(int val)
{
Node_p ret = (Node_p)malloc(sizeof(Node_t));
ret->_val = val;
ret->_next = NULL;
return ret;
}

void freeNode(Node_p del)
{
free(del);
}
int initList(Node_pp _head)
{
*_head = allocNode(0);
if(*_head == NULL)
return 0;
return -1;
}

void PushNode(Node_p _head,int val)
{
Node_p node = allocNode(val);
node->_next = _head->_next;
_head->_next = node;
}
int Empty(Node_p _head)
{
return (_head->_next==NULL)?0:-1;
}
int PopNode(Node_p _head,int * val)
{
if(Empty(_head) == 0)
{
return -1;
}
Node_p del = _head->_next;
_head->_next = del->_next;
*val = del->_val;
freeNode(del);
del = NULL;
return 0;
}
void PrintList(Node_p _head)
{
Node_p tmp = _head->_next;
while(tmp)
{
printf("%d ",tmp->_val);
tmp=tmp->_next;
}
printf("\n");
}

void DestroyList(Node_pp _head)
{
while(Empty(*_head) < 0)
{
int tmp;
PopNode(*_head,&tmp);
}
freeNode(*_head);
*_head = NULL;
}
//////////////////////////////////////////////////////////////////////
//生产者者 插入数据
void * producter(void * arg)
{
while(1)
{
pthread_mutex_lock(&lock);
int val = rand()%1024;
PushNode(head,val);
printf("producter push done: %d \n",val);
sleep(1);
//生产者 插入数据,此时唤醒消费者消费
pthread_cond_signal(&cond);
pthread_mutex_unlock(&lock);
}
return NULL;
}
//消费者弹出数据
void * consumer(void * arg)
{
while(1)
{
pthread_mutex_lock(&lock);
if(Empty(head)==0)
{
//要是当前的链表为空
printf("consumer check\n");
//消费者使用 条件变量 挂起
pthread_cond_wait(&cond,&lock);
}
else
{
int val;
PopNode(head,&val);
printf("producter pop done: %d \n",val);
}
//sleep(1);
pthread_mutex_unlock(&lock);
}
}
int main()
{
/*
initList(&head);
int count = 0 ;
while(count++ <10)
{
//int val = rand()%1024;
PushNode(head,count);
PrintList(head);
sleep(1);
}
count = 0 ;
while(count ++ < 7)
{
int val ;
PopNode(head,&val);
PrintList(head);
sleep(1);
}
DestroyList(&head);
*/
initList(&head);
pthread_t c,p;
pthread_create(&c,NULL,consumer,NULL);
pthread_create(&p,NULL,producter,NULL);

pthread_join(c,NULL);
pthread_join(p,NULL);
DestroyList(&head);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
return 0 ;
}
基于环形队列实现:
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>

//使用信号量实现环形队列
int databuf[64];
sem_t semblank;//信号量 格子数
sem_t semdata;//信号量 数据数量

void * consumer(void *arg)
{
int step = 0;
while(1)
{
if(sem_wait(&semdata) <0)
{
printf("consumer check!\n");
}else{
int data = databuf[step];
step++;
step %= 64;
printf("consumer done :%d \n",data);
sem_post(&semblank);
}
usleep(500000);
}
return NULL;
}
void * producter(void *arg)
{
int step = 0 ;
while(1)
{
if(sem_wait(&semblank)< 0 )
{
printf("producter check! \n");
}else{
int data= rand()%1234;
databuf[step] = data;
step++;
step %= 64;
printf("producter done :%d \n",data);
sem_post(&semdata);
}
usleep(1);
}
return NULL;
}
int main()
{
//初始化两个信号值 格子数是 64 ,数据量为 0
sem_init(&semblank,0,64);
sem_init(&semdata,0,0);

pthread_t c,p;
pthread_create(&c,NULL,consumer,NULL);
pthread_create(&p,NULL,producter,NULL);

pthread_join(c,NULL);
pthread_join(p,NULL);
sem_destroy(&semblank);
sem_destroy(&semdata);
return 0;
}

4、【实现多生产者与消费者模型】

在这有两个生产者与两个消费者 
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
//实现多生产者 、多消费者
//使用信号量实现环形队列
int databuf[64];
sem_t semblank;//信号量 格子数
sem_t semdata;//信号量 数据数量

//生产者与生产者是 互斥关系;
//消费者与消费者是 互斥关系;
//互斥锁
pthread_mutex_t conlock =PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t prolock =PTHREAD_MUTEX_INITIALIZER;
int stepc = 0 ;//consumer step ;
int stepp = 0 ;//productor step ;



void * consumer1(void *arg)
{
while(1)
{
pthread_mutex_lock(&conlock);
if(sem_wait(&semdata) <0)
{
printf("consumer1 check!\n");
}else{
int data = databuf[stepc];
stepc++;
stepc %= 64;
printf("consumer1 done :%d \n",data);
sem_post(&semblank);
}
usleep(500000);
pthread_mutex_unlock(&conlock);
}
return NULL;
}
void * consumer2(void *arg)
{
while(1)
{
pthread_mutex_lock(&conlock);
if(sem_wait(&semdata) <0)
{
printf("consumer2 check!\n");
}else{
int data = databuf[stepc];
stepc++;
stepc %= 64;
printf("consumer2 done :%d \n",data);
sem_post(&semblank);
}
sleep(1);
pthread_mutex_unlock(&conlock);

}
return NULL;
}

void * producter1(void *arg)
{
while(1)
{
pthread_mutex_lock(&prolock);
if(sem_wait(&semblank)< 0 )
{
printf("producter1 check! \n");
}else{
int data= rand()%1234;
databuf[stepp] = data;
stepp++;
stepp %= 64;
printf("producter1 done :%d \n",data);
sem_post(&semdata);
}
sleep(1);
pthread_mutex_unlock(&prolock);
}
return NULL;
}
void * producter2(void *arg)
{

while(1)
{
pthread_mutex_lock(&prolock);
if(sem_wait(&semblank)< 0 )
{
printf("producter2 check! \n");
}else{
int data= rand()%1234;
databuf[stepp] = data;
stepp++;
stepp %= 64;
printf("producter2 done :%d \n",data);
sem_post(&semdata);
}
sleep(1);
pthread_mutex_unlock(&prolock);
}
return NULL;
}
int main()
{
//初始化两个信号值 格子数是 64 ,数据量为 0
sem_init(&semblank,0,64);
sem_init(&semdata,0,0);

pthread_t c1,p1,c2,p2;
pthread_create(&c1,NULL,consumer1,NULL);
pthread_create(&p1,NULL,producter1,NULL);
pthread_create(&c2,NULL,consumer2,NULL);
pthread_create(&p2,NULL,producter2,NULL);

pthread_join(c1,NULL);
pthread_join(p1,NULL);
pthread_join(c2,NULL);
pthread_join(p2,NULL);
sem_destroy(&semblank);
sem_destroy(&semdata);
pthread_mutex_destroy(&conlock);
pthread_mutex_destroy(&prolock);
return 0;
}

5、【实现进程之间的单生产者与单消费者模型】

进程之间实现生产者与消费者模型:在这里要使用到进程通信; 在这里我是使用的是   共享内存  来实现进程之间的通信的,但是,共享内存不提供任何的同步与互斥机制,所以我还是用到 信号量 来实现互斥机制 实现 共享内存 与 信号量
#ifndef  _COMM_H_
#define _COMM_h_

#include<stdio.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/sem.h>

#define PATHNAME "."
#define PROJ_ID 0x0666

//实现信号量 与共享内存的函数声明

int creatshm(int size);
int getshm(int size);
void * attshm(int shmid);
int dttshm(void *addr);
int destroyshm(int shmid);


union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */
};

int creatSem(int nsems);
int getSem(int nsems);
int destroySem(int semid);
int initSem(int semid,int which,int value);
int P(int semid,int which);
int V(int semid,int which);


#endif
#include"comm.h"


//Sem
static int commSem(int flag,int nsems)
{
key_t key = ftok(PATHNAME,PROJ_ID);
if(key < 0)
{
perror("ftok");
return -1;
}
int semid = semget(key,nsems,flag);
if(semid < 0 )
{
perror("semget");
return -2;
}
return semid;
}
//创建的信号量
int creatSem(int nsems)
{
return commSem(IPC_CREAT|IPC_EXCL|0666,nsems);
}
//得到已创建的信号量
int getSem(int nsems)
{
return commSem(IPC_CREAT,nsems);
}
//销毁信号量集
int destroySem(int semid)
{
if(semctl(semid,0,IPC_RMID) < 0 )
{
perror("semctl");
return -1;
}
return 0;

}
//信号量集的初始化
int initSem(int semid ,int which,int value)
{
union semun _semun;
_semun.val = value;
if(semctl(semid,which,SETVAL,_semun) < 0 )
{
perror("semctl");
return -1;
}
return 0;
}
int Semop(int semid,int which,int op)
{
struct sembuf _sembuf;
_sembuf.sem_num = which;
_sembuf.sem_op = op;
_sembuf.sem_flg = 0;
return semop(semid,&_sembuf,1);
}


int P(int semid,int which)
{
if(Semop(semid,which ,-1) ==0 )
return 0;
perror("P_sem");
return -1;
}

int V(int semid,int which)
{
if(Semop(semid,which ,1) == 0 )
return 0;
perror("V_sem");
return -1;
}


//shm
static int commshm(int flag,int size)
{
key_t key=ftok(PATHNAME,PROJ_ID);
if(key < 0)
{
perror("ftok");
return -1;
}
int shmid = shmget(key,size,flag);
if(shmid<0)
{
perror("shmget");
}
return shmid;
}
//创建共享内存
int creatshm(int size)
{
return commshm(IPC_CREAT|IPC_EXCL|0666,size);

}
//得到共享内存
int getshm(int size)
{
return commshm(IPC_CREAT,size);

}
//销毁共享内存
int destroyshm(int shmid)
{
if(shmctl(shmid,IPC_RMID,NULL) < 0 )
{
perror("shmctl");
return -1;
}
return 0;
}
//搭接到共享内存
void * attshm(int shmid)
{
void * shmaddr = shmat(shmid,NULL,0);
return shmaddr;

}
//断开连接
int dttshm(void *addr)
{
return shmdt(addr);
}
实现生产者函数
#include"comm.h"


//conducter put data
int main()
{
// creat shm
int shmid = creatshm(4096);
int *addr= (int *)attshm(shmid);
int step = 0;
// creat sem
int semid = creatSem(2);
//consumer sem blank 64
initSem(semid,0,64);
//server sem data 0;
initSem(semid,1,0);;
while(1)
{
P(semid,0);
int data = rand()%1234;
addr[step] = data;
++step;
step %= 64;
printf("conducter done : %d! \n",data);
sleep(1);
V(semid,1);
}
dttch(addr);
destroySem(semid);
destroyshm(shmid);
return 0;

}
实现消费者
#include"comm.h"


//consumer get data
int main()
{
// get shm
int shmid = getshm(4096);
int *addr= (int *)attshm(shmid);
// get sem
int semid = getSem(2);
int step = 0 ;
while(1)
{
P(semid,1);
int data =addr[step] ;
++step;
step %= 64;
printf("consumer done : %d! \n",data);
sleep(1);
V(semid,0);
}
dttch(addr);
return 0;

}