Linux互斥锁、条件变量和信号量

时间:2022-12-13 15:14:27

Linux互斥锁、条件变量和信号量  来自http://kongweile.iteye.com/blog/1155490 

http://www.cnblogs.com/qingxia/archive/2012/08/30/2663791.html

进行多线程编程,最应该注意的就是那些共享的数据,因为无法知道哪个线程会在哪个时候对它进行操作,也无法得知哪个线程会先运行,哪个线程会后运行。所以,要对这些资源进行合理的分配和正确的使用。在Linux下,提供了互斥锁、条件变量和信号量来对共享资源进行保护。
用的最多的是互斥锁和条件变量,所以先会先学这两个。



一、互斥锁
互斥锁,是一种信号量,常用来防止两个进程或线程在同一时刻访问相同的共享资源。
需要的头文件:pthread.h
互斥锁标识符:pthread_mutex_t

(1)互斥锁初始化:
函数原型: int pthread_mutex_init (pthread_mutex_t* mutex,const pthread_mutexattr_t* mutexattr);
函数传入值:  mutex:互斥锁。
mutexattr:PTHREAD_MUTEX_INITIALIZER 创建快速互斥锁。
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP 创建递归互斥锁。
PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP  创建检错互斥锁。
函数返回值:成功:0;出错:-1

(2)互斥操作函数
int pthread_mutex_lock(pthread_mutex_t* mutex); //上锁
int pthread_mutex_trylock (pthread_mutex_t* mutex); //只有在互斥被锁住的情况下才阻塞
int pthread_mutex_unlock (pthread_mutex_t* mutex); //解锁
int pthread_mutex_destroy (pthread_mutex_t* mutex); //清除互斥锁
函数传入值:mutex:互斥锁。
函数返回值:成功:0;出错:-1

使用形式:
pthread_mutex_t mutex;
pthread_mutex_init (&mutex, NULL); /*定义*/
...

pthread_mutex_lock(&mutex); /*获取互斥锁*/
... /*临界资源*/
pthread_mutex_unlock(&mutex); /*释放互斥锁*/

如果一个线程已经给一个互斥量上锁了,后来在操作的过程中又再次调用了该上锁的操作,那么该线程将会无限阻塞在这个地方,从而导致死锁。这就需要互斥量的属性。

互斥量分为下面三种:
1、快速型。这种类型也是默认的类型。该线程的行为正如上面所说的。
2、递归型。如果遇到我们上面所提到的死锁情况,同一线程循环给互斥量上锁,那么系统将会知道该上锁行为来自同一线程,那么就会同意线程给该互斥量上锁。
3、错误检测型。如果该互斥量已经被上锁,那么后续的上锁将会失败而不会阻塞,pthread_mutex_lock()操作将会返回EDEADLK。

互斥量的属性类型为pthread_mutexattr_t。声明后调用pthread_mutexattr_init()来创建该互斥量。然后调用pthread_mutexattr_settype来设置属性。格式如下:int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
第一个参数attr,就是前面声明的属性变量;第二个参数kind,就是我们要设置的属性类型。他有下面几个选项:
PTHREAD_MUTEX_FAST_NP
PTHREAD_MUTEX_RECURSIVE_NP
PTHREAD_MUTEX_ERRORCHECK_NP

下面给出一个使用属性的简单过程:
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mutex,&attr);
pthread_mutex_destroy(&attr);

前面我们提到在调用pthread_mutex_lock()的时候,如果此时mutex已经被其他线程上锁,那么该操作将会一直阻塞在这个地方。如果我们此时不想一直阻塞在这个地方,那么可以调用下面函数:pthread_mutex_trylock。
如果此时互斥量没有被上锁,那么pthread_mutex_trylock将会返回0,并会对该互斥量上锁。如果互斥量已经被上锁,那么会立刻返回EBUSY。




二、条件变量
需要的头文件:pthread.h
条件变量标识符:pthread_cond_t

1、互斥锁的存在问题:
互斥锁一个明显的缺点是它只有两种状态:锁定和非锁定。设想一种简单情景:多个线程访问同一个共享资源时,并不知道何时应该使用共享资源,如果在临界区里加入判断语句,或者可以有效,但一来效率不高,二来复杂环境下就难以编写了,这是我们需要一个结构,能在条件成立时触发相应线程,进行变量修改和访问。

2、条件变量:
条件变量通过允许线程阻塞等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。

