多线程无锁算法之无锁队列的实现

时间:2022-01-26 17:35:48

codeproject上的lock free queue

http://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular


多线程无锁算法之无锁队列的实现

今天花了近两个小时的时间好好的理解了一下多线程无锁队列的实现,查看了很多资料和文献。在我看来,实现无锁队列的关键点有二:

1、各个平台的原子操作或者说CAS原语;

2、ABA问题的理解和解决。

首先来说说这个CAS原语,所谓CAS(Compare And Swap)即比较并交换,在 Intel 处理器中,比较并交换通过指令的 cmpxchg 系列实现。CAS有三个操作数: 内存位置(V)、预期原值(A)和新值(B)。如果内存位置V的值与预期A原值相匹配,那么处理器会自动将该位置值更新为新值B。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”CAS的C语言实现如下:

inline bool CAS2(pointer_t *addr, pointer_t &old_value, pointer_t &new_value)
{
bool ret;
__asm__ __volatile__(
"lock cmpxchg16b %1;\n"
"sete %0;\n"
:"=m"(ret),"+m" (*(volatile pointer_t *) (addr))
:"a" (old_value.ptr), "d" (old_value.tag), "b" (new_value.ptr), "c" (new_value.tag));
return ret;
}

这其中包括了内联汇编代码(这个可以去看看AT&T汇编语法),其中能够支持多线程的并行安全执行的秘诀就在 "lock cmpxchg16b %1;\n"这句中的“lock”,这个lock就和我们基本多线程编程中的锁相当,只不过,这个lock不再是普通的锁,它锁的是地址总线,在多核编程情景下,当已经有CPU的线程A已经访问某地址时,这时地址总线会被锁定,其他CPU核心的线程B无法再访问当前地址,直到A访问完毕之后,其他线程如B才可能访问该地址,这样就保证了线程A访问该地址的原子性操作。

再来讲讲这个ABA问题。在进行CAS操作时,因为在更改V值之前,CAS主要是通过访问V的值是否仍然和A相等,所以,在第一次读取V以及对V执行CAS操作之前,如果有其他线程将V的值先从A改为B,而另外的线程又将V的值从B改回了A,这样会使CAS算法混乱。显然,在这种情况下,CAS的操作会成功,这类的问题称为ABA问题。要解决这类问题,就是不再重用A,通常的做法是用标记或版本编号与进行CAS操作的每个值相关联,并原子的更新值和标记(如果有的朋友还不够清楚,可以上网搜搜ABA问题,呵呵……)。

在了解了以上两点后,就可以着手来设计和实现无锁队列了。为了保证正确性,我就不把自己的代码贴出来了,再网上找了一个,和我自己的差不多,但是,免得误导有些朋友,我贴了个网上的,这个队列是MS-queue的实现,有兴趣的朋友,可以去网上查查哦,一大把的例子,呵呵,我贴到这里很大一部分原因是为了备忘,呵呵……

MS-queue C++ / C的实现:

#ifndef __LOCK_FREE_QUEUE_HPP__
#define __LOCK_FREE_QUEUE_HPP__

#include <stddef.h>

struct node_t;
struct pointer_t
{
node_t *ptr;
unsigned int tag;
pointer_t() {
ptr = NULL;
tag = 0; // 各位注意,这个标记就是用来解决我在上面提到的ABA问题的
}
pointer_t(node_t *a_ptr, unsigned int a_tag) {
ptr = a_ptr; tag=a_tag;
}

friend
bool operator==(pointer_t const &l, pointer_t const &r)
{
return l.ptr == r.ptr && l.tag == r.tag;
}

friend
bool operator!=(pointer_t const &l, pointer_t const &r)
{
return !(l == r);
}
};

typedef void * data_type;

#define dummy_val NULL

struct node_t { pointer_t next; data_type value;
node_t() {
value = dummy_val;
next= pointer_t(NULL,0);
}
};



#ifdef __x86_64__
inline bool CAS2(pointer_t *addr,
pointer_t &old_value,
pointer_t &new_value)
{
bool ret;
__asm__ __volatile__(
"lock cmpxchg16b %1;\n"
"sete %0;\n"
:"=m"(ret),"+m" (*(volatile pointer_t *) (addr))
:"a" (old_value.ptr), "d" (old_value.tag), "b" (new_value.ptr), "c" (new_value.tag));
return ret;
}

#else
inline bool CAS2(pointer_t *addr,
pointer_t &old_value,
pointer_t &new_value)
{
bool ret;
__asm__ __volatile__(
"lock cmpxchg8b %1;\n"
"sete %0;\n"
:"=m"(ret),"+m" (*(volatile pointer_t *) (addr))
:"a" (old_value.ptr), "d" (old_value.tag), "b" (new_value.ptr), "c" (new_value.tag));
return ret;
}
#endif

class queue_t
{
pointer_t tail_;
pointer_t head_;
public:
queue_t() {

}

void init() {
node_t *nd = new node_t();
nd->next = pointer_t(NULL, 0);
head_ = pointer_t(nd, 0);
tail_ = pointer_t(nd, 0);
}

void enqueue(data_type val) {
pointer_t tail, next;
node_t* nd = new node_t();
nd->value = val;
while(true){
tail = this->tail_;
next = tail.ptr->next;
if (tail == this->tail_) {
if(next.ptr == NULL) {
pointer_t new_pt(nd, next.tag+1);
if(CAS2(&(this->tail_.ptr->next), next, new_pt)){
break; // Enqueue done!
}
}else {
pointer_t new_pt(next.ptr, tail.tag+1);
CAS2(&(this->tail_), tail, new_pt);
}
}
}
pointer_t new_pt(nd, tail.tag+1);
CAS2(&(this->tail_), tail, new_pt);
}


data_type dequeue() {

pointer_t tail, head, next;
data_type val=NULL;
while(true){
head = this->head_;
tail = this->tail_;
next = (head.ptr)->next;
if (head != this->head_) continue;

if(head.ptr == tail.ptr){
if (next.ptr == NULL){
return NULL;
}
pointer_t new_pt(next.ptr, tail.tag+1);
CAS2(&(this->tail_), tail, new_pt);
} else{
val = next.ptr->value;
pointer_t new_pt(next.ptr, head.tag+1);
if(CAS2(&(this->head_), head, new_pt)){
break;
}
}
}
delete head.ptr;
return val;
}
};
#endif //__LOCK_FREE_QUEUE_HPP__