多线程编程之Linux环境下的多线程(二)

时间:2024-09-17 09:35:29

  上一篇文章中主要讲解了Linux环境下多线程的基本概念和特性,本文将说明Linux环境下多线程的同步方式。

  在《UNIX环境高级编程》第二版的“第11章 线程”中,提到了类UNIX系统中的三种基本的同步机制:互斥、读写锁、条件变量。下面分别针对这三种机制进行说明:

一、线程互斥

  互斥意味着具有“排它性”,即两个线程不能同时进入被互斥保护的代码。Linux下可以通过pthread_mutex_t 定义互斥体机制完成多线程的互斥操作,该机制的作用是对某个需要互斥的部分,在进入时先得到互斥体,如果没有得到互斥体,表明互斥部分被其它线程拥有,此时欲获取互斥体的线程阻塞,直到拥有该互斥体的线程完成互斥部分的操作为止。 互斥量的操作函数包括:

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t, *mutexattr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destory(pthread_mutex_t *mutex);

  与其他函数一样,这些函数成功时返回0,失败时将返回错误代码,但这些函数并不设置errno,所以必须对函数的返回代码进行检查。下面以一个例子来说明用法:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h> #define SIZE 1024
char buffer[SIZE]; void *thread_function(void *arg);
pthread_mutex_t mutex; int main()
{
int res;
pthread_t a_thread;
void *thread_result; res = pthread_mutex_init(&mutex, NULL);
if (res != )
{
perror("Mutex init failed!");
exit(EXIT_FAILURE);
} res = pthread_create(&a_thread, NULL, thread_function, NULL);
if (res != )
{
perror("Thread create failed!");
exit(EXIT_FAILURE);
} printf("Input some text. Enter 'end' to finish/n"); while ()
{
pthread_mutex_lock(&mutex);
scanf("%s", buffer);
pthread_mutex_unlock(&mutex);
if (strncmp("end", buffer, ) == )
break;
sleep();
} res = pthread_join(a_thread, &thread_result);
if (res != )
{
perror("Thread join failed!");
exit(EXIT_FAILURE);
} printf("Thread joined/n"); pthread_mutex_destroy(&mutex); exit(EXIT_SUCCESS);
} void *thread_function(void *arg)
{
sleep(); while ()
{
pthread_mutex_lock(&mutex);
printf("You input %d characters/n", strlen(buffer));
pthread_mutex_unlock(&mutex);
if (strncmp("end", buffer, ) == )
break;
sleep();
}
}

  编译语句为:

gcc -D_REENTRANT thread4.c -o thread4 –lpthread

  运行结果为:

$ ./thread2
Input some text. Enter 'end' to finish You input characters You input characters You input characters
end
You input characters
Thread joined

二、读写锁

  读写锁与互斥量类似,不过读写锁允许更高的并行性。互斥量要么是锁住状态要么是不加锁状态,一次只能有一个线程对其加锁。而读写锁有三种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是可以有多个线程同时占有读模式的读写锁。

  虽然读写锁的实现有很多种不同的方式,不过当读写锁处于读模式锁住状态时,如果有另外的线程试图加以写模式锁,读写锁通常都会阻塞随后的读模式加锁请求,这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足。

  读写锁也叫共享-独占锁,读写锁非常适合对数据结构读的次数远大于写次数的情况。读写锁的接口函数包括以下几个:

2.1 创建与销毁

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
成功则返回0, 出错则返回错误编号.

2.2 读加锁和写加锁

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
成功则返回0, 出错则返回错误编号.

2.3 非阻塞式获得读写锁

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
成功则返回0, 出错则返回错误编号.

三、条件变量

  条件变量是线程可用的另一种同步机制。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用,允许线程以无竞争的方式等待特定的条件发生,条件变量本身是由互斥变量保护的。线程在改变条件状态前必须首先锁住互斥量,其他线程在获得互斥量之前不会觉察这种改变,因为必须锁住互斥量以后才能计算条件。

  条件变量的操作函数包括以下:

3.1 创建与销毁

#include <pthread.h>
int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy (pthread_cond_t *cond);

3.2 条件等待

#include <pthread.h>
int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec* timeout);

  使用pthread_cond_wait可以等待条件变为真,传递给pthread_cond_wait的互斥量对条件进行保护,调用者把锁住的互斥量传递给函数,函数把调用线程放到等待条件的线程列表上,然后对互斥量解锁,这两个操作是原子操作。这样就关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化。pthread_cond_wait 返回时,互斥量再次被锁住。

3.3 通知条件