3、条件变量的相关函数
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //条件变量结构
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t*cond_attr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime);
int pthread_cond_destroy(pthread_cond_t *cond);

详细说明:
(1)创建和注销
条件变量和互斥锁一样,都有静态动态两种创建方式
a.静态方式
静态方式使用PTHREAD_COND_INITIALIZER常量,如下:
pthread_cond_t cond=PTHREAD_COND_INITIALIZER
b.动态方式
动态方式调用pthread_cond_init()函数,API定义如下:
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr)
尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。
注销一个条件变量需要调用pthread_cond_destroy(),只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程。API定义如下:int pthread_cond_destroy(pthread_cond_t *cond)

(2)等待和激发
a.等待
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) //等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime) //有时等待
等待条件有两种方式:无条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(),其中计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEOUT,结束等待,其中abstime以与time()系统调用相同意义的绝对时间形式出现,0表示格林尼治时间1970年1月1日0时0分0秒。

无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或 pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。

mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。

在条件满足从而离开 pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应
b.激发
激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程

(3)其他操作
pthread_cond_wait ()和pthread_cond_timedwait()都被实现为取消点,因此,在该处等待的线程将立即重新运行,在重新锁定mutex后离开pthread_cond_wait(),然后执行取消动作。也就是说如果pthread_cond_wait()被取消,mutex是保持锁定状态的,因而需要定义退出回调函数来为其解锁。
pthread_cond_wait实际上可以看作是以下几个动作的合体:
解锁线程锁;
等待条件为true;
加锁线程锁;

使用形式:这两个线程实在一个程序里的,作者给出的模型很好.
// 线程一代码
pthread_mutex_lock(&mutex);
if (条件满足)
  pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

// 线程二代码
pthread_mutex_lock(&mutex);
while (条件不满足)
  pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
/*线程二中为什么使用while呢?因为在pthread_cond_signal和pthread_cond_wait返回之间,有时间差,假设在这个时间差内,条件改变了,显然需要重新检查条件。也就是说在pthread_cond_wait被唤醒的时候可能该条件已经不成立。*/

这里可以看一看这一个利用生产者和消费者解释虚假唤醒的例子  http://www.acmtime.com/?p=869,感谢这位shimmer分享者

 

Linux互斥锁、条件变量和信号量Linux互斥锁、条件变量和信号量
//本程序用于演示使用POSIX条件变量解决生产者-消费者问题。
//编译命令: $ gcc -o pro_csm pro_csm.c -lpthread
//执行命令:$./pro_csm
//要点:条件变量的使用;环形缓冲区的使用;防止条件变量的虚假唤醒
//
//测试:可用while循环中的if语句替换上面的while语句,重新执行观察实验结果。
//
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> 
#define BUFFER_SIZE 16 // 缓冲区数量
#define PRO_NO 30 // PRODUCING NO
#define OVER ( - 1) //生产结束标志
#define PSLEEP 10000  // 生产者随机睡眠时间
#define CSLEEP 10000  // 消费者随机睡眠时间
#define PPNO 2  // 生产者数量
#define CPNO 2  // 消费者数量
pthread_mutex_t lock; /* 互斥体lock 用于对缓冲区的互斥操作 */
pthread_cond_t notempty; /* 缓冲区非空的条件变量 */
pthread_cond_t notfull; /* 缓冲区未满的条件变量 */

struct prodcons
{// 缓冲区相关数据结构
    int buf[BUFFER_SIZE]; /* 实际数据存放的数组*/
    int readpos, writepos; /* 读写指针*/
};

struct prodcons buffer;

/* 初始化缓冲区结构 */
void init(struct prodcons *b)
{
    b->readpos = 0;
    b->writepos = 0;
}

/* 测试:生产者线程将0 到 PRO_NO的整数送入缓冲区,消费者线
程从缓冲区中获取整数,两者都打印信息*/

void *producer(void *data)
{
    int n;
    for (n = 0; n <= PRO_NO; n++)
    {    
        pthread_mutex_lock(&lock);
        /* 等待缓冲区未满,应该用while判断,因为有可能发送虚假唤醒:期待的条件尚不成立的唤醒。*/
        while ((buffer.writepos + 1) % BUFFER_SIZE == buffer.readpos)
        //if ((buffer.writepos + 1) % BUFFER_SIZE == buffer.readpos) //会产生虚假唤醒
        {    
            pthread_cond_wait(&notfull, &lock);
        }
        /* 写数据,并移动指针 */
        if (n < PRO_NO)
        {
            buffer.buf[buffer.writepos] = n;
            printf("%d --->\n", n);
            usleep(PSLEEP);
        }
        else
        {
            buffer.buf[buffer.writepos] = OVER;            
            printf("%d --->\n", OVER);
        }
        buffer.writepos++;
        if (buffer.writepos >= BUFFER_SIZE)
            buffer.writepos = 0;
        /* 设置缓冲区非空的条件变量*/
        pthread_cond_signal(&notempty);
        pthread_mutex_unlock(&lock);
    } 
    return NULL;
}

