posix信号量 Link with -lpthread.
sem_t *sem_open(const char *name, int oflag);//打开POSIX信号量
sem_t *sem_open(const char *name, int oflag,mode_t mode, unsigned int value);
int sem_init(sem_t *sem, int pshared, unsigned int value);
posix互斥锁(第七章)
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
有两个信号量。
p操作:测试信号量值大于0,减1.sem_wait
v操作:给定信号量值加1. sem_post
用posix信号量和互斥锁解决生产者消费者问题
一、生产者线程的任务:
p(sem_full),p一个满的信号量(缓冲区容量),如果仓库还没满(sem_full>0),我们就能生产产品。一旦我们
生产了一个产品,仓库既不是一个空的状态了,我们V一个空的信号量sem_empty.就告诉了消费者可以消费了。
由于可能有多个生产者,所以引入一个互斥量。
二:消费者的任务:
要p(sem_empty),如果生产者没有V一个sem_empty,消费者p操作sem_empty可能就不行,所以要等生产者生产了产品,
v了sem_empty通知到消费者,消费者才可以消费。一旦消费了一个产品,就可以V(sem_full)是的仓库容量减一(不满)通知
到生产者线程又可以生产了。由于可能有多个消费者,所以引入一个互斥量。
假设仓库容量为10,刚开始可以生产产品,所以sem_full(10)信号量初始值设为10,表示可以生产10个。
刚开始仓库中没有产品可以消费,所以sem_empty(0)初始值为0.
1 #include<unistd.h>
2 #include<sys/types.h>
3 #include<fcntl.h>
4 #include<sys/stat.h>
5 #include<stdlib.h>
6 #include<stdio.h>
7 #include<errno.h>
8 #include <semaphore.h>
9 #include<pthread.h>
10 #define ERR_EXIT(m)\
11 do\
12 {\
13 perror(m);\
14 exit(EXIT_FAILURE);\
15 }while(0)
16 #define CONSUMERS_COUNT 1 //消费者线程个数
17 #define PRODUCERS_COUNT 2 //生产者线程个数
18 #define BUFFSIZE 10 //缓冲区大小(仓库大小)
19 int g_buffer[BUFFSIZE]; //环形缓冲区
20 unsigned short in=0; //初始生产产品放置位置
21 unsigned short out=0; //初始消费的产品位置
22 unsigned short produce_id=0;//当前正在消费的产品ID
23 unsigned short consume_id=0;//当前正在消费的产品ID
24 //两个信号量,一个互斥锁。
25 sem_t g_sem_full;//可生产产品的数量
26 sem_t g_sem_empty;//可消费产品的数量
27 pthread_mutex_t g_mutex;
28
29 pthread_t g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT ];//线程ID.生产者线程个数+消费者线程个数
30 //消费者线程执行的任内务
31 void* consume(void*arg)
32 {
33 int num=(int)arg;
34 int i;
//消费者不断消费
35 while(1)
36 {
37 printf("%d wait buffer not empty\n",num);
38 //sem_wait测试信号量值大于0减一并立即返回.P操作
39 sem_wait(&g_sem_empty);
40 pthread_mutex_lock(&g_mutex);
41 //消费之前打印基本信息
42 for(i=0;i<BUFFSIZE;i++)
43 {
44 printf("%02d ",i);//仓库序号
45 if(g_buffer[i]==-1)
46 printf("%s ","null");//仓库为空
47 else
48 printf("%d ",g_buffer[i]);
49 if(i==out)
50 printf("\t<--consume");//消费位置
51 printf("\n");
52 }
53 consume_id=g_buffer[out];
54 printf("%d begin consume product %d\n",num,consume_id);
55 g_buffer[out]=-1;
56 out=(out+1)%BUFFSIZE;
57 printf("%d end consume product %d\n",num,consume_id);
58 pthread_mutex_unlock(&g_mutex);
59 //给定信号量值加1。V操作,消费了一个产品,仓库就可以再生产产品
60 sem_post(&g_sem_full);
61 sleep(1);
62 }
63 return NULL;
64 }
65 //生产者线程执行的任务。
66 void* produce(void*arg)
67 {
68 int num=(int)arg;
69 int i;
70 while(1)
71 {
72 printf("%d wait buffer not full\n",num);
73 //sem_wait测试信号量值大于0减一并立即返回.g_sem_full代表可以生产的个数,生产了一个之后,可以生产的个数就要减一
74 //所以初始值为buffersize
75 sem_wait(&g_sem_full);
76 pthread_mutex_lock(&g_mutex);
77 //生产之前打印基本信息
78 for(i=0;i<BUFFSIZE;i++)
79 {
80 printf("%02d ",i);//仓库序号
81 if(g_buffer[i]==-1)
82 printf("%s ","null");//仓库为空
83 else
84 printf("%d ",g_buffer[i]);
85 if(i==in)
86 printf("\t<--produce");//生产位置
87 printf("\n");
88 }
89 printf("%d begin produce product %d\n",num,produce_id);
90 g_buffer[in]=produce_id;
91 in=(in+1)%BUFFSIZE;
92 printf("%d end produce product %d\n",num,produce_id++);
93 pthread_mutex_unlock(&g_mutex);
94 //生产了一个产品的话,就给g_sem_empty信号量值加1
95 sem_post(&g_sem_empty);
96 sleep(5);
97 }
98 return NULL;
99 }
100 int main(void)
101 {
102 int i;
103 for(i=0;i<BUFFSIZE;i++)
104 g_buffer[i]=-1;
105 //初始化信号量
106 sem_init(&g_sem_full,0,BUFFSIZE);
107 sem_init(&g_sem_empty,0,0);
108 //初始化互斥量
109 pthread_mutex_init(&g_mutex,NULL);
110
111 //创建若干个线程,执行消费任务。
112 for(i=0;i<CONSUMERS_COUNT;i++)
113 {
114 pthread_create(&g_thread[i],NULL,consume,(void*)i);//产生了段错误.我们传递的是值,所以int num=(int)arg 而不是int num=*(int*)arg
115 }
116 //创建若干个线程,执行生产任务。
117 for(i=0;i<PRODUCERS_COUNT;i++)
118 {
119 pthread_create(&g_thread[i+CONSUMERS_COUNT],NULL,produce,(void*)i);
120 }
121 //主线程等待这些线程结束
122 for(i=0;i<CONSUMERS_COUNT+PRODUCERS_COUNT;i++)
123 {
124 pthread_join(g_thread[i],NULL);
125 }
126
127 sem_destroy(&g_sem_full);//销毁POSIX信号量
128 sem_destroy(&g_sem_empty);
129 pthread_mutex_destroy(&g_mutex);
130 return 0;
131 }