信号量实现生产者消费者模型

时间:2021-10-25 15:13:50

本实验的代码中采用的有界缓冲区拥有3个单元,每个单元为5字节。为了尽量体现每个信号量的意义,在程序中生产过程和消费过程是随机(采取0~5s的随机时间间隔)进行的,

而且生产者的速度比比消费者的速度平均快两倍左右(这种关系可以相反)。生产者一次生产一个单元的产品(放入“hello”字符串),消费者一次消费一个单元的产品。

  1  /*本实验的代码中采用的有界缓冲区拥有3个单元,每个单元为5字节。
  2 *为了尽量体现每个信号量的意义,在程序中生产过程和消费过程是随机(采取0~5s的随机时间间隔)进行的,
  3 *而且生产者的速度比比消费者的速度平均快两倍左右(这种关系可以相反)。
  4 *生产者一次生产一个单元的产品(放入“hello”字符串),消费者一次消费一个单元的产品。
  5 */
  6 
  7 #include <stdio.h>
  8 #include <stdlib.h>
  9 #include <string.h>
 10 #include <unistd.h>
 11 #include <fcntl.h>
 12 #include <errno.h>
 13 #include <pthread.h>
 14 #include <semaphore.h>
 15 #include <sys/ipc.h>
 16 #define MYFIFO    "./myfifo"
 17 #define BUFFER_SIZE    3
 18 #define UNIT_SIZE    6
 19 #define RUN_TIME    30
 20 #define DELAY_TIME_LEVELS    5.0
 21 
 22 int fd;    //管道描述符
 23 time_t end_time;    //存放线程的起始时间
 24 sem_t mutex,full,avail;    //信号量描述符
 25 
 26 //生产者线程
 27 void *producer(void *arg)
 28 {
 29     int real_write;        //实际写入字节数
 30     int delay_time=0;
 31 
 32     //time(MULL)返回从公元1970年1月1日的UC时间0时0分0秒算起
 33     //到现在所经过的描述,while()的意思就是说如果在执行生产者线程
 34     //的那一刻没有超过其结束时间,那么执行
 35     while(time(NULL)<end_time){
 36         //阐述0~5s的随机数
 37         delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1;
 38         sleep(delay_time);
 39         //P操作信号量avail和mutex
 40         sem_wait(&avail);
 41         sem_wait(&mutex);
 42         printf("\nProducer: delay=%d\n", delay_time);
 43         //生产者写入数据
 44         if((real_write=write(fd,"hello",UNIT_SIZE))==-1){
 45             //这个errno==EAGAIN表示的是你的write本来是非阻塞情况
 46             //表示现在没有数据可读,这个时候会置全局变量errno为
 47             //EAGINA,表示可以再次进行读操作;如果是阻塞情况,
 48             //那么被中断的话,errno=EINIR
 49             if(errno==EAGAIN){
 50                 printf("The FIFO has not been read yet. Please try later\n");
 51             }
 52         }
 53         else{
 54             printf("wirte %d to the FIFO\n", real_write);
 55         }
 56 
 57         //V操作信号量full和mutex
 58         sem_post(&full);
 59         sem_post(&mutex);
 60     }
 61     pthread_exit(NULL);
 62 }
 63 
 64 //消费者线程
 65 void *customer(void *arg)
 66 {
 67 //    unsigned int read_buffer[UNIT_SIZE];
 68     char read_buffer[UNIT_SIZE];
 69     int real_read;
 70     int delay_time;
 71 
 72     //time(MULL)返回从公元1970年1月1日的UC时间0时0分0秒算起
 73     //到现在所经过的描述,while()的意思就是说如果在执行生产者线程
 74     //的那一刻没有超过其结束时间,那么执行
 75     while(time(NULL)<end_time){
 76         //阐述0~5s的随机数
 77         delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1;
 78         sleep(delay_time);
 79         //P操作信号量full和mutex
 80         sem_wait(&full);
 81         sem_wait(&mutex);
 82         memset(read_buffer,0,UNIT_SIZE);    //用0来初始化
 83         printf("\nCustomer: delay=%d\n", delay_time);
 84         //消费者消费数据
 85         if((real_read=read(fd,read_buffer,UNIT_SIZE))==-1){
 86             //这个errno==EAGAIN表示的是你的write本来是非阻塞情况
 87             //表示现在没有数据可读,这个时候会置全局变量errno为
 88             //EAGINA,表示可以再次进行读操作;如果是阻塞情况,
 89             //那么被中断的话,errno=EINIR
 90             if(errno==EAGAIN){
 91                 printf("No data yet\n");
 92             }
 93         }
 94         else{
 95             printf("read %s from the FIFO\n", read_buffer);
 96         }
 97 
 98         //V操作信号量avail和mutex
 99         sem_post(&avail);
100         sem_post(&mutex);
101     }
102 }
103 
104 int main(int argc, char const *argv[])
105 {
106     pthread_t thrd_pro_id, thrd_cus_id;
107     pthread_t mon_th_id;
108     int ret;
109 
110     srand(time(NULL));    //随机数发生器初始化
111     end_time=time(NULL)+RUN_TIME;
112     //创建有名管道。若文件已存在,则mkfifo()返回errno=EEXIST
113     if((mkfifo(MYFIFO, O_CREAT|O_EXCL)<0)&&(errno!=EEXIST)){
114         printf("Cannot create fifo\n");
115         return errno;
116     }
117     //打开管道
118     fd=open(MYFIFO,O_RDWR,0666);
119     if(fd==-1){
120         printf("Open fifo failed.\n");
121         return fd;
122     }
123 
124     ret=sem_init(&mutex,0,1);
125     ret+=sem_init(&avail,0,BUFFER_SIZE);
126     ret+=sem_init(&full,0,0);
127     if(ret!=0){    //这里ret用的倒也巧妙
128         printf("Any semaphore initialization failed\n");
129         return ret;
130     }
131     
132     //创建两个进程
133     ret=pthread_create(&thrd_pro_id, NULL, (void *)producer, NULL);
134     if(ret!=0){
135         printf("create producer thread failed.\n");
136         return ret;
137     }
138     ret=pthread_create(&thrd_cus_id, NULL, (void *)customer, NULL);
139     if(ret!=0){
140         printf("create customer thread failed.\n");
141         return ret;
142     }
143     printf("wait for producer&customer thread\n");
144     pthread_join(thrd_pro_id,NULL);
145     pthread_join(thrd_cus_id,NULL);
146 //    unlink(MYFIFO);        //所有打开该文件的进程都结束时文件被删除
147     return 0;
148 }

 

编译运行结果:

信号量实现生产者消费者模型

然后我们看看myfifo文件,果然是权限有问题

信号量实现生产者消费者模型

 然后把文件的用户读权限加上去 chmod u+r myfifo ,就有结果啦

信号量实现生产者消费者模型

 

但这终究不是办法呀,要是每次我都改一下权限那岂不是很麻烦,所以就只能该程序喽

把113行 if((mkfifo(MYFIFO, O_CREAT|O_EXCL)<0)&&(errno!=EEXIST)) 改成 if((mkfifo(MYFIFO, O_CREAT|O_EXCL|0666)<0)&&(errno!=EEXIST)) 就可以啦。

我们来看看效果,先删除原来的FIFO文件,重新编译,运行就有结果啦。

信号量实现生产者消费者模型

我们来看看FIFO文件的权限

信号量实现生产者消费者模型

好像跟我们设定的0666不太一样诶

 

然后我们取消146行的注释看看

信号量实现生产者消费者模型

果然运行完后不见myfifo