经典生产者与消费者问题(线程的同步与互斥)

时间:2022-04-21 16:42:11

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
互斥:保证一个资源只能被一个进程使用。
首先,解释“321”:
1、一个交易场所(缓冲区,类似于超市)
2、两种角色:生产者(生产数据)与消费者(消费数据)
3、三种关系:生产者与生产者互斥
消费者与消费者互斥
生产者与消费者同步与互斥
在未做任何处理之前让生产者生产数据,消费者消费数据,就会出现以下问题:
经典生产者与消费者问题(线程的同步与互斥)
参考代码:

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<assert.h>
#include<pthread.h>
typedef struct Node
{
int _data;
struct Node *next;
}node,*pnode,**ppnode;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pnode phead=NULL;
pnode alloc(int data)
{
pnode temp=(pnode)malloc(sizeof(node));
if(temp==NULL)
return NULL;
temp->_data=data;
temp->next=NULL;
return temp;
}
void initList(ppnode phead)
{
*phead=alloc(0);
}
void pushList(pnode phead,int data)
{
pnode _p=alloc(data);
_p->next=phead->next;
phead->next=_p;
}
int popList(pnode phead)
{

if(phead->next==NULL)
{
return -1;
}
else
{
pnode temp=phead->next;
int data=temp->_data;
phead->next=temp->next;
free(temp);
return data;
temp=NULL;
}
return 0;
}
void destroyList(pnode phead)
{
while(phead->next)
{
pnode temp=phead->next;
phead->next=temp->next;
free(temp);
}
}
void showList(pnode phead)
{
pnode temp=phead->next;
while(temp)
{
printf("%d",temp->_data);
temp=temp->next;
}
printf("\n");
}
void* producter(void* arg)
{
int data=0;
while(1)
{
sleep(1);
data=rand()%100;
pushList(phead,data);
printf("producter:%d\n",data);
}
return (void*)0;
}
void* consumer(void* arg)
{
int data=0;
while(1)
{
sleep(2);
if(data=popList(phead))
{
printf("consumer:%d\n",data);
}
}
return (void*)0;
}
int main()
{
initList(&phead);
/*pushList(phead,1);
pushList(phead,2);
pushList(phead,3);
pushList(phead,4);
showList(phead);
popList(phead);
showList(phead);*/

pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1,NULL,producter,NULL);
pthread_create(&tid2,NULL,consumer,NULL);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
destroyList(phead);
return 0;
}

运行结果:
经典生产者与消费者问题(线程的同步与互斥)
经典生产者与消费者问题(线程的同步与互斥)
为了保证原子性访问临界资源,故引入加互斥锁。
参考代码:

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<assert.h>
#include<pthread.h>
typedef struct Node
{
int _data;
struct Node *next;
}node,*pnode,**ppnode;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pnode phead=NULL;
pnode alloc(int data)
{
pnode temp=(pnode)malloc(sizeof(node));
if(temp==NULL)
return NULL;
temp->_data=data;
temp->next=NULL;
return temp;
}
void initList(ppnode phead)
{
*phead=alloc(0);
}
void pushList(pnode phead,int data)
{
pnode _p=alloc(data);
_p->next=phead->next;
phead->next=_p;
}
int popList(pnode phead)
{

if(phead->next==NULL)
{
return -1;
}
else
{
pnode temp=phead->next;
int data=temp->_data;
phead->next=temp->next;
free(temp);
return data;
temp=NULL;
}
return 0;
}
void destroyList(pnode phead)
{
while(phead->next)
{
pnode temp=phead->next;
phead->next=temp->next;
free(temp);
}
}
void showList(pnode phead)
{
pnode temp=phead->next;
while(temp)
{
printf("%d",temp->_data);
temp=temp->next;
}
printf("\n");
}
void* producter(void* arg)
{
int data=0;
while(1)
{
sleep(1);
pthread_mutex_lock(&lock);
data=rand()%100;
pushList(phead,data);
printf("producter:%d\n",data);
pthread_mutex_unlock(&lock);
}
return (void*)0;
}
void* consumer(void* arg)
{
int data=0;
while(1)
{
sleep(2);
pthread_mutex_lock(&lock);
if(data=popList(phead))
{
printf("consumer:%d\n",data);
pthread_mutex_unlock(&lock);
}
}
return (void*)0;
}
int main()
{
initList(&phead);
/*pushList(phead,1);
pushList(phead,2);
pushList(phead,3);
pushList(phead,4);
showList(phead);
popList(phead);
showList(phead);*/

pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1,NULL,producter,NULL);
pthread_create(&tid2,NULL,consumer,NULL);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_mutex_destroy(&lock);
destroyList(phead);
return 0;
}

运行结果:
经典生产者与消费者问题(线程的同步与互斥)
但互斥锁只能解决互斥问题,保证原子性操作,还是不能解决同步问题,故引入条件变量(Condition Variable )。
参考代码:

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<assert.h>
#include<pthread.h>
typedef struct Node
{
int _data;
struct Node *next;
}node,*pnode,**ppnode;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
pnode phead=NULL;
pnode alloc(int data)
{
pnode temp=(pnode)malloc(sizeof(node));
if(temp==NULL)
return NULL;
temp->_data=data;
temp->next=NULL;
return temp;
}
void initList(ppnode phead)
{
*phead=alloc(0);
}
void pushList(pnode phead,int data)
{
pnode _p=alloc(data);
_p->next=phead->next;
phead->next=_p;
}
int popList(pnode phead)
{

if(phead->next==NULL)
{
return -1;
}
else
{
pnode temp=phead->next;
int data=temp->_data;
phead->next=temp->next;
free(temp);
return data;
temp=NULL;
}
return 0;
}
void destroyList(pnode phead)
{
while(phead->next)
{
pnode temp=phead->next;
phead->next=temp->next;
free(temp);
}
}
void showList(pnode phead)
{
pnode temp=phead->next;
while(temp)
{
printf("%d",temp->_data);
temp=temp->next;
}
printf("\n");
}
void* producter(void* arg)
{
int data=0;
while(1)
{
sleep(2);
pthread_mutex_lock(&lock);
data=rand()%100;
pushList(phead,data);
printf("producter:%d\n",data);
pthread_cond_signal(&cond);
printf("data has been producted!\n ");
pthread_mutex_unlock(&lock);
}
return (void*)0;
}
void* consumer(void* arg)
{
int data=0;
while(1)
{
sleep(1);
pthread_mutex_lock(&lock);
while(phead->next==NULL)
{
printf("No data is producted!\n");
pthread_cond_wait(&cond,&lock);
}
data=popList(phead);
printf("consumer:%d\n",data);
pthread_mutex_unlock(&lock);
}
return (void*)0;
}
int main()
{
initList(&phead);
/*pushList(phead,1);
pushList(phead,2);
pushList(phead,3);
pushList(phead,4);
showList(phead);
popList(phead);
showList(phead);*/

pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1,NULL,producter,NULL);
pthread_create(&tid2,NULL,consumer,NULL);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
destroyList(phead);
return 0;
}

运行结果:
经典生产者与消费者问题(线程的同步与互斥)