线程同步(互斥锁、读写锁、条件变量、信号量)

时间:2021-03-25 15:14:21

线程同步:

线程同步指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。保证了数据的一致性。举个简单的例子就是,当一个线程将全局变量var=100加上10,另外一个线程时将var乘以2,这样最后得出的结果因为cpu不同的调度策略导致不同的结果,可能是220也可能是210还可能是110或者200,220和210可以理解,无非就是一个先操作一个后操作,但是110和200呢,这是因为两个线程读取到var的值为100时,都进行了操作,导致另外一个线程运行出来的结果被覆盖了,为了得到我们想要的结果,不让数据出现混乱,就需要进行同步。

造成数据混乱的原因:

1.资源共享
2.随机调度
3.缺乏同步机制

前两点我们无法改变,而第三点就是解决数据混乱的关键。这里就引入了锁机制。

互斥锁

相关概念:

Linux提供互斥锁mutex(又称互斥量)。每个线程在对资源进行操作前都尝试先加锁,成功加锁了之后才能操作该资源,操作结束后就解锁。在同一时间,锁只有一把,如果线程A加锁正在访问资源,这时B尝试加锁,就会阻塞。但是互斥锁有个特点,就是不加锁也可以访问数据,比如之前的线程A加锁了正在访问资源,这时B不加锁也可以直接访问数据。所以互斥锁实质上是操作系统提供的一把”建议锁”,没有进行强制限定必须有锁才能访问。

相关函数:

初始化互斥锁:
函数原型:int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
返回值:成功返回0,失败返回错误号
参数:mutex:传出参数,代表互斥锁;attr:传入参数,代表互斥锁属性,使用默认属性(线程间共享)传NULL。pthread_mutex_t变量类型是一个结构体,可以简单理解成整数,只有1或者0两个取值,初始值为0。
销毁互斥锁:
函数原型:int pthread_mutex_destroy(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
参数:mutex:传出参数,要销毁的互斥锁。
加锁:
函数原型:int pthread_mutex_lock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号。
参数:mutex:传出参数,要加锁的互斥锁。
可理解为将mutex–。如果已有线程进行了加锁,那么就会阻塞等待,直到持有该互斥锁的其它进程解锁为止。
尝试加锁:
函数原型:int pthread_mutex_trylock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号。
参数:mutex:传出参数,要尝试加锁的互斥锁。如果已有线程进行了加锁,不会阻塞,直接返回错误号。
解锁:
函数原型:int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
参数:mutex:传出参数,要解锁的互斥锁。
可理解为mutex++。当解锁成功时,该函数会把阻塞在该锁上的所有线程全部唤醒,之后哪个线程抢到锁继续执行,就是调度优先级的问题了,默认是先阻塞的先唤醒。

继续引用开头那个例子,引用互斥锁的机制就可以避免200和110这样的答案出现。

#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <pthread.h>

int var = 100;
pthread_mutex_t mutex; //定义互斥锁变量mutex
void *test1(void *arg)
{
pthread_mutex_lock(&mutex); //加锁
var += 10;
pthread_mutex_unlock(&mutex); //解锁
return NULL;
}
void *test2(void *arg)
{
pthread_mutex_lock(&mutex); //加锁
var *= 2;
pthread_mutex_unlock(&mutex); //解锁
return NULL;
}
int main()
{
pthread_t tid1, tid2;
pthread_attr_t attr;

pthread_mutex_init(&mutex, NULL); //初始化互斥锁并使用默认属性

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //设置线程为分离态


pthread_create(&tid1, NULL, test1, NULL); //执行var+10
pthread_create(&tid2, NULL, test2, NULL); //执行var*2

sleep(1); //这里为了防止创建的两个进程还未结束,主线程就结束了

pthread_attr_destroy(&attr); //销毁设置的属性
pthread_mutex_destroy(&mutex); //销毁互斥锁
printf("%d\n", var);
return 0;
}

这样得出的结果就只有220和210了,这就要看这两个线程谁先被调度了,不过我们也可以通过设置线程属性里面的调度优先级来得到我们想要的结果。

读写锁

相关概念:

与互斥锁类似,但是读写锁有个特点,那就是写独占,读共享。读数据本来就不涉及改变数据,所以共享,而写数据的时候就只用于同一时刻只有一个线程在操作了,十分合理。
读写锁有三种状态:
1.读模式下加锁(读锁)
2.写模式下加锁(写锁)
3.不加锁

并且它还有这几个特性:
1.当读写锁是读模式加锁时,其它线程以读模式加锁都会成功,但是线程以写模式加锁会阻塞;
2.当读写锁是写模式加锁时,直到解锁前,其它线程加锁都会被阻塞。
3.当读写锁是读模式加锁时,其它线程既有试图以写模式加锁的线程,也有试图以读模式加锁的线程,这时读写锁会阻塞在写模式加锁请求之后的读模式加锁请求,优先满足写模式。

综上,读写锁的特点总结起来就是:“写独占,读共享。写锁优先级高“。读写锁特别适用于读的次数远大于写的情况。

相关函数:

初始化一把读写锁:
函数原型:int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量;attr:表示读写锁属性,通常使用默认属性,传NULL就行了。
销毁一把读写锁:
函数原型:int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
请求读锁:
函数原型:int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
请求写锁:
函数原型:int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
解锁:
函数原型:int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
非阻塞请求读锁:
函数原型:int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
非阻塞请求写锁:
函数原型:int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
返回值:成功返回0,失败返回错误号
参数:rwlock:代表读写锁变量
例子:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

pthread_rwlock_t rwlock; //定义读写锁变量
int counter;
void *read_lock(void *arg)
{
pthread_rwlock_rdlock(&rwlock); //请求读锁
printf("%dth pthread:counter = %d\n", (int)arg, counter);
pthread_rwlock_unlock(&rwlock);
}

void *write_lock(void *arg)
{
pthread_rwlock_wrlock(&rwlock); //请求写锁
printf("%dth pthread:counter += 1 = %d\n", (int)arg, ++counter);
pthread_rwlock_unlock(&rwlock);
}

int main()
{
pthread_t tid[6];

pthread_rwlock_init(&rwlock, NULL); //初始化读写锁

int i;
for(i = 0; i < 3; i++) //三个线程进行写操作
{
pthread_create(&tid[i], NULL, write_lock, (void *)i);
}

for(; i < 6; i++) //三个线程进行读操作
{
pthread_create(&tid[i], NULL, read_lock, (void *)i);
}

for(i = 0; i < 6; i++) //循环回收6个线程
pthread_join(tid[i], NULL);

pthread_rwlock_destroy(&rwlock); //销毁读写锁

return 0;
}

运行的结果是:

0th pthread:counter += 1 = 1
4th pthread:counter = 1
2th pthread:counter += 1 = 2
1th pthread:counter += 1 = 3
3th pthread:counter = 3
5th pthread:counter = 3

死锁

死锁不是一种锁,而是使用锁出现的一种现象。导致的结果就是程序一直阻塞。
造成死锁的主要原因有:
1.同一个线程对同一互斥量加锁两次
2.线程1拥有A锁,请求获得B锁;线程2拥有B锁,请求获得A锁
3.产生了震荡现象

条件变量:

相关概念:

条件变量不是锁,但是它和互斥锁配合使用,可以减少不必要的竞争现象。
具体的理解在函数中进行说明。

相关函数:

初始化一个条件变量:
函数原型:int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
返回值:成功返回0,失败返回错误码
参数:cond:条件变量;attr:条件变量属性,通常为默认值,传NULL即可

销毁一个条件变量:
函数原型:int pthread_cond_destroy(pthread_cond_t *restrict cond);
返回值:成功返回0,失败返回错误码
参数:cond:条件变量

阻塞等待一个条件变量:
函数原型:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
返回值:成功返回0,失败返回错误码
参数:cond:条件变量;mutex:传出参数,互斥锁
作用:
1.阻塞等待条件变量cond满足并解锁传入的互斥锁
2.被唤醒之后,该函数返回时,解除阻塞并重新给传入的互斥锁加锁

限时等待一个条件变量:
函数原型:int pthread_cond_timewait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restirct abstime)
返回值:成功返回0,失败返回错误码
参数:cond:条件变量;mutex:传出参数,互斥锁;abstime:绝对时间。
第三个参数的用法:

time_t cur = time(null);    //获取当前时间
struct timespec t; //定义结构体变量
t.tv_sec = cur + 1; //定时1s
pthread_cond_timewait(&cond, &mutex, &t); //传参

唤醒至少一个阻塞在该条件变量上的线程:
函数原型:int pthread_cond_signal(pthread_cond_t *cond);
返回值:成功返回0,失败返回错误码
参数:cond:条件变量