void *consumer(void *data)
{
    int d;
    while (1)
    {    
        pthread_mutex_lock(&lock);
        /* 等待缓冲区非空,应该用while判断,因为有可能发送虚假唤醒:期待的条件尚不成立的唤醒。*/
        while(buffer.writepos == buffer.readpos)
        //if(buffer.writepos == buffer.readpos) //会产生虚假唤醒
        {
            pthread_cond_wait(&notempty, &lock);
        }
        /* 读数据,移动读指针*/
        d = buffer.buf[buffer.readpos];
        //usleep(CSLEEP);
        buffer.readpos++;
        if (buffer.readpos >= BUFFER_SIZE)
            buffer.readpos = 0;
        /* 设置缓冲区未满的条件变量*/
        pthread_cond_signal(&notfull);
        pthread_mutex_unlock(&lock);
        printf("--->%d \n", d);
        if (d == OVER)
            break;
        
    }
    return NULL;
}

int main(void)
{
    pthread_t th_c, th_p;
    void *retval;
    int i;
    init(&buffer);
    pthread_mutex_init(&lock, NULL);
    pthread_cond_init(&notempty, NULL);
    pthread_cond_init(&notfull, NULL);
    /* 创建生产者和消费者线程*/
    pthread_create(&th_c, NULL, producer, 0);
    pthread_create(&th_p, NULL, consumer, 0);
    /* 等待两个线程结束*/
    pthread_join(th_c, &retval);
    pthread_join(th_p, &retval);
    pthread_mutex_destroy(&lock);
    pthread_cond_destroy(&notempty);
    pthread_cond_destroy(&notfull);
    return 0;
}
View Code

 

我做的笔记在这里

 

生产者消费者问题,有1:1, 1:N, N:1, N:M
--
模型1
producer(){
    pthread_mutex_lock(&lock);
    make a product
    pthread_cond_signal(&not_empty);
    pthread_mutex_unlock(&lock);
}

