基本概念
互斥锁
POSIX线程锁机制的Linux实现都不是取消点,因此,延迟取消类型的线程不会因收到取消信号而离开加锁等待。值得注意的是,如果线程在加锁后解锁前被取消,锁将永远保持锁定状态,因此如果在关键区段内有取消点存在,或者设置了异步取消类型,则必须在退出回调函数中解锁。
这个锁机制同时也不是异步信号安全的,也就是说,不应该在信号处理过程中使用互斥锁,否则容易造成死锁。
条件变量和互斥锁的配合使用
假设线程t0,t1,t2的操作是sum++,而线程t3则是在sum到达100的时候,打印出一条信息,并对sum清零. 这种情况下,如果只用mutex, 则t3需要一个循环,每个循环里先取得lock_s,然后检查sum的状态,如果sum>=100,则打印并清零,然后unlock。
add()
{
pthread_mutex_lock(lock_s);
sum++;
pthread_mutex_unlock(lock_s);
if(sum>=100)
pthread_cond_signal(&cond_sum_ready);
}
T3线程的print<br><br>print
{
pthread_mutex_lock(lock_s);
while(sum<100)
pthread_cond_wait(&cond_sum_ready, &lock_s);
printf(“sum is over 100!”);
sum=0;
pthread_mutex_unlock(lock_s);
return;
}
注意两点:
- 在thread_cond_wait()之前,必须先lock相关联的mutex, 因为假如目标条件未满足,pthread_cond_wait()实际上会unlock该mutex, 然后block,在目标条件满足后再重新lock该mutex, 然后返回.这点非常重要。
- 为什么是while(sum<100),而不是if(sum<100) ?这是因为在pthread_cond_signal()和pthread_cond_wait()返回之间,有时间差,假设在这个时间差内,还有另外一个线程t4又把sum减少到100以下了,那么t3在pthread_cond_wait()返回之后,显然应该再检查一遍sum的大小.这就是用while的用意。
pthread_cond_wait总和一个互斥锁结合使用。在调用pthread_cond_wait前要先获取锁。pthread_cond_wait函数执行时先自动释放指定的锁,然后等待条件变量的变化。在函数调用返回之前,自动将指定的互斥量重新锁住。
pthread_cond_signal通过条件变量cond发送消息,若多个消息在等待,它只唤醒一个。pthread_cond_broadcast可以唤醒所有。调用pthread_cond_signal后要立刻释放互斥锁,因为pthread_cond_wait的最后一步是要将指定的互斥量重新锁住,如果pthread_cond_signal之后没有释放互斥锁,pthread_cond_wait仍然要阻塞。
用CAS原子操作实现无锁队列
当多线程需要共享数据和通信的时候,就会发生诸如死锁,共享数据存取等问题。此时可以用无锁队列来解决。
无锁机制的实现是基于CPU提供的一种原子操作CAS(Compare and Swap),它包含三个操作数,内存值V、旧的预期值 oldval、要修改的新值newval,当且仅当内存V中的值和旧值oldval相同时,将内存V修改为newval。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。CAS是一种乐观锁。
数组队列是一个循环数组,队列至少有一个元素,当头等于尾标示队空,尾加1等于头标示队满。
数组的元素用EMPTY(无数据,标示可以入队)和FULL(有数据,标示可以出队)标记指示,数组一开始全部初始化成 EMPTY标示空队列。
EnQue 操作:如果当前队尾位置为EMPTY,标示线程可以在当前位置入队,通过CAS原子操作把该位置设置为FULL,避免其它线程操作这个位置,操作完后修改队尾位置。各个线程竞争新的队尾位置。
DeQue 操作:如果当前队头位置为FULL,标示线程可以在当前位置出队,通过CAS原子操作把该位置设置为EMPTY,避免其它线程操作这个位置,操作完后修改队头位置。各个线程竞争新的队头位置。
#include "stdlib.h"
#include "stdio.h"
#include <pthread.h>
#define MAXLEN 2
#define CAS __sync_bool_compare_and_swap
struct node
{
int elem;
int status;//用于状态监测
};
struct queue
{
node elePool[MAXLEN];
int front;
int rear;
};
enum
{
EMPTY =1,
FULL,
};
queue g_que;
void initQue()
{
int i = 0;
g_que.front = 0;
g_que.rear = 0;
//队列中每个元素都初始化为EMPTY
for(i=0;i<MAXLEN;i++)
{
g_que.elePool[i].status = EMPTY;
}
return;
}
//入队
int enque(int elem)
{
//竞争队尾位置
do
{
//如果循环队列已经满了
if((g_que.rear+1)%MAXLEN == g_que.front)
{
return -1;
}
}while(!CAS(&(g_que.elePool[g_que.rear].status),EMPTY,FULL));
//操作队尾元素
g_que.elePool[g_que.rear].elem = elem;
printf("in--%d(%lu)\n",elem,pthread_self());
//将队尾后移1个
CAS(&(g_que.rear),g_que.rear,(g_que.rear+1)%MAXLEN);
return 0;
}
//出队
int deque(int* pElem)
{
//竞争队头位置
do
{
//如果队列是空的
if(g_que.rear == g_que.front)
{
return -1;
}
}while(!CAS(&(g_que.elePool[g_que.front].status),FULL,EMPTY));
*pElem = g_que.elePool[g_que.front].elem;
printf("out--%d(%lu)\n",*pElem,pthread_self());
//修改队头的位置
CAS(&(g_que.front),g_que.front,(g_que.front+1)%MAXLEN);
return 0;
}
ABA问题
如果另一个线程修改V值假设原来是A,先修改成B,再修改回成A。当前线程的CAS操作无法分辨当前V值是否发生过变化。
各种乐观锁的实现中通常都会用版本戳version来对记录或标记对象,避免并发操作带来的问题,在Java中,AtomicStampedReference<E>
也实现了这个作用,它通过包装[E,Integer]
的元组来对对象标记版本戳stamp,从而避免ABA问题。
AtomicStampedReference的内部不仅维护了对象值,还维护了一个时间戳(我这里把它称为时间戳,实际上它可以是任何一个整数,它使用整数来表示状态值)。
当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入,从而解决了ABA问题。