唤醒全部阻塞在该条件变量上的线程:
函数原型:int pthread_cond_broadcast(pthread_cond_t *cond);
返回值:成功返回0,失败返回错误码
参数:cond:条件变量

下面是个比较经典的例子,即生产者消费者条件变量模型:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //全局变量的mutex可以通过这种方式初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //全局变量的cond可以通过这种方式初始化

struct node //链表
{
struct node *next;
int counter;
};

struct node *head = NULL;

void *producter(void *arg) //生产者
{
struct node *p = NULL;
while(1)
{
p = (struct node *)malloc(sizeof(struct node)); //生产者
p->counter = rand() % 1000 + 1; //随即取一个值,作为生产的产品
p->next = NULL;

pthread_mutex_lock(&mutex); //上锁,如果不上锁,可能导致链表出现问题
p->next = head;
head = p;
pthread_mutex_unlock(&mutex); //解锁
printf("producter:I has produced a product named %d\n", p->counter);
pthread_cond_signal(&cond); //由于已经生产了一个产品,所以可以唤醒某个消费者了
sleep(rand() % 5); //为了便于观察,所以添加睡眠
}
}

void *consumer(void *arg) //消费者
{
struct node *p = NULL;
while(1)
{
pthread_mutex_lock(&mutex); //上锁
while(head == NULL)
{
pthread_cond_wait(&cond, &mutex); //阻塞等待条件满足,并解锁mutex。满足条件之后,会自动重新给mutex加锁
}
p = head;
head = p->next; //模拟消费一个产品
pthread_mutex_unlock(&mutex); //解锁

printf("consumer:I has taken a product named %d\n", p->counter);
free(p);
sleep(rand() % 5);

}
}
int main()
{
pthread_t tid[2];

pthread_create(&tid[0], NULL, producter, NULL);
pthread_create(&tid[1], NULL, consumer, NULL);

pthread_join(tid[0], NULL);
pthread_join(tid[1], NULL);

return 0;

}

信号量:
相关概念:信号量相当于加强版的互斥锁。互斥锁在同一时刻只能有一个线程访问加锁的数据,但是信号量可以允许多个线程来访问。sem_t类型是一个结构体,但是使用时,可以简单的看成非负整数。

相关函数(头文件为:<semaphore.h>):
初始化一个信号量:
函数原型:int sem_init(sem_t *sem, int pshared, unsigned int value)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量;pshared:取0代表用于线程间,取非0代表用于进程间;value:该参数指定信号量初值

销毁一个信号量:
函数原型:int sem_destroy(sem_t *sem)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量

给信号量加锁(相当于sem–):
函数原型:int sem_wait(sem_t *sem)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量
当sem>0时,就可以正常执行,当sem = 0时会阻塞等待

给信号量解锁(相当于sem++):
函数原型:int sem_post(sem_t *sem)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量

尝试给信号量加锁:
函数原型:int sem_trywait(sem_t *sem)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量

限时尝试给信号量加锁:
函数原型:int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
返回值:成功返回0,失败返回-1并设置errno
参数:sem:代表信号量;abs_timeout:代表定时的时间,用法和pthread_cond_timewait里面用法差不多

这里举一个生产者消费者信号量模型的例子:

#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

#define NUM 5

int queue[NUM]; //全局数组实现环形队列
sem_t blank_number, product_number; //空格子信号量, 产品信号量

void *producer(void *arg)
{
int i = 0;

while (1) {
sem_wait(&blank_number); //生产者将空格子数--,为0则阻塞等待
queue[i] = rand() % 1000 + 1; //生产一个产品
printf("----Produce---%d\n", queue[i]);
sem_post(&product_number); //将产品数++

i = (i+1) % NUM; //借助下标实现环形
sleep(rand()%3);
}
}

void *consumer(void *arg)
{
int i = 0;

while (1) {
sem_wait(&product_number); //消费者将产品数--,为0则阻塞等待
printf("-Consume---%d\n", queue[i]);
queue[i] = 0; //消费一个产品
sem_post(&blank_number); //消费掉以后,将空格子数++

i = (i+1) % NUM;
sleep(rand()%3);
}
}

int main(int argc, char *argv[])
{
pthread_t pid, cid;

sem_init(&blank_number, 0, NUM); //初始化空格子信号量为5
sem_init(&product_number, 0, 0); //产品数为0

pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);

pthread_join(pid, NULL);
pthread_join(cid, NULL);

sem_destroy(&blank_number);
sem_destroy(&product_number);

return 0;
}