consumer(){
    pthread_mutex_lock(&lock);
    if(there is no product)
        pthread_cond_wait(& not_empty);
    consume a product
    pthread_mutex_unlock(&lock);
}
注解:这个模型比较简单,有一下问题
1,如果生产者进程先运行,那么此时没有消费者进程,但是producter生产出产品会发出一个信号,此时没有消费者接受,会发生信号丢失.
2,对于N:M模型,还要考虑生产者和消费者对于临界区的互斥访问.如,消费者被唤醒后,资源正被其他消费者访问
3,在多核处理器中,生产者生产一个产品,并且发送一个信号,但是可被多个消费者接受,从而可能出现同时多个进程被唤醒;
但是我们只有一个产品而有多个消费者,所有一些消费者及时被唤醒也没有产品被消费,这就是错误.我们称之为(spurious_wakeup,虚假唤醒). ### 模型2 主要解决的是生产者\消费者互斥的问题,虚假唤醒的问题. 如何解决互斥问题
? 答:在生产的时候,堆生产者进行lock/unlock,这样就可以放着出现critical section被多个producter/consumer同时访问的情况. 如何解决虚假唤醒的问题? 答:虚假唤醒的解决方案其实就是if(是否满足条件)改为while(是否满足条件) 为什么这回解决问题呢?因为一个生产者发出一个信号的时候,多个消费者同时接受信号,这样他们被唤醒之后,
还要进行一个循环,如果有一个进程已经开始消费这个产品(对这个产品加锁),
那么其他没有开始消费这个产品的进程 判断的条件是不满足的(即使他们中有一个 成功对mutex加锁,不满足while条件的话,会一直等待while循环),
而不是上面出现的同时取消费这个产品.
======code 从开始进行. pthread_mutex_t lock;//互斥体lock用于对缓冲区的互斥操作 pthread_cond_t notempty;//buffer非空的条件变量 pthraed_cond_t notfull;//buffer非满的条件变量 struct prodcons{ int buf[BUffer_size];//实际数据存放的数组 int readpos,writepos;//读写指针 };//缓冲区相关的数据结构 struct procons buffer; pthread_t th_c,th_p; void *retval; init(&buffer);//初始化缓冲区表示,对写位置和读位置进行初始化. pthread_mutex_init(&lock,null); pthread_cond_init(&notempty,NULL); pthread_cond_init(&notefull,NULL); //创建生产者和消费者线程 pthread_create(&th_c,NULL,producer,0); pthread_create(&th_p,NULL,consumer,0); //等待两个线程结束 pthread_join(th_c, &retval)//retval记录线程结束时的状态信息放在哪里,这是个指针 pthread_join(th_p,&retval); //销毁互斥变量 pthread_mutex_destory(&lock); //销毁状态变量 pthread_cond_destroy(&notempty); pthread_cond_destroy(&notfull); ---//那么我们生产者线程和消费者线程到底是什么样子呢? void *producer(void *data){ ///data参数没有用到,不用管. pthread_mutex_lock(&lock); //等待缓冲区未满,如果buffer满了就进入while循环-->wait. // while((buffer.writepos+1)%buffer_size==buffer.readpos){ pthread_cond_wait(&notfull,&lock); } 生产数据,放入buffer中,比如,这里实际到producer线程如何结束?我们可以定义让producer生产N=16个数据后就producer线程就结束. 为了防止线程一闪而过,我们需要需要在生产数据的时候加入sleep函数,这样的话,我们就能得到感性的认识了. pthread_cond_signal(&notempty); pthread_mutex_unlock(&lock); return NULL; } void *consumer(void *data){ int d; while(1){ pthread_mutex_lock(&lock); // while(buffer.writepos==buffer.readpos){ pthread_cond_wait(&notempty,&lock); } 消费产品,我们什么结束程序呢? 等到生产者发出了结束标志over,就行. 具体方式是:当生产者线程生产了负数,就表示生产者线程结束了,那consumer就可以结束了. pthread_cond_signal(&notfull); pthread_mutex_unlock(&lock); if(是否结束) break; } return NULL; }

 


 

三、信号量

sem_init:初始化信号量sem_t,初始化的时候可以指定信号量的初始值,以及是否可以在多进程间共享。
sem_wait:一直阻塞等待直到信号量>0。
sem_timedwait:阻塞等待若干时间直到信号量>0。
sem_post:使信号量加1。
sem_destroy:释放信号量。和sem_init对应。

信号量其实就是一个计数器,也是一个整数。每一次调用wait操作将会使semaphore值减一,而如果semaphore值已经为0,则wait操作将会阻塞。每一次调用post操作将会使semaphore值加一。
需要的头文件:semaphore.h
信号量标识符:sem_t

主要函数:
(1)sem_init
功能:         用于创建一个信号量,并初始化信号量的值。
函数原型:     int sem_init (sem_t* sem, int pshared, unsigned int value);
函数传入值:   sem:信号量。
pshared:决定信号量能否在几个进程间共享。由于目前LINUX还没有实现进程间共享信息量,所以这个值只能取0。
value:初始计算器
函数返回值:   0:成功;-1:失败。

(2)其他函数。
//等待信号量
int sem_wait (sem_t* sem);
int sem_trywait (sem_t* sem);
//发送信号量
int sem_post (sem_t* sem);
//得到信号量值
int sem_getvalue (sem_t* sem);
//删除信号量
int sem_destroy (sem_t* sem);
功能:sem_wait和sem_trywait相当于P操作,它们都能将信号量的值减一,两者的区别在于若信号量的值小于零时,sem_wait将会阻塞进程,而sem_trywait则会立即返回。
sem_post相当于V操作,它将信号量的值加一,同时发出唤醒的信号给等待的进程(或线程)。
sem_getvalue 得到信号量的值。
sem_destroy 摧毁信号量。

使用形式:
sem_t sem;
sem_init(&sem, 0, 1); /*信号量初始化*/  
...

sem_wait(&sem);   /*等待信号量*/
... /*临界资源*/
sem_post(&sem);   /*释放信号量*/

信号量与线程锁、条件变量相比还有以下几点不同:
1)锁必须是同一个线程获取以及释放,否则会死锁。而条件变量和信号量则不必。
2)信号的递增与减少会被系统自动记住,系统内部有一个计数器实现信号量,不必担心会丢失,而唤醒一个条件变量时,如果没有相应的线程在等待该条件变量,这次唤醒将被丢失。