ConcurrentSkipListMap是线程安全的有序的哈希表。与同是有序的哈希表TreeMap相比,ConcurrentSkipListMap是线程安全的,TreeMap则不是,且ConcurrentSkipListMap是通过跳表(skip list)实现的,而TreeMap是通过红黑树实现的。至于为什么ConcurrentSkipListMap不像TreeMap一样使用红黑树结构,在ConcurrentSkipListMap源码中Doug Lea已经给出解释:
The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees.
有必要详细了解下skip list。
skip list
引用作者William Pugh的一句话:
Skip lists are a probabilistic data structure that seem likely to supplant balanced trees as the implementation method of choice for many applications. Skip list algorithms have the same asymptotic expected time bounds as balanced trees and are simpler, faster and use less space.
大意为跳过列表是一种概率数据结构,可能取代平衡树作为许多应用程序的实现方法。跳过列表算法具有与平衡树相同的渐近期望时间界限,并且更简单,更快速并且使用更少的空间。下图是skip list的数据结构示意图。
从图中可以看出跳表主要有以下几个成员构成:
- Node:节点,保存map元素值。有三个属性,key、value、指向下个node的指针next。
- Index:索引节点。有三个属性,指向最底层node的指针、指向下一层的index的指针down、指向本层中的下个index的指针right。
- HeadIndex:索引头节点。除了有Index的所有属性外,还有一个表示此索引所在层的属性。
- NULL:表尾,全部为NULL。
内部类
下面是这跳表成员在ConcurrentSkipListMap中的实现。
Node
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;//指向下个node的指针
...
}
Index
static class Index<K,V> {
final Node<K,V> node;//指向最底层的node
final Index<K,V> down;//指向下一层的index
volatile Index<K,V> right;//指向本层中的下个index
...
}
HeadIndex
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;//代表所在层级
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
核心方法
get(Object key)
public V get(Object key) {
return doGet(key);
}
doGet(Object key)源码如下:
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找到key的前驱节点b,n为b的next节点(如果n符合某些条件,那么就是要找的节点)
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//如果n为null,说明下面就没有Node节点了,没有必要继续下去了,直接跳出outer循环
if (n == null)
break outer;
//获取n的next节点f
Node<K,V> f = n.next;
//如果两次读取到的[b的后继节点]不同,说明其它线程操作了该跳表,则返回到外层for循环重新遍历
if (n != b.next) // inconsistent read
break;
//如果n的value为null,,说明其它线程操作了该跳表
//n已经被删除了(在下面的doDelete方法中会讲到为什么会这样),返回到外层for循环重新遍历
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//如果b的value为null,,说明其它线程操作了该跳表
//b已经被删除了(在下面的doDelete方法中会讲到为什么会这样),返回到外层for循环重新遍历
if (b.value == null || v == n) // b is deleted
break;
//找到了key值与参数key相等的结点,返回节点的value
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
//???
if (c < 0)
break outer;
//向后移动,判断下个Node节点是否符合条件
b = n;
n = f;
}
}
//没有找到就返回null
return null;
}
可以将get(Object)的步骤总结如下:
- 获取key的前驱节点。
- 从前驱节点开始循环查找,找到与key相等的节点,返回value。在这个过程中会删除一些value为null的节点。
- 如果步骤2中没有找到,返回null。
put(K key, V value)
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
可以看出,value是不允许为null的。而在doPut(K, V, boolean)中,对key也做了验证,不允许其为null。
doPut(K, V, boolean)源码如下:
/** * 插入方法。 * 如果添加的元素不存在,则插入,如果存在且onlyIfAbsent为false,则替换value。 * 返回被替换的旧value,否则返回null */
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找到key的前驱节点b,n为b的next节点(如果n符合某些条件,那么就是要找的节点)
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
//如果n不为null,说明当前跳表中可能有要找的Node节点,尝试找到它
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
//如果两次读取到的[b的后继节点]不同,说明其它线程操作了该跳表,则返回到外层for循环重新遍历
if (n != b.next) // inconsistent read
break;
//如果n的value为null,,说明其它线程操作了该跳表
//n已经被删除了(在下面的doDelete方法中会讲到为什么会这样),返回到外层for循环重新遍历
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//如果b的value为null,说明其它线程操作了该跳表
//b已经被删除了(在下面的doDelete方法中会讲到为什么会这样),返回到外层for循环重新遍历
if (b.value == null || v == n) // b is deleted
break;
//如果参数key大于n的key,说明要找的节点还在后面???
if ((c = cpr(cmp, key, n.key)) > 0) {
//b往后移动一位
b = n;
//n往后移动一位
n = f;
//跳出本次循环
continue;
}
//如果参数key等于当前index节点的key
if (c == 0) {
//如果onlyIfAbsent为true或者当前index节点的value等于参数value,返回value
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
//如果n为null,说明当前跳表中没有要找的Node节点,直接新建并插入节点
z = new Node<K,V>(key, value, n);
//如果没有其它线程操作了该跳表,就将b的next节点设为z,否则返回到外层for循环重新遍历
if (!b.casNext(n, z))
break; // restart if lost race to append to b
//跳出outer循环
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 找到插入位置并拼接
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
put(K key, V value) 方法的大体流程如下:
- 获取key的前驱节点
- 从前驱节点开始循环查找,如果找到key值与参数key相等的节点,返回节点的value。在这个过程中会删除一些value为null的节点。如果没有找到,就新建并插入节点。
- 更新跳表
remove(Object key)
public V remove(Object key) {
return doRemove(key, null);
}
doRemove(Object key, Object value)源码如下:
/** * 删除方法 * 定位节点,将值置为null,添加删除标记,解除与前驱节点的关联,删除相关索引节点,也可能需要减少头索引层。 */
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找到key的前驱节点b,n为b的next节点(如果n符合某些条件,那么就是要找的节点)
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//参考doGet
if (n == null)
break outer;
Node<K,V> f = n.next;
//参考doGet
if (n != b.next) // inconsistent read
break;
//参考doGet
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//参考doGet
if (b.value == null || v == n) // b is deleted
break;
//这种情况说明当前表中不可能存在要找的Node节点了,跳出outer循环
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
//如果参数key大于当前index节点的key,说明要找的节点还在后面,继续向后找
if (c > 0) {
b = n;
n = f;
continue;
}
//如果c<0、c>0都不成立,那么只能说明c=0。c=0说明找到key对应的节点了。
//如果key相等,但value不相等,说明表中不可能存在要删除的节点了,跳出outer循环
if (value != null && !value.equals(v))
break outer;
//如果没有其它线程操作了该跳表,将节点n的value置为null,否则返回到外层for循环重新遍历
if (!n.casValue(v, null))
break;
//添加删除标记
//如果没有其它线程操作了该跳表,并且将b的next域更新为f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {//添加标记节点并且更新b的next域均成功
//利用findPredecessor方法的副作用删除节点及索引
findPredecessor(key, cmp); // clean index
//如果头结点的right为null,说明当前层空了,level-1
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
将doRemove(Object key, Object value)的操作总结如下:
- 找到key的前驱节点,前驱节点的next为当前index节点
- 从前驱节点开始循环查找,如果找到key值与参数key相等的index节点
- 将index节点的value置为null
- 添加删除标记
- 将前驱节点的next域更新为当前index节点的next域
- 删除节点及索引
- 返回被删除节点的value
- 如果步骤2中没有找到,返回null。
未完待续。。