Linux线程同步--POSIX信号量

时间:2022-03-23 15:16:51

POSIX信号量

POSIX信号量有两种:有名信号量和无名信号量,无名信号量也可被称为基于内存的信号量,有名信号量通过IPC名字进行进程间同步,而无名信号量如果不是放在进程的共享内存区中,是不能用来进行进程间同步的,只能用于线程同步。

POSIX信号量还可以分为:
1.二值信号量
信号量的值只有0和1,和互斥量类似。若资源被锁住,信号量的值为0,若资源可用,信号量的值为1.
2.计数信号量
信号量的值在0到一个大于1的限制值,该计数可表示可用的资源的个数。

介绍几个函数:
1.初始化
初始化无名信号量
参数:
pshared:0表示线程共享,非零表示进程间共享
value:信号量初值
Linux线程同步--POSIX信号量
初始化有名信号量
Linux线程同步--POSIX信号量
参数:
oflag:可以设置为O_CREAT|O_EXCL
mode:可以设置为0644,自己可读写,其他用户和组内用户可读
value:表示信号量初始化的值

2.销毁信号量
Linux线程同步--POSIX信号量
3.等待一个信号量(P操作)
检查信号量的值,如果值小于或等于0,就阻塞,直到该值变成大于0,然后等待进程将信号量的值减1,进程获得共享资源的访问权限。这个操作必须是一个原子操作。
Linux线程同步--POSIX信号量
sem_timewait与sem_wait类似,但不同的是,若V操作不能立刻执行,该函数将投入睡眠并等待abs_timeout中指定时间。如果超时依旧无法执行P操作,则返回timeout错误。

4.挂出一个信号量(V操作)
将信号量的值加1,如果有进程阻塞等待该信号量,那么其中一个进程将被唤醒。这个操作也必须是原子操作。
Linux线程同步--POSIX信号量

p操作:测试信号量值大于0,减1.sem_wait
v操作:给定信号量值加1. sem_post

用posix信号量和互斥锁解决生产者消费者问题

1、生产者线程的任务
p(sem_full),p一个满的信号量(缓冲区容量),如果仓库还没满(sem_full>0),我们就能生产产品。一旦我们生产了一个产品,仓库既不是一个空的状态了,我们V一个空的信号量sem_empty.就告诉了消费者可以消费了。
由于可能有多个生产者,所以引入一个互斥量。

2、消费者的任务
要p(sem_empty),如果生产者没有V一个sem_empty,消费者p操作sem_empty可能就不行,所以要等生产者生产了产品,v了sem_empty通知到消费者,消费者才可以消费。一旦消费了一个产品,就可以V(sem_full)是的仓库容量减一(不满)通知到生产者线程又可以生产了。由于可能有多个消费者,所以引入一个互斥量。
假设仓库容量为10,刚开始可以生产产品,所以sem_full(10)信号量初始值设为10,表示可以生产10个。
刚开始仓库中没有产品可以消费,所以sem_empty(0)初始值为0.

#include<unistd.h>
#include<sys/types.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<stdlib.h>
#include<stdio.h>
#include<errno.h>
#include <semaphore.h>
#include<pthread.h>
#define ERR_EXIT(m)\
    do\
    {\
        perror(m);\
        exit(EXIT_FAILURE);\
    }while(0)
#define CONSUMERS_COUNT 1 //消费者线程个数
#define PRODUCERS_COUNT 2 //生产者线程个数
#define BUFFSIZE 10 //缓冲区大小(仓库大小)
int g_buffer[BUFFSIZE];       //环形缓冲区
unsigned short in=0;        //初始生产产品放置位置
unsigned short out=0;        //初始消费的产品位置
unsigned short produce_id=0;//当前正在消费的产品ID
unsigned short consume_id=0;//当前正在消费的产品ID
//两个信号量,一个互斥锁。
sem_t g_sem_full;//可生产产品的数量
sem_t g_sem_empty;//可消费产品的数量
pthread_mutex_t g_mutex;

pthread_t  g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT ];//线程ID.生产者线程个消费者线程个数
//消费者线程执行的任内务
void* consume(void*arg)
{
    int num=(int)arg;
    int i;
   //消费者不断消费
    while(1)
    {
        printf("%d wait buffer not empty\n",num);
    //sem_wait测试信号量值大于0减一并立即返回.P操作
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);
        //消费之前打印基本信息
        for(i=0;i<BUFFSIZE;i++)
            {
                printf("%02d ",i);//仓库序号
                if(g_buffer[i]==-1)
                    printf("%s ","null");//仓库为空
                else
                    printf("%d ",g_buffer[i]);
                if(i==out)
                    printf("\t<--consume");//消费位置
                printf("\n");
            }
        consume_id=g_buffer[out];
        printf("%d begin consume product %d\n",num,consume_id);
        g_buffer[out]=-1;
        out=(out+1)%BUFFSIZE;
        printf("%d end consume product %d\n",num,consume_id);
        pthread_mutex_unlock(&g_mutex);
        //给定信号量值加1。V操作,消费了一个产品,仓库就可以再生产产品
        sem_post(&g_sem_full);
        sleep(1);
    }    
    return NULL;
}
//生产者线程执行的任务。
void* produce(void*arg)
{    
    int num=(int)arg;
    int i;
    while(1)
    {
        printf("%d wait buffer not full\n",num);
        //sem_wait测试信号量值大于0减一并立即返回.g_sem_full代表可以生产的个生产了一个之后,可以生产的个数就要减一
        //所以初始值为buffersize
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        //生产之前打印基本信息
        for(i=0;i<BUFFSIZE;i++)
        {
            printf("%02d ",i);//仓库序号
            if(g_buffer[i]==-1)
                printf("%s ","null");//仓库为空
            else
                printf("%d ",g_buffer[i]);
            if(i==in)
                printf("\t<--produce");//生产位置
            printf("\n");
        }
        printf("%d begin produce product %d\n",num,produce_id);
        g_buffer[in]=produce_id;
        in=(in+1)%BUFFSIZE;
        printf("%d end produce product %d\n",num,produce_id++);
        pthread_mutex_unlock(&g_mutex);
        //生产了一个产品的话,就给g_sem_empty信号量值加1
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}
int main(void)
{
    int i;
    for(i=0;i<BUFFSIZE;i++)
        g_buffer[i]=-1;
    //初始化信号量
    sem_init(&g_sem_full,0,BUFFSIZE);
    sem_init(&g_sem_empty,0,0);
    //初始化互斥量
    pthread_mutex_init(&g_mutex,NULL);

    //创建若干个线程,执行消费任务。
    for(i=0;i<CONSUMERS_COUNT;i++)
    {
    pthread_create(&g_thread[i],NULL,consume,(void*)i);//产生了段错误.我们传递的是所以nt num=(int)arg 而不是int num=*(int*)arg
    }
    //创建若干个线程,执行生产任务。
    for(i=0;i<PRODUCERS_COUNT;i++)
    {
        pthread_create(&g_thread[i+CONSUMERS_COUNT],NULL,produce,(void*)i);
    }
    //主线程等待这些线程结束
    for(i=0;i<CONSUMERS_COUNT+PRODUCERS_COUNT;i++)
    {
        pthread_join(g_thread[i],NULL);
    }

    sem_destroy(&g_sem_full);//销毁POSIX信号量
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);
    return 0;
}