一、ConcurrentSkipListMap并发容器
1.ConcurrentSkipListMap的底层数据结构
要学习ConcurrentSkipListMap,首先要知道什么是跳表或跳跃表(SkipList),跳表链表的升级版,且是有序的,另外跳表还是一种不论查找、添加或是删除效率可以和平衡二叉树媲美的层次结构(其时间复杂度是O(n)=log2n),更重要的是,跳表的实现要简单明了的多。
上图就是一个跳表的层次结构图,跳表的特征有:
1.跳表分为若干层,层级越高跳跃性越大。
2.跳表的最底层的链表包含所有数据。
3.跳表是有序的。
4.跳表的头结点永远指向最高层的第一个元素。
下面在来看看跳表的索引结点的结构:一般分为三个区域:两个个指针域和一个数据存储区域;两个指针域分别是指向同层级下个索引的的next指针,和指向下个层级拥有相同数据的的down指针,如下图所示。
跳表的查找相对简单明了的多,从最高层的头指针开始查询,例如下图查找14,从最高层3开始找,14大于3,找下个22,14小于22,说明最高层没有14这个数。进入下一层,14比8大,比22小说明这一层也没有该元素,在进入下一层。比8大往后找,找到14。
下图展示了14的查找路线,红色线条表示对比后比14大,不进入索引的线路。
2.底层跳表的实现
最底层链表结点Node的定义:
static final class Node<K,V> { final K key; //存储键 volatile Object value; //存储值 volatile Node<K,V> next; //下个结点 Node(K key, Object value, Node<K,V> next) { this.key = key; this.value = value; this.next = next; } Node(Node<K,V> next) { this.key = null; this.value = this; this.next = next; } //不安全的操作变量,用于实现CAS操作 private static final sun.misc.Unsafe UNSAFE; //value的内存地址偏移量 private static final long valueOffset; //下个结点的内存地址偏移量 private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; valueOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("value")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
Index的定义:
static class Index<K,V> { final Node<K,V> node; //链表中结点的引用 final Index<K,V> down; //指向下一层的Index索引 volatile Index<K,V> right; //右边的索引 Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) { this.node = node; this.down = down; this.right = right; } //不安全的操作变量 private static final sun.misc.Unsafe UNSAFE; private static final long rightOffset; //right的内存偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Index.class; rightOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("right")); } catch (Exception e) { throw new Error(e); } } }
HeadIndex是专门用在每个层级的头结点上的定义,它是继承了Index的基础上增加一个代表层级的level属性:
static final class HeadIndex<K,V> extends Index<K,V> { final int level; //层级索引 HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } }
HeadIndex、Index和Node之间的联系,如下图所示:
3.ConcurrentSkipListMap的继承关系
ConcurrentSkipListMap的继承关系如下图所示,它继承了AbstractMap类,并且实现了 ConcurrentNavigableMap和ConcurrentMap接口。由此可知,ConcurrentSkipListMap必然是个有序的、并发操作安全的、具有伸缩视图功能的Map集合。
之前数据结构及源码的学习过程中,上图的大部分接口及抽象类都已经学习过,仅有一个 ConcurrentNavigableMap接口没有了解过:
public interface ConcurrentNavigableMap<K,V> extends ConcurrentMap<K,V>, NavigableMap<K,V> { //返回当前map的子map,即将fromKey到toKey的子集合截取出来,fromInclusive和toInclusive //表示子集合中是否包含fromKey和toKey这两个临界键值对 ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive); //返回不大于toKey的子集合,是否包含toKey由inclusive决定 ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive); //返回不小于fromKey的子集合,是否包含fromKey由inclusive决定 ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive); //返回大约fromKey,且小于等于toKey的子集合 ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey); //返回小于toKey的子集合 ConcurrentNavigableMap<K,V> headMap(K toKey); //返回大于等于fromKey的子集合 ConcurrentNavigableMap<K,V> tailMap(K fromKey); //返回一个排序是反序的map(即将原本正序的Map进行反序排列) ConcurrentNavigableMap<K,V> descendingMap(); //返回所有key组成的set集合 public NavigableSet<K> navigableKeySet(); //返回所有key组成的set集合 NavigableSet<K> keySet(); //返回一个key的set视图,并且这个set中key是反序的 public NavigableSet<K> descendingKeySet(); }
了解了接口,再来看ConcurrentSkipListMap的构造方法及一些重要的属性:
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable { //特殊值,用于 private static final Object BASE_HEADER = new Object(); //跳表的头索引 private transient volatile HeadIndex<K,V> head; //比较器,用于key的比较,若为null,则使用自然排序(key自带的Comparable实现) final Comparator<? super K> comparator; //键的视图 private transient KeySet<K> keySet; //键值对的视图 private transient EntrySet<K,V> entrySet; //值的视图 private transient Values<V> values; //反序排列的map private transient ConcurrentNavigableMap<K,V> descendingMap; private static final int EQ = 1; private static final int LT = 2; private static final int GT = 0; //空构造 public ConcurrentSkipListMap() { this.comparator = null; initialize(); } //带比较器的构造方法 public ConcurrentSkipListMap(Comparator<? super K> comparator) { this.comparator = comparator; initialize(); } //带初始元素集合的构造方法 public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) { this.comparator = null; initialize(); putAll(m); } //使用m集合的比较器构造方法 public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) { this.comparator = m.comparator(); initialize(); buildFromSorted(m); } //初始化变量 private void initialize() { keySet = null; entrySet = null; values = null; descendingMap = null; //新建HeadIndex结点,head指向该头结点 head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1); } }
4.put的过程
public V put(K key, V value) { if (value == null) //这里可以知道Value要求不能为null throw new NullPointerException(); return doPut(key, value, false); //实际执行put操作的方法 } private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) //有序的map,key不能为null,否则无法比较 throw new NullPointerException(); Comparator<? super K> cmp = comparator; //获取比较器,若无则为null,使用自然排序 outer: for (;;) { //外层循环 //findPredecessor查找key的前驱索引对应的结点b for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //判断b结点是否是链表的尾结点,即b结点是否还有后继结点 //若有后继结点(n不为null表示有后继结点),那么要将key对应的结点插入b和n之间 //若b为尾结点,则key对应结点将为新的尾结点 if (n != null) { Object v; int c; Node<K,V> f = n.next; //获取n的后继结点 //判断n还是不是b的后继结点 //若n不在是b的后继结点,说明已经有线程抢先在b结点之后插入一个新结点 //退出内层循环,重新定位插入 if (n != b.next) break; //判断n结点是否要被删除(value为null表示逻辑删除),辅助删除后重新定位key if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); //让当前线程去辅助删除(物理删除) break; } //判断b是否要被删除,若要被删除,则退出当前循环,重新定位key if (b.value == null || v == n) // b is deleted break; //判断n结点的键与key的大小 //这里开始精准定位key在链表中的位置,找出key实际在链表中的前驱结点 //key比n结点的key大,则继续比较后续结点的key if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } //若key与n.key相等(c==0),说明key对应的结点已经存在 //那么根据onlyIfAbsent来决定是否覆盖旧value值 if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; //更新成功,直接返回旧值 } break; //更新失败继续循环尝试 } // else c < 0; fall through } //到这说明b就是key的前驱结点 //新建key的结点z,并尝试更新b的next为z,成功就退出外层循环 //更新失败继续尝试 z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } /** * 上面的步骤是将新结点插入到跳表最底层的链表中 * 接下来则是要向上生成各层的索引,是否生成某一层次的索引通过抛硬币的方式 * 决定,即从最底层开始随机决定是否在上一层建议索引,若为true则在上一层生成索引 * 并且继续抛硬币决定是否再往上层建立索引;若失败则布建立索引,直接结束 */ //生成一个随机数 int rnd = ThreadLocalRandom.nextSecondarySeed(); //判断是否要更新层级(随机数必须是正偶数才更新层级) //这里决定的是当前新增的结点是否要向上建立索引 if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; //决定level的等级,也就是结点要向上建立基层索引 //level的值有rnd从低二位开始有几个连续的1决定 //例如:rnd==0010 0100 0001 1110 那么level就为5 //由此可以看出跳表的最高层次不能超过31层(level<=31) while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; //判断level是否超过当前跳表的最高等级(即是否大于头索引的level) //若未超过,那么直接建立一个从1到level的纵向索引列 if (level <= (max = h.level)) { //新建一个索引列,后建的索引的down指针指向前一个索引 for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } //若新建结点的level大于头结点的level,则要新建一个新的level层 else { // try to grow by one level level = max + 1; // 只能新增一层,为一个结点就新增2层以上没有意义 @SuppressWarnings("unchecked") Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; //新建一个结点z的索引列,并将这个索引列的所有索引都放到数组中 for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; //获取头索引 int oldLevel = h.level; //获取旧层次level //判断旧层次level是否发生改变(即是否有其他线程抢先更新了跳表的level,出现竞争) if (level <= oldLevel) // lost race to add level break; //竞争失败,直接结束当前循环 HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; //生成新的HeadIndex节点 //正常情况下,循环只会执行一次,如果由于其他线程的并发操作导致 oldLevel 的值不稳定,那么会执行多次循环体 for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); //尝试更新头索引的值 if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } //将新增的idx插入跳表之中 splice: for (int insertionLevel = level;;) { int j = h.level; //获取最高level //查找需要清理的索引 for (Index<K,V> q = h, r = q.right, t www.michenggw.com= idx;;) { //其他线程并发操作导致头结点被删除,直接退出外层循环 //这种情况发生的概率很小,除非并发量实在太大 if (q == null || t == null) break splice; //判断q是否为尾结点 if (r != null) { Node<K,V> n = r.node; //获取索引r对应的结点n // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); //比较新增结点的key与n.key的大小 //判断n是否要被删除 if (n.value == null) { if (!q.unlink(r)) //删除r索引 break; r = q.right; continue; } //c大于0,说明当前结点n.key小于新增结点的key // 继续向右查找一个结点的key大于新增的结点key的索引 //新结点对应的索引要插入q,r之间 if (c > 0) { q = r; r = r.right; continue; } } //判断是否 if (j == insertionLevel) { //尝试着将 t 插在 q 和 r 之间,如果失败了,退出内循环重试 if (!q.link(r, t)) break; // restart //如果插入完成后,t 结点被删除了,那么结束插入操作 if (t.node.value == null) { // 查找节点,查找过程中会删除需要删除的节点 findNode(key); break splice; } //到达最底层,没有要插入新索引的索引层了 if (--insertionLevel == 0) break splice; } //向下继续链接其它index 层 if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; } //查找key在map中的位置的前驱结点 private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { //获取head头索引,遍历跳表 for (Index<K,V> q = head, r = q.right, d;;) { //判断是否有后继索引,若有 if (r != null) { Node<K,V> n = r.node; //获取索引对应的结点 K k = n.key; //获取结点对应的key //判断结点的value是够为null,为null表示结点已经被删除了 //一个结点从链表中被删除时,会将value赋为null,因为此时跳表其他层级的索引还可能 //会引用着该结点,value赋null,标识该结点已经被废弃,对应的索引(如果存在)也应该从跳表中删除 //那么调用unlink方法删除索引 if (n.value == null) { //尝试将r索引移除出跳表,成功就继续查找 //失败就继续尝试 if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } //比较当前索引中结点的键k与key的大小 //若key大于k,则继续查找比较下个索引 if (cpr(cmp, key, k) > 0) { q = r; r = r.right; //下个右边索引 continue; } } //若是key小于k,则判断是够到达跳表的最底层的上一层, //若是到达最底层的上一层,则说明当前索引就是key的前驱索引,node就是对应的结点 //若不是最底层的上一层,则继续比较查找 if ((d = q.down) == null) return q.node; q = d; r = d.right; } } } //Index中的方法,断开两个索引之间的联系 final boolean unlink(Index<K,V> succ) { return node.value !=