POSIX信号量
POSIX信号量有两种:有名信号量和无名信号量,无名信号量也可被称为基于内存的信号量,有名信号量通过IPC名字进行进程间同步,而无名信号量如果不是放在进程的共享内存区中,是不能用来进行进程间同步的,只能用于线程同步。
POSIX信号量还可以分为:
1.二值信号量
信号量的值只有0和1,和互斥量类似。若资源被锁住,信号量的值为0,若资源可用,信号量的值为1.
2.计数信号量
信号量的值在0到一个大于1的限制值,该计数可表示可用的资源的个数。
介绍几个函数:
1.初始化
初始化无名信号量
参数:
pshared:0表示线程共享,非零表示进程间共享
value:信号量初值
初始化有名信号量
参数:
oflag:可以设置为O_CREAT|O_EXCL
mode:可以设置为0644,自己可读写,其他用户和组内用户可读
value:表示信号量初始化的值
2.销毁信号量
3.等待一个信号量(P操作)
检查信号量的值,如果值小于或等于0,就阻塞,直到该值变成大于0,然后等待进程将信号量的值减1,进程获得共享资源的访问权限。这个操作必须是一个原子操作。
sem_timewait与sem_wait类似,但不同的是,若V操作不能立刻执行,该函数将投入睡眠并等待abs_timeout中指定时间。如果超时依旧无法执行P操作,则返回timeout错误。
4.挂出一个信号量(V操作)
将信号量的值加1,如果有进程阻塞等待该信号量,那么其中一个进程将被唤醒。这个操作也必须是原子操作。
p操作:测试信号量值大于0,减1.sem_wait
v操作:给定信号量值加1. sem_post
用posix信号量和互斥锁解决生产者消费者问题
1、生产者线程的任务
p(sem_full),p一个满的信号量(缓冲区容量),如果仓库还没满(sem_full>0),我们就能生产产品。一旦我们生产了一个产品,仓库既不是一个空的状态了,我们V一个空的信号量sem_empty.就告诉了消费者可以消费了。
由于可能有多个生产者,所以引入一个互斥量。
2、消费者的任务
要p(sem_empty),如果生产者没有V一个sem_empty,消费者p操作sem_empty可能就不行,所以要等生产者生产了产品,v了sem_empty通知到消费者,消费者才可以消费。一旦消费了一个产品,就可以V(sem_full)是的仓库容量减一(不满)通知到生产者线程又可以生产了。由于可能有多个消费者,所以引入一个互斥量。
假设仓库容量为10,刚开始可以生产产品,所以sem_full(10)信号量初始值设为10,表示可以生产10个。
刚开始仓库中没有产品可以消费,所以sem_empty(0)初始值为0.
#include<unistd.h>
#include<sys/types.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<stdlib.h>
#include<stdio.h>
#include<errno.h>
#include <semaphore.h>
#include<pthread.h>
#define ERR_EXIT(m)\
do\
{\
perror(m);\
exit(EXIT_FAILURE);\
}while(0)
#define CONSUMERS_COUNT 1 //消费者线程个数
#define PRODUCERS_COUNT 2 //生产者线程个数
#define BUFFSIZE 10 //缓冲区大小(仓库大小)
int g_buffer[BUFFSIZE]; //环形缓冲区
unsigned short in=0; //初始生产产品放置位置
unsigned short out=0; //初始消费的产品位置
unsigned short produce_id=0;//当前正在消费的产品ID
unsigned short consume_id=0;//当前正在消费的产品ID
//两个信号量,一个互斥锁。
sem_t g_sem_full;//可生产产品的数量
sem_t g_sem_empty;//可消费产品的数量
pthread_mutex_t g_mutex;
pthread_t g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT ];//线程ID.生产者线程个消费者线程个数
//消费者线程执行的任内务
void* consume(void*arg)
{
int num=(int)arg;
int i;
//消费者不断消费
while(1)
{
printf("%d wait buffer not empty\n",num);
//sem_wait测试信号量值大于0减一并立即返回.P操作
sem_wait(&g_sem_empty);
pthread_mutex_lock(&g_mutex);
//消费之前打印基本信息
for(i=0;i<BUFFSIZE;i++)
{
printf("%02d ",i);//仓库序号
if(g_buffer[i]==-1)
printf("%s ","null");//仓库为空
else
printf("%d ",g_buffer[i]);
if(i==out)
printf("\t<--consume");//消费位置
printf("\n");
}
consume_id=g_buffer[out];
printf("%d begin consume product %d\n",num,consume_id);
g_buffer[out]=-1;
out=(out+1)%BUFFSIZE;
printf("%d end consume product %d\n",num,consume_id);
pthread_mutex_unlock(&g_mutex);
//给定信号量值加1。V操作,消费了一个产品,仓库就可以再生产产品
sem_post(&g_sem_full);
sleep(1);
}
return NULL;
}
//生产者线程执行的任务。
void* produce(void*arg)
{
int num=(int)arg;
int i;
while(1)
{
printf("%d wait buffer not full\n",num);
//sem_wait测试信号量值大于0减一并立即返回.g_sem_full代表可以生产的个生产了一个之后,可以生产的个数就要减一
//所以初始值为buffersize
sem_wait(&g_sem_full);
pthread_mutex_lock(&g_mutex);
//生产之前打印基本信息
for(i=0;i<BUFFSIZE;i++)
{
printf("%02d ",i);//仓库序号
if(g_buffer[i]==-1)
printf("%s ","null");//仓库为空
else
printf("%d ",g_buffer[i]);
if(i==in)
printf("\t<--produce");//生产位置
printf("\n");
}
printf("%d begin produce product %d\n",num,produce_id);
g_buffer[in]=produce_id;
in=(in+1)%BUFFSIZE;
printf("%d end produce product %d\n",num,produce_id++);
pthread_mutex_unlock(&g_mutex);
//生产了一个产品的话,就给g_sem_empty信号量值加1
sem_post(&g_sem_empty);
sleep(5);
}
return NULL;
}
int main(void)
{
int i;
for(i=0;i<BUFFSIZE;i++)
g_buffer[i]=-1;
//初始化信号量
sem_init(&g_sem_full,0,BUFFSIZE);
sem_init(&g_sem_empty,0,0);
//初始化互斥量
pthread_mutex_init(&g_mutex,NULL);
//创建若干个线程,执行消费任务。
for(i=0;i<CONSUMERS_COUNT;i++)
{
pthread_create(&g_thread[i],NULL,consume,(void*)i);//产生了段错误.我们传递的是所以nt num=(int)arg 而不是int num=*(int*)arg
}
//创建若干个线程,执行生产任务。
for(i=0;i<PRODUCERS_COUNT;i++)
{
pthread_create(&g_thread[i+CONSUMERS_COUNT],NULL,produce,(void*)i);
}
//主线程等待这些线程结束
for(i=0;i<CONSUMERS_COUNT+PRODUCERS_COUNT;i++)
{
pthread_join(g_thread[i],NULL);
}
sem_destroy(&g_sem_full);//销毁POSIX信号量
sem_destroy(&g_sem_empty);
pthread_mutex_destroy(&g_mutex);
return 0;
}