生产者消费者模型
简单来说就是“321原则(并非某一规则,而是为了理解生产者消费者模型)”
“3”代表的是三种关系
生产者与消费者的互斥与同步关系
生产者与生产者的互斥(或竞争)关系
消费者与消费者的互斥(或竞争)关系
“2”代表两种角色
生产者:往交易场所放东西(在计算机中一般都是数据)的人
消费者:从交易场所取东西的人
“1”代表一个交易场所
所谓交易场所就是内存中具有存储数据的一段有界缓冲区
综上,给出生产者消费者模型的描述:两个进程共享一个缓冲区,一个进程称为生产者向缓冲区中放数据,另一个称为消费者从缓冲取中取数据,当缓冲区中被放时,生产者进程就必须可进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
图示:
基于链表的生产者消费者模型
分析:当使用链表来模拟生产者消费者模型时,我们可以借助链表的插入来扮演生产者的角色,用链表的删除来充当消费者的角色,为了便于实现,我们直接采用链表的头插和链表的头删操作来模拟放数据和取数据这两个过程。
(1)条件变量接口说明
int pthread_cond_wait(pthread_cond_t* redtrist cond , pthread_mutex_t* redtrist )//挂起等待
1、不满足条件时必须进行休眠(释放Mutex) 2、不能抱着锁资源休眠(进行阻塞式等待) 3、能被唤醒(被唤醒时能够重新获得Mutex资源并等待)
int pthread_cond_signal(pthread_cond_t* cond ) //唤醒
1、生产者生产好数据之后通知消费者来消费数据
2、消费者消费完数据后通知生产者前来生产
(2)锁相关接口说明
int pthread_mutex_lock(pthread_mutex_t *mutex);
阻塞式加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);
非阻塞式加锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
解锁,无论是阻塞式的加锁还是非阻塞式的加锁都需要使用此函数进行解锁
基于单链表的生产者消费者模型实现
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
typedef struct list
{
int data;
struct list* next;
}list,*plist,**pplist;
plist head;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
plist alloc_node(int d,plist l)
{
plist tmp=(plist)malloc(sizeof(list));
if(!tmp){
perror("malloc");
exit(1);
}
tmp->data=d;
tmp->next=l;
return tmp;
}
void initlist(pplist l){
*l=alloc_node(0,NULL);
}
int isempty(plist l){
return l->next==NULL?1:0;
}
void free_node(plist l){
if(l!=NULL){
free(l);
l=NULL;
}
}
void push_front(plist l,int d){
plist tmp=alloc_node(d,NULL);
tmp->next=l->next;
l->next=tmp;
}
void pop_front(plist l,int* out)
{
if(!isempty(l)){
plist tmp=l->next;
l->next=tmp->next;
*out=tmp->data;
free_node(tmp);
}
}
void showlist(plist l)
{
plist start=l->next;
while(start){
printf("%d ",start->data);
start=start->next;
}
printf("\n");
}
void destroy(plist l){
int data;
while(!isempty(l)){
pop_front(l,&data);
}
free_node(l);
}
void* consume(void* arg){
pthread_mutex_t* lockp=(pthread_mutex_t*)arg;
int data=0;
while(1){
pthread_mutex_lock(lockp);
if(isempty(head)){
pthread_cond_wait(&cond,&lockp);
}
pop_front(head,&data);
printf("consum done: %d\n",data);
pthread_mutex_unlock(lockp);
pthread_cond_signal(&cond);
}
}
void* product(void* arg){
pthread_mutex_t* lockp=(pthread_mutex_t*)arg;
int data=0;
while(1){
pthread_mutex_lock(lockp);
data=rand()%1234;
push_front(head,data);
printf("product done: %d\n",data);
pthread_mutex_unlock(lockp);
pthread_cond_signal(&cond);
}
}
int main()
{
pthread_mutex_t lock;
pthread_mutex_init(&lock,NULL);
initlist(&head);
pthread_t consumer,producter;
pthread_create(&consumer,NULL,consume,(pthread_mutex_t *)&lock);
pthread_create(&producter,NULL,product,(pthread_mutex_t *)&lock);
pthread_join(consumer,NULL);
pthread_join(producter,NULL);
destroy(head);
pthread_mutex_destroy(&lock);
return 0;
}
基于多元信号量的生产者消费者
背景知识
涉及到信号量的知识,首先我们需要搞清楚的一个东西就是PV操作的含义,PV操作是由P操作原语和V操作原语组成的(原语很好理解,就是任务要么全部做完,要么都不做),对于信号量的具体操作如下所示:
P(S):
(1)将信号量S的值进行减1操作;
(2)如果S>=0则该进程继续执行,否则该进程就会被挂起等待,直到有进程释放了,才可以被唤醒;
V(S):
(1)与P操作正好相反,V操作是将信号量的值加1;
(2)如果S>0,则该进程继续执行,否则释放队列中第一个等待信号量的进程。
PV操作的意义:用信号量及PV操作,实现进程的同步与互斥,PV操作属于进程的低级通信。
在使用型号量来模拟生产者和消费者模型时,我们是环形队列来实现生产者和消费者这两种角色之间的对于资源的存取,首先我们知道队列的底层存储数据的方式其实就是一个数组,只要控制好对于队头和队尾的相关计算,我们就可以实现循环队列,而生产者依旧是在有空间的时候进行存放数据,没有空间时进入挂起等待状态,消费者则是在有数据时进行取数据,没有数据时进行挂起等待操作,这样我们便可以实现生产者和消费模型
不过先不要高兴的太早了,我们还需要制定这个规则才能实现:
生产者优先:其实就算消费者优先,由于刚开始没有生产出数据,消费者也会被挂起;
消费者永远不能追上消费者:试想一下如果消费者追上生产者或者超过消费者的时候,此时消费者消费的并不是生产者实际所生产出的数据,而属于垃圾数据;
生产者不能将消费者包一圈:这个也很好理解,如果生产者允许将消费者包一圈的话,那就相当于生产者可以无限的生产,并不停的覆盖掉原来所产生的数据,那么如果原来生产出的数据中如果有的是消费者需要获取的数据,那么除了生产者在次生产出该数据外,消费者将再也不能得到所想要的数据;
图示:
代码实现:
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
int ring[64];
sem_t semBlank;
sem_t semData;
void* consume(void* arg){
int step=0;
while(1){
sem_wait(&semData);
int data=ring[step];
step++;
step%=64;
printf("consume done: %d\n",data);
sem_post(&semBlank);
}
}
void* product(void* arg){
int step=0;
while(1){
sem_wait(&semBlank);
int data=rand()%1234;
ring[step]=data;
step++;
step%=64;
printf("produce done: %d\n",data);
sem_post(&semData);
}
}
int main()
{
sem_init(&semBlank,0,64);
sem_init(&semData,0,0);
pthread_t consumer,producter;
pthread_create(&consumer,NULL,consume,NULL);
pthread_create(&producter,NULL,product,NULL);
pthread_join(consumer,NULL);
pthread_join(producter,NULL);
sem_destroy(&semBlank);
sem_destroy(&semData);
return 0;
}
运行结果: