使用 线程 和 pxsem 实现多个生产者和一个消费者

时间:2022-03-24 19:36:25

《Unix 进程间通信》中有一个使用信号量实现多个生产者和一个消费者的例子，相比使用互斥锁和条件变量的实现更为简单，源码如下：

unpv22e\pxsem\prodcons3.c

使用 线程 和 pxsem 实现多个生产者和一个消费者
  1 #include <unistd.h>
2 #include <pthread.h>
3 #include <semaphore.h> // no name semaphore
4 #include <stdio.h>
5 #include <stdlib.h>
6
/* Number of slots in the circular buffer. */
#define NBUFF 10
/* Upper bound on the number of producer threads. */
#define MAXNTHREADS 100

/* Smaller of two values; each argument may be evaluated twice. */
#define min(X,Y) ((X) < (Y) ? (X) : (Y))

/* Assigned in main() before any thread is created; the threads only read them. */
int nitems;
int nproducers;

/* Data shared by the producers and the consumer. */
struct {
    int buff[NBUFF];    /* circular buffer, indexed modulo NBUFF */
    int nput;           /* number of items stored so far */
    int nputval;        /* next value to be stored */
    sem_t mutex;        /* semaphore objects themselves, not pointers */
    sem_t nempty;
    sem_t nstored;
} shared;

/* Thread start routines. */
void *produce(void *);
void *consume(void *);

/* Error reporters; both terminate the process. */
void err_quit(char *ptr);
void err_sys(char *ptr);
24
25 int
26 main(int argc, char **argv)
27 {
28 int i, count[MAXNTHREADS]; // record how many resource were created by each producter
29 pthread_t tid_produce[MAXNTHREADS], tid_consume;
30
31 if (argc != 3)
32 err_quit("usage: prodcons3 <#items> <#producers>"); // items 指生产多少个产品,producers表示有几个生产者
33 nitems = atoi(argv[1]); // resource number
34 nproducers = min(atoi(argv[2]), MAXNTHREADS);
35
36 /* 4initialize three semaphores */
37 sem_init(&shared.mutex, 0, 1); // just a lock
38 sem_init(&shared.nempty, 0, NBUFF); // record idle space
39 sem_init(&shared.nstored, 0, 0); // record already stored space
40
41 /* 4create all producers and one consumer */
42 pthread_setconcurrency(nproducers + 1);
43 for (i = 0; i < nproducers; i++) {
44 count[i] = 0;
45 pthread_create(&tid_produce[i], NULL, produce, &count[i]); //&count[i]为整形数组成员地址,即int型指针
46 }
47 pthread_create(&tid_consume, NULL, consume, NULL);
48
49 /* 4wait for all producers and the consumer */
50 for (i = 0; i < nproducers; i++) {
51 pthread_join(tid_produce[i], NULL);
52 printf("count[%d] = %d\n", i, count[i]); // 打印各个生产者生产的多少产品
53 }
54 pthread_join(tid_consume, NULL);
55
56 sem_destroy(&shared.mutex);
57 sem_destroy(&shared.nempty);
58 sem_destroy(&shared.nstored);
59 exit(0);
60 }
61 /* end main */
62
63 /* include produce */
64 void *
65 produce(void *arg)
66 {
67 for ( ; ; ) {
68 sem_wait(&shared.nempty); /* wait for at least 1 empty slot */
69 sem_wait(&shared.mutex);
70
71 if (shared.nput >= nitems) {
72 sem_post(&shared.nempty);
73 sem_post(&shared.mutex);
74 return(NULL); /* all done */
75 }
76
77 shared.buff[shared.nput % NBUFF] = shared.nputval; // 循环存放
78 printf("produce buff[%d] = %d\n", shared.nput % NBUFF, shared.buff[shared.nput % NBUFF]);
79 shared.nput++;
80 shared.nputval++; // 产品数值始终是累加的
81
82 sem_post(&shared.mutex);
83 sem_post(&shared.nstored); /* 1 more stored item */
84 *((int *) arg) += 1; // 先转换为int型地址,之后解引用加一
85 }
86 }
87 /* end produce */
88
89 /* include consume */
90 void *
91 consume(void *arg)
92 {
93 int i;
94
95 for (i = 0; i < nitems; i++) {
96 sem_wait(&shared.nstored); /* wait for at least 1 stored item */
97 sem_wait(&shared.mutex);
98
99 if (shared.buff[i % NBUFF] != i)
100 printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
101 printf("consume shared.buff[%d] = %d\n", i % NBUFF, shared.buff[i % NBUFF]);
102
103 sem_post(&shared.mutex);
104 sem_post(&shared.nempty); /* 1 more empty slot */
105 }
106 return(NULL);
107 }
108 /* end consume */
109
/*
 * Report an error that is not tied to errno: write the message and a
 * newline to stderr, then terminate the process with exit status 1.
 */
void
err_quit(char *ptr)
{
    const int status = 1;

    fprintf(stderr, "%s\n", ptr);
    exit(status);
}
116
/*
 * Report a failed system call: write the message followed by the text for
 * the current errno to stderr, then terminate with exit status 1.
 * Call it immediately after the failing call, before errno can change.
 */
void
err_sys(char *ptr)
{
    perror(ptr);    /* prints "<ptr>: <description of errno>" and a newline */
    exit(1);
}
View Code