队列是我们非常常用的数据结构,用来提供数据的写入和读取功能,而且通常在不同线程之间作为数据通信的桥梁。不过在将无锁队列的算法之前,需要先了解一下CAS(compare and swap)的原理。由于多个线程同时操作同一个数据,其中肯定是存在竞争的,那么如何能够针对同一个数据进行操作,而且又不用加锁呢? 这个就需要从底层,CPU层面支持原子修改操作,比如在X86的计算机平台,提供了XCHG指令,能够原子的交互数值。
从开发语言的层面,比如C++11中,就提供了atomic_compare_exchange_weak函数,来实现CAS。
1. lockless queue,enqueue,dequeue操作的算法
(1) enqueue算法:
enqueue时,先将需要加入队尾的数据创建出来,然后在一个循环操作中,将数据加入队尾,如果加入失败,那么就更新当前的队尾指针,直到加入成功,然后循环结束。最后调整队尾指针。
(2) dequeue算法
dequeue时,在循环操作中,使用CAS将队列头指针,变成头指针的下一个指针,如果失败,持续操作直到成功。最后返回头指针指向的值。
2. ABA问题,及解决办法
从上面的算法中,可以看出采用CAS方式实现的无锁队列的算法过程,不过由于CAS操作本身的特殊性(通过判断当前被变换的值,是否发生过变化),可能会在某些情况下引起ABA问题。
那么首先什么是ABA问题呢? wiki上有这样一个说明的例子:
Natalie is waiting in her car at a red traffic light with her children. Her children start fighting with each other while waiting, and she leans back to scold them. Once their fighting stops, Natalie checks the light again and notices that it's still red. However, while she was focusing on her children, the light had changed to green, and then back again. Natalie doesn't think the light ever changed, but the people waiting behind her are very mad and honking their horns now.
意思就是说,Natalie在等红灯的时候,由于回头管孩子,错过了绿灯,等她再回过头看信号灯的时候,又是红灯了。
这其实就是一个ABA问题,虽然中间信号灯发生了变化,但是Natalie却不知道。
用C++中的一个stack来说明,stack代码如下:
/* Naive lock-free stack which suffers from ABA problem.*/
class Stack {
std::atomic<Obj*> top_ptr;
//
// Pops the top object and returns a pointer to it.
//
Obj* Pop() {
while() {
Obj* ret_ptr = top_ptr;
if (!ret_ptr) return std::nullptr;
// For simplicity, suppose that we can ensure that this dereference is safe
// (i.e., that no other thread has popped the stack in the meantime).
Obj* next_ptr = ret_ptr->next;
// If the top node is still ret, then assume no one has changed the stack.
// (That statement is not always true because of the ABA problem)
// Atomically replace top with next.
if (top_ptr.compare_exchange_weak(ret_ptr, next_ptr)) {
return ret_ptr;
}
// The stack has changed, start over.
}
}
//
// Pushes the object specified by obj_ptr to stack.
//
void Push(Obj* obj_ptr) {
while() {
Obj* next_ptr = top_ptr;
obj_ptr->next = next_ptr;
// If the top node is still next, then assume no one has changed the stack.
// (That statement is not always true because of the ABA problem)
// Atomically replace top with obj.
if (top_ptr.compare_exchange_weak(next_ptr, obj_ptr)) {
return;
}
// The stack has changed, start over.
}
}
};
假设,stack初始化为top → A → B → C
线程1先执行
ret = A;
next = B;
然后在线程1执行compare_exchange_weak之前被中断,换成线程2执行。
{ // 线程2先pop出A
ret = A;
next = B;
compare_exchange_weak(A, B) // Success, top = B
return A;
} // Now the stack is top → B → C
{ // 线程2再pop出B
ret = B;
next = C;
compare_exchange_weak(B, C) // Success, top = C
return B;
} // Now the stack is top → C
delete B;
{ // 最后线程2将A再push进stack
A->next = C;
compare_exchange_weak(C, A) // Success, top = A
}
当线程2执行完所有这些操作之后,换成线程1执行,线程1的compare_exchange_weak是会成功执行的,因为它不知道top_ptr已经被修改过了。
通常针对ABA问题的解决办法,就是针对操作的数据加上一个原子操作的使用计数,在CAS执行之前,先获取一下计数是否和之前一样,如果不一样,就说明数据已经被修改过了。