一、引入
当我们有多个线程一起工作时,就要注意一些问题,就好像小组合作的时候要注意组员之间的关系一样,多个线程同时访问共享数据时可能会冲突,这跟前面讲信号时所说的可重入性是同样的问题。比如两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:
1、从内存读变量值到寄存器
2、寄存器的值加1
3、将寄存器的值写回内存
假设两个线程在多处理器平台上同时执行这三条指令,则可能导致下图所示的结果,最后变量只加了一次而非两次。
以下程序模拟上述过程,也就是模拟内核执行+1的过程:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NLOOP 5000
int counter; /* 保存通过线程累加的结果 */
void *doit(void *);
int main(int argc, char **argv)
{
/* 创建线程 */
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
/* 等待回收两个进程 */
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void *vptr)
{
int i, val;
for (i = 0; i < NLOOP; i++)
{
/* 模仿底层的操作 */
val = counter;//从内存读变量值到寄存器
//如果在这个过程之间被打断,而且counter被别的线程修改,那结果就会出现错误
printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);//+1
counter = val + 1;//将寄存器的值写回内存
}
return NULL;
}
我们创建两个线程,各自把对counter做5000次加一运算,正常情况下最后counter应该等于10000,但事实上每次运行该程序的结果都不一样,有时候数到5000多,有时候数到6000多。
二、互斥量与临界区(Critical Section)
设置互斥量形成临界区是保证在某一时刻只有一个线程能访问数据的简便办法。在任意时刻只允许一个线程对共享资源进行访问。如果有多个线程试图同时访问临界区,那么在有一个线程进入后其他所有试图访问此临界区的线程将被挂起,并一直持续到进入临界区的线程离开。临界区在被释放后,其他线程可以继续抢占,并以此达到用原子方式操作共享资源的目的。
临界区的选定因尽可能小,如果选定太大会影响程序的并行处理性能。
所以子线程所执行的代码应该改成以下模型(加锁和解锁代码没有添加,只是模型):
互斥量操作函数:
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //定义互斥量(定义锁的类型为互斥量)
int pthread_mutex_destroy(pthread_mutex_t *mutex); //销毁锁
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); //初始化锁
int pthread_mutex_lock(pthread_mutex_t *mutex); //上锁(若已经被别的线程上锁了则该线程会被阻塞)
int pthread_mutex_trylock(pthread_mutex_t *mutex); //请求上锁,如果已经被别的线程上锁了则该线程不会被阻塞,继续执行
int pthread_mutex_unlock(pthread_mutex_t *mutex); //解锁
使用互斥量对上诉程序进行修改,使其达到各自把对counter做5000次加一运算的目的:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NLOOP 5000
int counter; /* incremented by threads */
//定义锁counter_mutex并初始化 (锁可以定义为全局的也可以为局部的)
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
void *doit(void *);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, doit, NULL);
pthread_create(&tidB, NULL, doit, NULL);
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void *vptr)
{
int i, val;
for (i = 0; i < NLOOP; i++) {
pthread_mutex_lock(&counter_mutex);//上锁
val = counter;
printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);
counter = val + 1;
pthread_mutex_unlock(&counter_mutex);//解锁
}
return NULL;
}
三、读写锁
读共享,写独占
读写锁原语:
#include <pthread.h>
pthread_rwlock_t rwlock; // 定义读写锁
pthread_rwlock_init // 初始化读写锁
pthread_rwlock_destroy // 销毁读写锁
pthread_rwlock_rdlock // 申请读锁(不成功(已被上写锁)时阻塞)
pthread_rwlock_wrlock // 申请写锁(不成功(已被上写锁或读锁)时阻塞)
pthread_rwlock_tryrdlock // 尝试申请读锁(不成功时不阻塞,继续往下执行)
pthread_rwlock_trywrlock // 尝试申请写锁(不成功时不阻塞,继续往下执行)
pthread_rwlock_unlock // 解锁
举例:
#include <stdio.h>
#include <pthread.h>
int counter;//累加量
pthread_rwlock_t rwlock;//定义读写锁,将在main中初始化
/* 3个线程不定时写同一全局资源,5个线程不定时读同一全局资源 */
void *th_write(void *arg)
{
int t;
while (1)
{
pthread_rwlock_wrlock(&rwlock);//上写锁
t = counter;
usleep(100);//若没有写锁在此会被打断,但是修改的值是之前的值
printf("write %x : counter=%d ++counter=%d\n",
(int)pthread_self(), //打印写线程
t, //打印原先count的值
++counter); //打印之后的count的值
pthread_rwlock_unlock(&rwlock);//释放写锁
usleep(100);
}
}
void *th_read(void *arg)
{
while (1) {
pthread_rwlock_rdlock(&rwlock);//上读锁
printf("read %x : %d\n", (int)pthread_self(), counter);
pthread_rwlock_unlock(&rwlock);//释放读锁
usleep(100);
}
}
int main(void)
{
int i;
pthread_t tid[8];
pthread_rwlock_init(&rwlock, NULL);//初始化读写锁
for (i = 0; i < 3; i++)//三个写线程
pthread_create(&tid[i], NULL, th_write, NULL);
for (i = 0; i < 5; i++)//五个度线程
pthread_create(&tid[i+3], NULL, th_read, NULL);
pthread_rwlock_destroy(&rwlock);//释放读写锁
for (i = 0; i < 8; i++)//回收子线程
pthread_join(tid[i], NULL);
return 0;
}
四、条件变量
条件变量给多个线程提供了一个汇合的场所(即可以使用pthread_cond_broadcast函数唤醒所有等待该条件变量的线程),条件变量控制原语:
#include <pthread.h>
pthread_cond_t cond= PTHREAD_COND_INITIALIZER;// 定义并初始化条件变量
pthread_cond_t // 定义条件变量
pthread_cond_init // 也可以用pthread_cond_t定义,用该函数初始化
pthread_cond_destroy // 销毁条件变量
pthread_cond_wait // 阻塞该条件变量,直到被唤醒(该线程会进入阻塞队列)
pthread_cond_timedwait // 带超时时间的阻塞
pthread_cond_signal // 唤醒某个条件变量(唤醒的是该条件变量阻塞队列的队首线程)
pthread_cond_broadcast // 唤醒所有等待该条件变量的线程
使用该方法可以模拟经典的生产者-消费者模型(注意pthread_cond_wait的使用方法):
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
struct msg {
struct msg *next;
int num;
};//创建产品信息
struct msg *head;//创建头产品
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;//定义并初始化条件变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; //定义并初始化互斥锁
void *consumer(void *p)
{
struct msg *mp;
while(1)
{
pthread_mutex_lock(&lock);//阻塞获取互斥锁
/* 得到互斥锁后检查是否有"产品" */
while (head == NULL)//如果没有产品那么等待生产者生产出产品
pthread_cond_wait(&has_product, &lock);
/* pthread_cond_wait(&has_product, &lock);
* 1.释放互斥锁lock(后面那个参数的锁),
* 相当于pthread_mutex_unlock(&lock)
* 2.阻塞等待has_product被唤醒
* 3.当被唤醒时,解除阻塞,并重新去申请获取互斥锁lock
* 相当于pthread_mutex_lock(&lock)
*/
mp = head;
head = mp->next;
pthread_mutex_unlock(&lock);//释放上面wait时上的锁
printf("Consume %d\n", mp->num);
free(mp);//消耗产品
sleep(rand() % 5);
}
}
void *producer(void *p)
{
struct msg *mp;
while(1)
{
/* "进货"设置货品编号 */
mp = malloc(sizeof(struct msg));
mp->num = rand() % 1000 + 1;
printf("Produce %d\n", mp->num);
/* 插入时要上锁(阻塞获取互斥锁) */
pthread_mutex_lock(&lock);
/* 头插法 */
mp->next = head;
head = mp;
pthread_mutex_unlock(&lock);
/* 唤醒has_product条件变量阻塞队列中的第一个 */
pthread_cond_signal(&has_product);
sleep(rand() % 5);//睡眠一段时间
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
srand(time(NULL));//设置随机变量种子
pthread_create(&pid, NULL, producer, NULL);//创建生产者线程
pthread_create(&cid, NULL, consumer, NULL);//创建消费者线程
pthread_join(pid, NULL);//回收生产者线程
pthread_join(cid, NULL);//回收消费者线程
return 0;
}
五、信号量
信号量控制原语:
sem_t
sem_init
sem_wait
sem_trywait
sem_timedwait
sem_post
sem_destroy
使用信号量模拟生产者-消费者模型():
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#define NUM 5 //定义最大产品数量
int queue[NUM];//环形队列,用于存放货物
sem_t blank_number, product_number;//定义两个信号量
/* blank_number为允许生产的量(再生产blank_number个,厂库就满了) */
/* product_number为允许消费的量(库存)(再消费product_number个,仓库就空了) */
void *producer(void *arg)
{
int p = 0;
while (1)
{
/* sem_wait(&blank_number);
* 1.如果blank_number信号量大于0则,blank_number减一,解除阻塞
* 2.如果如果blank_number信号量等于0,则阻塞,直到blank_number信号量大于0
*/
sem_wait(&blank_number);
queue[p] = rand() % 1000 + 1;
printf("Produce %d\n", queue[p]);
sem_post(&product_number);//product_number加一
//该信号量被blank_number信号量所限制
p = (p+1)%NUM;
sleep(rand()%5);
}
}
void *consumer(void *arg)
{
int c = 0;
while (1)
{
/* sem_wait(&product_number);
* 1.如果product_number信号量大于0则,product_number减一,解除阻塞
* 2.如果如果product_number信号量等于0,则阻塞,直到product_number信号量大于0
*/
sem_wait(&product_number);
printf("Consume %d\n", queue[c]);
queue[c] = 0;
sem_post(&blank_number);//blank_number加一
c = (c+1)%NUM;
sleep(rand()%5);
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
sem_init(&product_number, 0, 0);//初始化产品信号量(一开始没有产品)
sem_init(&blank_number, 0, NUM);//由于没有产品,所以允许生产NUM个产品
/* 创建线程 */
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
/* 等待回收线程 */
pthread_join(pid, NULL);
pthread_join(cid, NULL);
/* 释放信号量 */
sem_destroy(&blank_number);
sem_destroy(&product_number);
return 0;
}
六、小结
以下转载自http://blog.csdn.net/sky234537667/article/details/42419623
最常见的进程/线程的同步方法有互斥锁(或称互斥量Mutex),读写锁(rdlock),条件变量(cond),信号量(Semophore)等。在Windows系统中,临界区(Critical Section)和事件对象(Event)也是常用的同步方法。
1.互斥锁–保护了一个临界区,在这个临界区中,一次最多只能进入一个线程。如果有多个进程在同一个临界区内活动,就有可能产生竞态条件(race condition)导致错误,其中包含递归锁和非递归锁,(递归锁:同一个线程可以多次获得该锁,别的线程必须等该线程释放所有次数的锁才可以获得)。
2.读写锁–从广义的逻辑上讲,也可以认为是一种共享版的互斥锁。可以多个线程同时进行读,但是写操作必须单独进行,不可多写和边读边写。如果对一个临界区大部分是读操作而只有少量的写操作,读写锁在一定程度上能够降低线程互斥产生的代价。
3.条件变量–允许线程以一种无竞争的方式等待某个条件的发生。当该条件没有发生时,线程会一直处于休眠状态。当被其它线程通知条件已经发生时,线程才会被唤醒从而继续向下执行。条件变量是比较底层的同步原语,直接使用的情况不多,往往用于实现高层之间的线程同步。使用条件变量的一个经典的例子就是线程池(Thread Pool)了。
4.信号量–通过精心设计信号量的PV操作,可以实现很复杂的进程同步情况(例如经典的哲学家就餐问题和理发店问题)。而现实的程序设计中,却极少有人使用信号量。能用信号量解决的问题似乎总能用其它更清晰更简洁的设计手段去代替信号量。