【原创】《Linux高级程序设计》杨宗德著 - Linux多线程编程 - 线程同步机制
互斥锁基本原理
互斥以排他方式防止共享数据被并发修改。互斥锁是一个二元变量,其状态为开锁(允许0)和上锁(禁止1),将某个共享资源与某个特定互斥锁绑定后,对该共享资源的访问如下操作:
(1)在访问该资源前,首先申请该互斥锁,如果该互斥处于开锁状态,则申请到该锁对象,并立即占有该锁(使该锁处于锁定状态),以防止其它线程访问该资源;如果该互斥锁处于锁定状态,默认阻塞等待;
(2)只有锁定该互斥锁的进程才能释放该互斥锁。其它线程的释放操作无效。
互斥锁基本操作函数
1. 初始化互斥锁
动态初始化
静态初始化
2. 申请互斥锁
3. 释放互斥锁
4. 销毁互斥锁
extern int pthread_mutex_destroy(pthtread_mutex_t *_mutex)
5. 互斥锁应用实例
#include <stdio.h>运行结果:
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
void *thread_function(void *arg);
pthread_mutex_t work_mutex;
#define WORK_SIZE 1024
char work_area[WORK_SIZE];
int time_to_exit = 0;
int main(int argc,char *argv[])
{
int res;
pthread_t a_thread;
void *thread_result;
res = pthread_mutex_init(&work_mutex, NULL); //init mutex
if (res != 0)
{
perror("Mutex initialization failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&a_thread, NULL, thread_function, NULL);//create new thread
if (res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
pthread_mutex_lock(&work_mutex);//lock the mutex
printf("Input some text. Enter 'end' to finish\n");
while(!time_to_exit)
{
fgets(work_area, WORK_SIZE, stdin);//get a string from stdin
pthread_mutex_unlock(&work_mutex);//unlock the mutex
while(1)
{
pthread_mutex_lock(&work_mutex);//lock the mutex
if (work_area[0] != '\0')
{
pthread_mutex_unlock(&work_mutex);//unlock the mutex
sleep(1);
}
else
{
break;
}
}
}
pthread_mutex_unlock(&work_mutex);
printf("\nWaiting for thread to finish...\n");
res = pthread_join(a_thread, &thread_result);
if (res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
printf("Thread joined\n");
pthread_mutex_destroy(&work_mutex);
exit(EXIT_SUCCESS);
}
void *thread_function(void *arg)
{
sleep(1);
pthread_mutex_lock(&work_mutex);
while(strncmp("end", work_area, 3) != 0)
{
printf("You input %d characters\n", strlen(work_area) -1);
printf("the characters is %s",work_area);
work_area[0] = '\0';
pthread_mutex_unlock(&work_mutex);
sleep(1);
pthread_mutex_lock(&work_mutex);
while (work_area[0] == '\0' )
{
pthread_mutex_unlock(&work_mutex);
sleep(1);
pthread_mutex_lock(&work_mutex);
}
}
time_to_exit = 1;
work_area[0] = '\0';
pthread_mutex_unlock(&work_mutex);
pthread_exit(0);
}
$ ./mutex_example
Input some text. Enter 'end' to finish
hello mutex
You input 11 characters
the characters is hello mutex
this is a sample
You input 16 characters
the characters is this is a sample
end
Waiting for thread to finish...
Thread joined
条件变量通信机制
互斥锁不能解决的问题
如果只使用互斥锁,可能导致do_something()永远不会执行,这是程序员所不期望的,如下分析所示:
线程A抢占到互斥锁,执行操作,完成后i==4,j=6;然后释放互斥锁;
线程A和线程B都有可能抢占到锁,如果B抢占到,条件不满足,退出;如果线程A抢占到,则执行操作,完成后i==5,j=5;然后释放互斥锁;
同理,线程A和线程B都有可能抢占到锁,如果B抢占到,则条件满足,do_something()得以执行,得到预期结果。但如果此时A没有抢占到,执行操作后i=6,j=4,此后i等于j的情况永远不会发生。
条件变量解决的问题
条件变量基本操作
include <pthread.h>说明
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t
*cond_attr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t
*mutex, const struct timespec *abstime);
int pthread_cond_destroy(pthread_cond_t *cond);
条件变量是一种同步机制,允许线程挂起,直到共享数据上的某些条件得到满足。条件变量上的基本操作有:触发条件(当条件变为 true 时);等待条件,挂起线程直到其他线程触发条件。
条件变量要和互斥量相联结,以避免出现条件竞争--一个线程预备等待一个条件变量,当它在真正进入等待之前,另一个线程恰好触发了该条件。
pthread_cond_init 使用 cond_attr 指定的属性初始化条件变量 cond,当 cond_attr 为 NULL 时,使用缺省的属性。LinuxThreads 实现条件变量不支持属性,因此 cond_attr 参数实际被忽略。
pthread_cond_t 类型的变量也可以用 PTHREAD_COND_INITIALIZER 常量进行静态初始化。
pthread_cond_signal 使在条件变量上等待的线程中的一个线程重新开始。如果没有等待的线程,则什么也不做。如果有多个线程在等待该条件,只有一个能重启动,但不能指定哪一个。
pthread_cond_broadcast 重启动等待该条件变量的所有线程。如果没有等待的线程,则什么也不做。
pthread_cond_wait 自动解锁互斥量(如同执行了 pthread_unlock_mutex),并等待条件变量触发。这时线程挂起,不占用 CPU 时间,直到条件变量被触发。在调用 pthread_cond_wait 之前,应用程序必须加锁互斥量。pthread_cond_wait 函数返回前,自动重新对互斥量加锁(如同执行了 pthread_lock_mutex)。
互斥量的解锁和在条件变量上挂起都是自动进行的。因此,在条件变量被触发前,如果所有的线程都要对互斥量加锁,这种机制可保证在线程加锁互斥量和进入等待条件变量期间,条件变量不被触发。
pthread_cond_timedwait 和 pthread_cond_wait 一样,自动解锁互斥量及等待条件变量,但它还限定了等待时间。如果在 abstime 指定的时间内 cond 未触发,互斥量 mutex 被重新加锁,且 pthread_cond_timedwait 返回错误 ETIMEDOUT。abstime 参数指定一个绝对时间,时间原点与 time 和 gettimeofday 相同:abstime = 0 表示 1970 年 1 月 1 日 00:00:00 GMT。
pthread_cond_destroy 销毁一个条件变量,释放它拥有的资源。进入 pthread_cond_destroy 之前,必须没有在该条件变量上等待的线程。在 LinuxThreads 的实现中,条件变量不联结资源,除检查有没有等待的线程外,pthread_cond_destroy 实际上什么也不做。
取消
pthread_cond_wait 和 pthread_cond_timedwait 是取消点。如果一个线程在这些函数上挂起时被取消,线程立即继续执行,然后再次对 pthread_cond_wait 和 pthread_cond_timedwait 在 mutex 参数加锁,最后执行取消。因此,当调用清除处理程序时,可确保,mutex 是加锁的。
异步信号安全(Async-signal Safety)
条件变量函数不是异步信号安全的,不应当在信号处理程序中进行调用。特别要注意,如果在信号处理程序中调用 pthread_cond_signal 或 pthread_cond_boardcast 函数,可能导致调用线程死锁。
pthread_cond_signal 此函数被调用是隐含了释放当前线程占用的信号量的操作。
返回值
在执行成功时,所有条件变量函数都返回 0,错误时返回非零的错误代码。
错误代码
pthread_cond_init, pthread_cond_signal, pthread_cond_broadcast, 和 pthread_cond_wait 从不返回错误代码。
pthread_cond_timedwait 函数出错时返回下列错误代码:
ETIMEDOUT abstime 指定的时间超时时,条件变量未触发
EINTR pthread_cond_timedwait 被触发中断
pthread_cond_destroy 函数出错时返回下列错误代码:
EBUSY 某些线程正在等待该条件变量
需要强调的是
条件变量不能单独使用,必须配合互斥锁一起实现对资源的互斥访问。
条件变量应用实例
#include <stdio.h>运行结果:
#include <stdlib.h>
#include <time.h>
#include "pthread.h"
#define BUFFER_SIZE 2
/* Circular buffer of integers. */
struct prodcons
{
int buffer[BUFFER_SIZE]; /* the actual data */
pthread_mutex_t lock; /* mutex ensuring exclusive access to buffer */
int readpos, writepos; /* positions for reading and writing */
pthread_cond_t notempty; /* signaled when buffer is not empty */
pthread_cond_t notfull; /* signaled when buffer is not full */
};
/* Initialize a buffer */
void init(struct prodcons *prod)
{
pthread_mutex_init(&prod->lock,NULL);
pthread_cond_init(&prod->notempty,NULL);
pthread_cond_init(&prod->notfull,NULL);
prod->readpos = 0;
prod->writepos = 0;
}
/* Store an integer in the buffer */
void put(struct prodcons * prod, int data)
{
pthread_mutex_lock(&prod->lock);
/* Wait until buffer is not full */
while ((prod->writepos + 1) % BUFFER_SIZE == prod->readpos)
{
printf("producer wait for not full\n");
pthread_cond_wait(&prod->notfull, &prod->lock);
}
/* Write the data and advance write pointer */
prod->buffer[prod->writepos] = data;
prod->writepos++;
if (prod->writepos >= BUFFER_SIZE)
prod->writepos = 0;
/*Signal that the buffer is now not empty */
pthread_cond_signal(&prod->notempty);
pthread_mutex_unlock(&prod->lock);
}
/* Read and remove an integer from the buffer */
int get(struct prodcons *prod)
{
int data;
pthread_mutex_lock(&prod->lock);
/* Wait until buffer is not empty */
while (prod->writepos == prod->readpos)
{
printf("consumer wait for not empty\n");
pthread_cond_wait(&prod->notempty, &prod->lock);
}
/* Read the data and advance read pointer */
data = prod->buffer[prod->readpos];
prod->readpos++;
if (prod->readpos >= BUFFER_SIZE)
prod->readpos = 0;
/* Signal that the buffer is now not full */
pthread_cond_signal(&prod->notfull);
pthread_mutex_unlock(&prod->lock);
return data;
}
#define OVER (-1)
struct prodcons buffer;
/*--------------------------------------------------------*/
void * producer(void * data)
{
int n;
for (n = 0; n < 5; n++)
{
printf("producer sleep 1 second......\n");
sleep(1);
printf("put the %d product\n", n);
put(&buffer, n);
}
for(n=5; n<10; n++)
{
printf("producer sleep 3 second......\n");
sleep(3);
printf("put the %d product\n",n);
put(&buffer,n);
}
put(&buffer, OVER);
printf("producer stopped!\n");
return NULL;
}
/*--------------------------------------------------------*/
void * consumer(void * data)
{
int d=0;
while (1)
{
printf("consumer sleep 2 second......\n");
sleep(2);
d=get(&buffer);
printf("get the %d product\n", d);
//d = get(&buffer);
if (d == OVER ) break;
}
printf("consumer stopped!\n");
return NULL;
}
/*--------------------------------------------------------*/
int main(int argc,char *argv[])
{
pthread_t th_a, th_b;
void * retval;
init(&buffer);
pthread_create(&th_a, NULL, producer, 0);
pthread_create(&th_b, NULL, consumer, 0);
/* Wait until producer and consumer finish. */
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return 0;
}
$ ./pthread_cond_example
consumer sleep 2 second......
producer sleep 1 second......
put the 0 product
producer sleep 1 second......
get the 0 product
consumer sleep 2 second......
put the 1 product
producer sleep 1 second......
put the 2 product
producer wait for not full
get the 1 product
consumer sleep 2 second......
producer sleep 1 second......
put the 3 product
producer wait for not full
get the 2 product
consumer sleep 2 second......
producer sleep 1 second......
put the 4 product
producer wait for not full
get the 3 product
consumer sleep 2 second......
producer sleep 3 second......
get the 4 product
consumer sleep 2 second......
put the 5 product
producer sleep 3 second......
get the 5 product
consumer sleep 2 second......
consumer wait for not empty
put the 6 product
producer sleep 3 second......
get the 6 product
consumer sleep 2 second......
consumer wait for not empty
put the 7 product
producer sleep 3 second......
get the 7 product
consumer sleep 2 second......
consumer wait for not empty
put the 8 product
producer sleep 3 second......
get the 8 product
consumer sleep 2 second......
consumer wait for not empty
put the 9 product
producer wait for not full
get the 9 product
consumer sleep 2 second......
producer stopped!
get the -1 product
consumer stopped!
读写锁通信机制
概述
读写锁与互斥量类似,不过读写锁允许更高的并行性。互斥量要么是锁住状态,要么是不加锁状态,而且一次只有一个线程对其加锁。读写锁可以有三种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可用同时占有读模式的读写锁。读写锁也叫做共享-独占锁,当读写锁以读模式锁住时,它是以共享模式锁住的,当它以写模式锁住时,它是以独占模式锁住的。
在对数据的读写应用中,更多的是读操作,而写操作较少,例如对数据库数据的读写应用。为了满足当前能够允许多个读出,但只允许一个写入的需求,线程提供了读写锁来实现。其基本原则如下:
(1)如果有其它线程读数据,则允许其它线程执行读操作,但不允许写操作;(2)如果有其它线程写数据,则其它线程的读、写操作均允许。
因此,其将该锁分为了读锁和写锁。
(1)如果某线程申请了读锁,其它线程可以再申请读锁,但不能申请写锁;
(2)如果某线程申请了写锁,则其它线程不能申请读锁,也不能申请写锁。
定义读写锁对象的代码如下:
pthread_rwlock_t rwlock;//全局变量读写锁操作函数
读写锁API
读写锁的数据类型为pthread_rwlock_t。如果这个类型的某个变量是静态分配的,那么可通过给它赋常值PTHREAD_RWLOCK_INITIALIZER来初始化它。
阻塞获取读写锁:
int pthread_rwlock_rdlock(pthread_rwlock_t *rwptr); //获取一个读出锁都返回:成功时为0,出错时为正的Exxx值
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr); //获取一个写入锁
非阻塞获取读写锁:
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwptr);都返回:成功时为0,出错时为正的Exxx值
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);
解锁:
int pthread_rwlock_unlock(pthread_rwlock_t *rwptr); //释放一个写入锁或者读出锁初始化和销毁读写锁:
int pthread_rwlock_init(pthread_rwlock_t *rwptr, const pthread_rwlockattr_t *attr)都返回:成功时为0,出错时为正的Exxx值
int pthread_rwlock_destroy(pthread_rwlock_t *rwptr);
int pthread_rwlockattr_init(pthread_rwlockattr_t *attr);都返回:成功时为0,出错时为正的Exxx值
int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t *attr, int *valptr);都返回:成功时为0,出错时为正的Exxx值
int pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr, int valptr);
读写锁应用实例
#include <errno.h>运行结果:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <bits/pthreadtypes.h>
static pthread_rwlock_t rwlock;
#define WORK_SIZE 1024
char work_area[WORK_SIZE];
int time_to_exit;
void *thread_function_read_o(void *arg);
void *thread_function_read_t(void *arg);
void *thread_function_write_o(void *arg);
void *thread_function_write_t(void *arg);
int main(int argc,char *argv[])
{
int res;
pthread_t a_thread,b_thread,c_thread,d_thread;
void *thread_result;
res=pthread_rwlock_init(&rwlock,NULL);
if (res != 0)
{
perror("rwlock initialization failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&a_thread, NULL, thread_function_read_o, NULL);//create new thread
if (res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&b_thread, NULL, thread_function_read_t, NULL);//create new thread
if (res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&c_thread, NULL, thread_function_write_o, NULL);//create new thread
if (res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&d_thread, NULL, thread_function_write_t, NULL);//create new thread
if (res != 0)
{
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
res = pthread_join(a_thread, &thread_result);
if (res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
res = pthread_join(b_thread, &thread_result);
if (res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
res = pthread_join(c_thread, &thread_result);
if (res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
res = pthread_join(d_thread, &thread_result);
if (res != 0)
{
perror("Thread join failed");
exit(EXIT_FAILURE);
}
pthread_rwlock_destroy(&rwlock);
exit(EXIT_SUCCESS);
}
void *thread_function_read_o(void *arg)
{
printf("thread read one try to get lock\n");
pthread_rwlock_rdlock(&rwlock);
while(strncmp("end", work_area, 3) != 0)
{
printf("this is thread read one.");
printf("the characters is %s",work_area);
pthread_rwlock_unlock(&rwlock);
sleep(2);
pthread_rwlock_rdlock(&rwlock);
while (work_area[0] == '\0' )
{
pthread_rwlock_unlock(&rwlock);
sleep(2);
pthread_rwlock_rdlock(&rwlock);
}
}
pthread_rwlock_unlock(&rwlock);
time_to_exit=1;
pthread_exit(0);
}
void *thread_function_read_t(void *arg)
{
printf("thread read one try to get lock\n");
pthread_rwlock_rdlock(&rwlock);
while(strncmp("end", work_area, 3) != 0)
{
printf("this is thread read two.");
printf("the characters is %s",work_area);
pthread_rwlock_unlock(&rwlock);
sleep(5);
pthread_rwlock_rdlock(&rwlock);
while (work_area[0] == '\0' )
{
pthread_rwlock_unlock(&rwlock);
sleep(5);
pthread_rwlock_rdlock(&rwlock);
}
}
pthread_rwlock_unlock(&rwlock);
time_to_exit=1;
pthread_exit(0);
}
void *thread_function_write_o(void *arg)
{
printf("this is write thread one try to get lock\n");
while(!time_to_exit)
{
pthread_rwlock_wrlock(&rwlock);
printf("this is write thread one.\nInput some text. Enter 'end' to finish\n");
fgets(work_area, WORK_SIZE, stdin);
pthread_rwlock_unlock(&rwlock);
sleep(15);
}
pthread_rwlock_unlock(&rwlock);
pthread_exit(0);
}
void *thread_function_write_t(void *arg)
{
sleep(10);
while(!time_to_exit)
{
pthread_rwlock_wrlock(&rwlock);
printf("this is write thread two.\nInput some text. Enter 'end' to finish\n");
fgets(work_area, WORK_SIZE, stdin);
pthread_rwlock_unlock(&rwlock);
sleep(20);
}
pthread_rwlock_unlock(&rwlock);
pthread_exit(0);
}
$ ./pthread_rwlock_example
this is write thread one try to get lock
thread read one try to get lock
this is write thread one.
Input some text. Enter 'end' to finish
thread read one try to get lock
wr one test
this is thread read two.the characters is wr one test
this is thread read one.the characters is wr one test
this is thread read one.the characters is wr one test
this is thread read one.the characters is wr one test
this is thread read two.the characters is wr one test
this is write thread two.
Input some text. Enter 'end' to finish
wr two test
this is thread read one.the characters is wr two test
this is thread read two.the characters is wr two test
this is thread read one.the characters is wr two test
this is write thread one.
Input some text. Enter 'end' to finish
end