-为什么是SkipList,不是TreeMap的红黑树?
-无锁链表的精髓
-ConcurrentSkipListMap
-ConcurrentSkipListSet
为什么是SkipList?
大家都知道,在Java集合类中,有一个TreeMap,相对于HashMap,它的一个最大特点是:key是有序的。TreeMap的内部是用红黑树实现的。同HashMap一样,它也是非线程安全的。
然后在JUC包中,提供了一个线程安全的、有序的HashMap,也就是今天所要讲的ConcurrentSkipListMap。(关于什么是SkipList,或者“跳跃表“,此处不再详述,大家自行google之)
那为什么实现这个线程安全的、有序的HashMap,不用红黑树,而用SkipList呢?
借用Doug Lea的原话:
The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees.
也就是目前计算机领域没找到一种高效的、作用在树上的、无锁的、增加和删除结点的办法。
那为什么SkipList就可以无锁的实现结点的增加、删除呢?那就要从以下的故事说起:
无锁链表
问题的提出
在Doug Lea的注释中,他引用了下面这篇无锁链表的Paper:
“A pragmatic implementation of non-blocking linked lists” (大家自行google之)
咋一看,无锁链表不是很简单嘛,还值得写一篇paper专门论述?我们在前面讲AQS的时候,已反复用到无锁队列,其实现也是链表。那究竟区别在哪呢?
我们前面所讲的无锁队列、栈,都是只在头、尾做cas操作,这个问题不大。而玄机就在,如果你在链表的中间做insert/delete操作,照通常的cas做法,就会出问题!
关于这个问题,那篇论文中有清晰的论述,此处引用如下:
操作1: 在结点10后面插入结点20,没什么问题
操作2:删除结点10,也没什么问题
但如果2个线程同时操作,一个删除结点10,一个要在10后面插入结点20。这2个操作都各自是CAS的,这个时候就会出问题:新插入的结点20会丢失!这个问题不是简单的用CAS就可以解决的。
那为什么会出现这个问题呢?
究其原因:我们在删除结点10的时候,实际操作的是10的前驱H,10本身并没有任何变化。这样,在往10后插入20的这个线程,不知道10已经删除了!!
要解决这个问题,论文中提出了如下的解决办法:
把10的删除分2步:第1步,把10的next指针,mark成删除,也就是软删;第2步,找机会,物理删除。
这个解决办法有一个关键点:就是“把10的next指针指向20(insert操作)“ 和 “判断10本身是否删除“这2个,必须是原子的,必须在1个cas里面完成!!!
解决办法
办法1: AtomicMarkableReference
要实现上面所说的,一个办法是用前面所提到的AtomicMarkableReference,也就是说,每个next要是AtomicMarkableReference类型。但这个办法,不够高效。Doug Lea在ConcurrentSkipListMap的实现中,用了另外一种办法
办法2:Mark结点
* +——+ +——+ +——+ +——+
* … | b |——>| n |—–>|marker|——>| f | …
* +——+ +——+ +——+ +——+
*
* 3. CAS b’s next pointer over both n and its marker.
* From this point on, no new traversals will encounter n,
* and it can eventually be GCed.
* +——+ +——+
* … | b |———————————–>| f | …
* +——+ +——+
我们的目的,不就是想标记结点n,也就是标记它的next字段嘛?那就新造一个marker结点,让n.next = marker。
这样也达到了,删除n的时候,标记n的next指针的目的。
ConcurrentSkipListMap
SkipList结构
解决了无锁链表的insert/delete问题,也就解决了SkipList的一个关键问题。因为SkipList,通俗点讲,就是多层链表叠起来的:一个大的单向链表,存储所有的
//最底层Node结点。所有的<k,v>对,都是这个单向链表串起来的。
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
。。。
}
//上面的Index层的结点
static class Index<K,V> {
final Node<K,V> node; //不存储实际数据,指向Node
final Index<K,V> down; //关键点:每个Index结点,必须有个指针,指向其下一个Level对应的结点
volatile Index<K,V> right; //自己也组成单向链表
。。。
}
//整个ConcurrentSkipListMap就只用记录最顶层的head结点
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>,
Cloneable,
java.io.Serializable {
。。。
private transient volatile HeadIndex<K,V> head;
}
final void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
put操作
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
Comparable<? super K> key = comparable(kkey);
for (;;) {
Node<K,V> b = findPredecessor(key); //先找到结点的前驱(一定在base level上)
Node<K,V> n = b.next;
for (;;) {
if (n != null) {
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c > 0) {
b = n;
n = f;
continue;
}
if (c == 0) { //结点存在,直接改值
if (onlyIfAbsent || n.casValue(v, value))
return (V)v;
else
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
Node<K,V> z = new Node<K,V>(kkey, value, n);
if (!b.casNext(n, z)) //结点不存在,新建一个,插入进入
break;
int level = randomLevel(); //关键点:判断是否要给此结点加上Index层。randomLevel有50%概率会返回0,也就是说,50%的结点,不会有Index层,只会在最底层的链表上面
if (level > 0)
insertIndex(z, level); //level > 0,为其建索引
return null;
}
}
}
//关键函数:查找一个结点的前驱,header开始,从左往右、从上往下遍历
private Node<K,V> findPredecessor(Comparable<? super K> key) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
Index<K,V> q = head;
Index<K,V> r = q.right;
for (;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
if (key.compareTo(k) > 0) {
q = r;
r = r.right;
continue;
}
}
Index<K,V> d = q.down; //跳到下一层
if (d != null) {
q = d;
r = d.right;
} else
return q.node;
}
}
}
get操作
public V get(Object key) {
return doGet(key);
}
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
for (;;) {
Node<K,V> n = findNode(key);
if (n == null)
return null;
Object v = n.value;
if (v != null)
return (V)v;
}
}
private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
Node<K,V> b = findPredecessor(key); //再次用的此函数
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c == 0)
return n;
if (c < 0)
return null;
b = n;
n = f;
}
}
}
ConcurrentSkipListMap中,关于链表的操作,细节非常多,上面只是分析了其整体实现思路。关于insert/delete, insertIndex的具体实现细节,本文不在详述,留待大家对照代码仔细分析。
ConcurrentSkipListSet
我们知道,TreeSet是基于TreeMap实现的,就是对TreeMap的一个简单封装。
同样, ConcurrentSkipListSet也就是对ConcurrentSkipListMap的一个简单封装,具体不再详述。
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
}