生产者消费者问题属于有界缓冲区问题。我们现在讲述多个生产者向一个缓冲区中存入数据,多个生产者从缓冲区中取数据。
共享缓冲区作为一个环绕缓冲区,存数据到头时再从头开始。
我们使用一个互斥量保护生产者向缓冲区中存入数据。
由于有多个生产者,因此需要记住现在向缓冲区中存入的位置。
使用一个互斥量保护缓冲区中消息的数目,这个生产的数据数目作为生产者和消费者沟通的桥梁。
使用一个条件变量用于唤醒消费者。由于有多个消费者,同样消费者也需要记住每次取的位置。
#include "unpipc.h" #include "my_err.h" #include <pthread.h> #define MAXITEMS 6 #define MAXTHREADS 3 int buffer[MAXITEMS];
//生产者使用的结构体 struct shared { pthread_mutex_t mutex; int input;//下标 int i;//存入的值 }shared={PTHREAD_MUTEX_INITIALIZER};
//消费者使用的结构体,生产者也会使用 struct ready { pthread_mutex_t mutex; pthread_cond_t cond; int nready;//生产者为消费者准备好的可用的商品数量 int nget;//消费者从仓库中抓取商品的次数 }ready={PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER}; void *produce(void *arg) { while(1) { pthread_mutex_lock(&shared.mutex); if(shared.i>=MAXITEMS)//仓库满,不能再生产 { pthread_mutex_unlock(&shared.mutex); return NULL; } buffer[shared.input]=shared.i; if(++shared.input>=MAXITEMS)//1...MAXITEMS,循环缓冲,如果抓取的次数大于MAXITEMS,从头在开始计数 shared.input=0; //++shared.input; ++shared.i;//值 pthread_mutex_unlock(&shared.mutex); //使临界区尽量短使用多个互斥量,生产完毕可以消费了 pthread_mutex_lock(&ready.mutex); if(ready.nready==0)//如果有消费者等待消费,则唤醒 pthread_cond_signal(&ready.cond); ++ready.nready;//可以消费的商品的数目+1 pthread_mutex_unlock(&ready.mutex); } return NULL; } void *consume(void *arg) { while(1) { pthread_mutex_lock(&ready.mutex); if(ready.nready==0)//如果可以消费的数目==0,仓库中没有可消费的商品,等待 pthread_cond_wait(&ready.cond,&ready.mutex); if(++ready.nget>=MAXITEMS)//1...MAXITEMS { if(ready.nget==MAXITEMS)//抓取商品的次数==仓库的最大容量时少操作一次, printf("buffer[%d] = %d \n",ready.nget-1,buffer[(ready.nget-1)%MAXITEMS]); pthread_cond_signal(&ready.cond);// pthread_mutex_unlock(&ready.mutex); return NULL; } --ready.nready; pthread_mutex_unlock(&ready.mutex); printf("buffer[%d] = %d\n",ready.nget-1,buffer[(ready.nget-1)%MAXITEMS]); } return NULL; } int main() { pthread_setconcurrency(MAXTHREADS*2);// int i; pthread_t pid_produce[MAXTHREADS],pid_consume[MAXTHREADS]; for(i=0;i<MAXTHREADS;++i) pthread_create(&pid_produce[i],NULL,produce,NULL); for(i=0;i<MAXTHREADS;++i) pthread_create(&pid_consume[i],NULL,consume,NULL); for(i=0;i<MAXTHREADS;++i) pthread_join(pid_produce[i],NULL); for(i=0;i<MAXTHREADS;++i) pthread_join(pid_consume[i],NULL); exit(0); }