#include <pthread.h>
int pthread_cond_broadcast (pthread_cond_t *cond);
int pthread_cond_signal (pthread_cond_t *cond);

  这两个函数可以用于通知线程条件已经满足,pthread_cond_signal 将唤醒等待该条件的某个线程,而pthread_cond_broadcast 将唤醒等待该条件的所有线程。

四、信号量

  顺便提一下,在头文件<semaphore.h>中定义的信号量则完成了互斥体和条件变量的封装,按照多线程程序设计中访问控制机制,控制对资源的同步访问,提供程序设计人员更方便的调用接口。 其函数接口如下:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int val);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);

  信号量本质上是一个非负的整数计数器,它被用来控制对公共资源的访问。每一次调用wait操作将会使semaphore值减1,而如果semaphore值已经为0,则wait操作将会阻塞。每一次调用post操作将会使semaphore值加1。

  信号量与线程锁、条件变量相比还有以下几点不同:

(1)锁必须是同一个线程获取以及释放,否则会死锁。而条件变量和信号量则不必。

(2)信号的递增与减少会被系统自动记住,系统内部有一个计数器实现信号量,不必担心会丢失;而唤醒一个条件变量时,如果没有相应的线程在等待该条件变量,这次唤醒将被丢失。

五、一个模拟信号量的实例

  从前面的内容中可以看出,pthread只提供了互斥和条件变量用于线程互斥,而读写锁用于多读少写的应用场合。下面一个“生产者/消费者问题”为例来阐述Linux线程的控制和通信。一组生产者线程与一组消费者线程通过缓冲区发生联系。生产者线程将生产的产品送入缓冲区,消费者线程则从中取出产品。缓冲区有N 个,是一个环形的缓冲池。

#include <stdio.h>
#include <pthread.h>
#define BUFFER_SIZE 16 // 缓冲区数量
struct prodcons
{
// 缓冲区相关数据结构
int buffer[BUFFER_SIZE]; /* 实际数据存放的数组*/
pthread_mutex_t lock; /* 互斥体lock 用于对缓冲区的互斥操作 */
int readpos, writepos; /* 读写指针*/
pthread_cond_t notempty; /* 缓冲区非空的条件变量 */
pthread_cond_t notfull; /* 缓冲区未满的条件变量 */
};
/* 初始化缓冲区结构 */
void init(struct prodcons *b)
{
pthread_mutex_init(&b->lock, NULL);
pthread_cond_init(&b->notempty, NULL);
pthread_cond_init(&b->notfull, NULL);
b->readpos = ;
b->writepos = ;
}
/* 将产品放入缓冲区,这里是存入一个整数*/
void put(struct prodcons *b, int data)
{
pthread_mutex_lock(&b->lock);
/* 等待缓冲区未满*/
if ((b->writepos + ) % BUFFER_SIZE == b->readpos)
{
pthread_cond_wait(&b->notfull, &b->lock);
}
/* 写数据,并移动指针 */
b->buffer[b->writepos] = data;
b->writepos++;
if (b->writepos >= BUFFER_SIZE)
b->writepos = ;
/* 设置缓冲区非空的条件变量*/
pthread_cond_signal(&b->notempty);
pthread_mutex_unlock(&b->lock);
}
/* 从缓冲区中取出整数*/
int get(struct prodcons *b)
{
int data;
pthread_mutex_lock(&b->lock);
/* 等待缓冲区非空*/
if (b->writepos == b->readpos)
{
pthread_cond_wait(&b->notempty, &b->lock);
}
/* 读数据,移动读指针*/
data = b->buffer[b->readpos];
b->readpos++;
if (b->readpos >= BUFFER_SIZE)
b->readpos = ;
/* 设置缓冲区未满的条件变量*/
pthread_cond_signal(&b->notfull);
pthread_mutex_unlock(&b->lock);
return data;
} /* 测试:生产者线程将1 到10000 的整数送入缓冲区,消费者线
程从缓冲区中获取整数,两者都打印信息*/
#define OVER ( - 1)
struct prodcons buffer;
void *producer(void *data)
{
int n;
for (n = ; n < ; n++)
{
printf("%d --->\n", n);
put(&buffer, n);
} put(&buffer, OVER);
return NULL;
} void *consumer(void *data)
{
int d;
while ()
{
d = get(&buffer);
if (d == OVER)
break;
printf("--->%d \n", d);
}
return NULL;
} int main(void)
{
pthread_t th_a, th_b;
void *retval;
init(&buffer);
/* 创建生产者和消费者线程*/
pthread_create(&th_a, NULL, producer, );
pthread_create(&th_b, NULL, consumer, );
/* 等待两个线程结束*/
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return ;
}