并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原

时间:2022-09-21 09:56:42
一、ConcurrentSkipListMap并发容器
1.ConcurrentSkipListMap的底层数据结构
    要学习ConcurrentSkipListMap,首先要知道什么是跳表或跳跃表(SkipList),跳表链表的升级版,且是有序的,另外跳表还是一种不论查找、添加或是删除效率可以和平衡二叉树媲美的层次结构(其时间复杂度是O(n)=log2n),更重要的是,跳表的实现要简单明了的多。
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
    上图就是一个跳表的层次结构图,跳表的特征有:
    1.跳表分为若干层,层级越高跳跃性越大。
    2.跳表的最底层的链表包含所有数据。
    3.跳表是有序的。
    4.跳表的头结点永远指向最高层的第一个元素。
    下面在来看看跳表的索引结点的结构:一般分为三个区域:两个个指针域和一个数据存储区域;两个指针域分别是指向同层级下个索引的的next指针,和指向下个层级拥有相同数据的的down指针,如下图所示。
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
    跳表的查找相对简单明了的多,从最高层的头指针开始查询,例如下图查找14,从最高层3开始找,14大于3,找下个22,14小于22,说明最高层没有14这个数。进入下一层,14比8大,比22小说明这一层也没有该元素,在进入下一层。比8大往后找,找到14。
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
    下图展示了14的查找路线,红色线条表示对比后比14大,不进入索引的线路。
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
 
2.底层跳表的实现
    最底层链表结点Node的定义:
 
static final class Node<K,V> {     final K key;    //存储键     volatile Object value;    //存储值     volatile Node<K,V> next;    //下个结点     Node(K key, Object value, Node<K,V> next) {         this.key = key;         this.value = value;         this.next = next;     }     Node(Node<K,V> next) {         this.key = null;         this.value = this;         this.next = next;     }         //不安全的操作变量,用于实现CAS操作     private static final sun.misc.Unsafe UNSAFE;     //value的内存地址偏移量     private static final long valueOffset;     //下个结点的内存地址偏移量     private static final long nextOffset;     static {         try {             UNSAFE = sun.misc.Unsafe.getUnsafe();             Class<?> k = Node.class;             valueOffset = UNSAFE.objectFieldOffset                 (k.getDeclaredField("value"));             nextOffset = UNSAFE.objectFieldOffset                 (k.getDeclaredField("next"));         } catch (Exception e) {             throw new Error(e);         }     } } 

 

    Index的定义:

static class Index<K,V> {     final Node<K,V> node;    //链表中结点的引用     final Index<K,V> down;    //指向下一层的Index索引     volatile Index<K,V> right;    //右边的索引     Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {         this.node = node;         this.down = down;         this.right = right;     }     //不安全的操作变量     private static final sun.misc.Unsafe UNSAFE;     private static final long rightOffset;    //right的内存偏移量     static {         try {             UNSAFE = sun.misc.Unsafe.getUnsafe();             Class<?> k = Index.class;             rightOffset = UNSAFE.objectFieldOffset                 (k.getDeclaredField("right"));         } catch (Exception e) {             throw new Error(e);         }     } }
 
 
    HeadIndex是专门用在每个层级的头结点上的定义,它是继承了Index的基础上增加一个代表层级的level属性:
 
static final class HeadIndex<K,V> extends Index<K,V> {     final int level;    //层级索引     HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {         super(node, down, right);         this.level = level;     } }

 

 
    HeadIndex、Index和Node之间的联系,如下图所示:
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
 
3.ConcurrentSkipListMap的继承关系
    ConcurrentSkipListMap的继承关系如下图所示,它继承了AbstractMap类,并且实现了 ConcurrentNavigableMap和ConcurrentMap接口。由此可知,ConcurrentSkipListMap必然是个有序的、并发操作安全的、具有伸缩视图功能的Map集合。
并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
    之前数据结构及源码的学习过程中,上图的大部分接口及抽象类都已经学习过,仅有一个 ConcurrentNavigableMap接口没有了解过:
 
public interface ConcurrentNavigableMap<K,V>     extends ConcurrentMap<K,V>, NavigableMap<K,V> {     //返回当前map的子map,即将fromKey到toKey的子集合截取出来,fromInclusive和toInclusive     //表示子集合中是否包含fromKey和toKey这两个临界键值对     ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive,                                        K toKey,   boolean toInclusive);     //返回不大于toKey的子集合,是否包含toKey由inclusive决定     ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive);     //返回不小于fromKey的子集合,是否包含fromKey由inclusive决定     ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive);     //返回大约fromKey,且小于等于toKey的子集合     ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey);     //返回小于toKey的子集合     ConcurrentNavigableMap<K,V> headMap(K toKey);     //返回大于等于fromKey的子集合     ConcurrentNavigableMap<K,V> tailMap(K fromKey);         //返回一个排序是反序的map(即将原本正序的Map进行反序排列)     ConcurrentNavigableMap<K,V> descendingMap();     //返回所有key组成的set集合     public NavigableSet<K> navigableKeySet();     //返回所有key组成的set集合     NavigableSet<K> keySet();     //返回一个key的set视图,并且这个set中key是反序的     public NavigableSet<K> descendingKeySet(); } 

 

    了解了接口,再来看ConcurrentSkipListMap的构造方法及一些重要的属性:
 
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>     implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {     //特殊值,用于     private static final Object BASE_HEADER = new Object();     //跳表的头索引     private transient volatile HeadIndex<K,V> head;     //比较器,用于key的比较,若为null,则使用自然排序(key自带的Comparable实现)     final Comparator<? super K> comparator;     //键的视图     private transient KeySet<K> keySet;          //键值对的视图     private transient EntrySet<K,V> entrySet;          //值的视图     private transient Values<V> values;     //反序排列的map     private transient ConcurrentNavigableMap<K,V> descendingMap;     private static final int EQ = 1;     private static final int LT = 2;     private static final int GT = 0;     //空构造     public ConcurrentSkipListMap() {         this.comparator = null;         initialize();     }     //带比较器的构造方法     public ConcurrentSkipListMap(Comparator<? super K> comparator) {         this.comparator = comparator;         initialize();     }     //带初始元素集合的构造方法     public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {         this.comparator = null;         initialize();         putAll(m);     }     //使用m集合的比较器构造方法     public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {         this.comparator = m.comparator();         initialize();         buildFromSorted(m);     }     //初始化变量     private void initialize() {         keySet = null;             entrySet = null;         values = null;         descendingMap = null;         //新建HeadIndex结点,head指向该头结点         head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),                               null, null, 1);     } } 

 

4.put的过程
    
public V put(K key, V value) {     if (value == null)    //这里可以知道Value要求不能为null         throw new NullPointerException();     return doPut(key, value, false);    //实际执行put操作的方法 } private V doPut(K key, V value, boolean onlyIfAbsent) {     Node<K,V> z;             // added node     if (key == null)    //有序的map,key不能为null,否则无法比较         throw new NullPointerException();     Comparator<? super K> cmp = comparator;    //获取比较器,若无则为null,使用自然排序     outer: for (;;) {    //外层循环         //findPredecessor查找key的前驱索引对应的结点b         for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {             //判断b结点是否是链表的尾结点,即b结点是否还有后继结点             //若有后继结点(n不为null表示有后继结点),那么要将key对应的结点插入b和n之间             //若b为尾结点,则key对应结点将为新的尾结点             if (n != null) {                 Object v; int c;                 Node<K,V> f = n.next;    //获取n的后继结点                 //判断n还是不是b的后继结点                 //若n不在是b的后继结点,说明已经有线程抢先在b结点之后插入一个新结点                 //退出内层循环,重新定位插入                 if (n != b.next)                                   break;                 //判断n结点是否要被删除(value为null表示逻辑删除),辅助删除后重新定位key                 if ((v = n.value) == null) {   // n is deleted                     n.helpDelete(b, f);    //让当前线程去辅助删除(物理删除)                     break;                 }                 //判断b是否要被删除,若要被删除,则退出当前循环,重新定位key                 if (b.value == null || v == n) // b is deleted                     break;                 //判断n结点的键与key的大小                 //这里开始精准定位key在链表中的位置,找出key实际在链表中的前驱结点                 //key比n结点的key大,则继续比较后续结点的key                 if ((c = cpr(cmp, key, n.key)) > 0) {                     b = n;                     n = f;                     continue;                 }                 //若key与n.key相等(c==0),说明key对应的结点已经存在                 //那么根据onlyIfAbsent来决定是否覆盖旧value值                 if (c == 0) {                     if (onlyIfAbsent || n.casValue(v, value)) {                         @SuppressWarnings("unchecked") V vv = (V)v;                         return vv;    //更新成功,直接返回旧值                     }                     break;     //更新失败继续循环尝试                 }                 // else c < 0; fall through             }             //到这说明b就是key的前驱结点             //新建key的结点z,并尝试更新b的next为z,成功就退出外层循环             //更新失败继续尝试             z = new Node<K,V>(key, value, n);             if (!b.casNext(n, z))                 break;         // restart if lost race to append to b             break outer;         }     }     /**     * 上面的步骤是将新结点插入到跳表最底层的链表中     * 接下来则是要向上生成各层的索引,是否生成某一层次的索引通过抛硬币的方式     * 决定,即从最底层开始随机决定是否在上一层建议索引,若为true则在上一层生成索引     * 并且继续抛硬币决定是否再往上层建立索引;若失败则布建立索引,直接结束     */     //生成一个随机数     int rnd = ThreadLocalRandom.nextSecondarySeed();     //判断是否要更新层级(随机数必须是正偶数才更新层级)     //这里决定的是当前新增的结点是否要向上建立索引     if ((rnd & 0x80000001) == 0) { // test highest and lowest bits         int level = 1, max;                  //决定level的等级,也就是结点要向上建立基层索引         //level的值有rnd从低二位开始有几个连续的1决定         //例如:rnd==0010 0100 0001 1110 那么level就为5         //由此可以看出跳表的最高层次不能超过31层(level<=31)         while (((rnd >>>= 1) & 1) != 0)             ++level;         Index<K,V> idx = null;         HeadIndex<K,V> h = head;                  //判断level是否超过当前跳表的最高等级(即是否大于头索引的level)         //若未超过,那么直接建立一个从1到level的纵向索引列         if (level <= (max = h.level)) {             //新建一个索引列,后建的索引的down指针指向前一个索引             for (int i = 1; i <= level; ++i)                 idx = new Index<K,V>(z, idx, null);         }         //若新建结点的level大于头结点的level,则要新建一个新的level层         else { // try to grow by one level             level = max + 1; // 只能新增一层,为一个结点就新增2层以上没有意义             @SuppressWarnings("unchecked")             Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1];             //新建一个结点z的索引列,并将这个索引列的所有索引都放到数组中             for (int i = 1; i <= level; ++i)                     idxs[i] = idx = new Index<K,V>(z, idx, null);             for (;;) {                 h = head;    //获取头索引                 int oldLevel = h.level;      //获取旧层次level                   //判断旧层次level是否发生改变(即是否有其他线程抢先更新了跳表的level,出现竞争)                 if (level <= oldLevel) // lost race to add level                     break;    //竞争失败,直接结束当前循环                 HeadIndex<K,V> newh = h;                 Node<K,V> oldbase = h.node;                 //生成新的HeadIndex节点                 //正常情况下,循环只会执行一次,如果由于其他线程的并发操作导致 oldLevel 的值不稳定,那么会执行多次循环体                 for (int j = oldLevel+1; j <= level; ++j)                     newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);                 //尝试更新头索引的值                 if (casHead(h, newh)) {                     h = newh;                     idx = idxs[level = oldLevel];                     break;                 }             }         }         //将新增的idx插入跳表之中         splice: for (int insertionLevel = level;;) {             int j = h.level;    //获取最高level             //查找需要清理的索引             for (Index<K,V> q = h, r = q.right, t www.michenggw.com= idx;;) {                 //其他线程并发操作导致头结点被删除,直接退出外层循环                 //这种情况发生的概率很小,除非并发量实在太大                 if (q == null || t == null)                     break splice;                 //判断q是否为尾结点                 if (r != null) {                     Node<K,V> n = r.node;    //获取索引r对应的结点n                     // compare before deletion check avoids needing recheck                     int c = cpr(cmp, key, n.key);    //比较新增结点的key与n.key的大小                     //判断n是否要被删除                     if (n.value == null) {                         if (!q.unlink(r))    //删除r索引                             break;                         r = q.right;                         continue;                     }                     //c大于0,说明当前结点n.key小于新增结点的key                     // 继续向右查找一个结点的key大于新增的结点key的索引                     //新结点对应的索引要插入q,r之间                     if (c > 0) {                         q = r;                         r = r.right;                         continue;                     }                 }                 //判断是否                 if (j == insertionLevel) {                     //尝试着将 t 插在 q 和 r 之间,如果失败了,退出内循环重试                     if (!q.link(r, t))                         break; // restart                     //如果插入完成后,t 结点被删除了,那么结束插入操作                     if (t.node.value == null) {                         // 查找节点,查找过程中会删除需要删除的节点                         findNode(key);                         break splice;                     }                     //到达最底层,没有要插入新索引的索引层了                     if (--insertionLevel == 0)                         break splice;                 }                 //向下继续链接其它index 层                 if (--j >= insertionLevel && j < level)                     t = t.down;                 q = q.down;                 r = q.right;             }         }     }     return null; } //查找key在map中的位置的前驱结点 private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {     if (key == null)         throw new NullPointerException(); // don't postpone errors     for (;;) {         //获取head头索引,遍历跳表         for (Index<K,V> q = head, r = q.right, d;;) {             //判断是否有后继索引,若有             if (r != null) {                     Node<K,V> n = r.node;    //获取索引对应的结点                 K k = n.key;    //获取结点对应的key                 //判断结点的value是够为null,为null表示结点已经被删除了                 //一个结点从链表中被删除时,会将value赋为null,因为此时跳表其他层级的索引还可能                 //会引用着该结点,value赋null,标识该结点已经被废弃,对应的索引(如果存在)也应该从跳表中删除                 //那么调用unlink方法删除索引                 if (n.value == null) {                     //尝试将r索引移除出跳表,成功就继续查找                     //失败就继续尝试                     if (!q.unlink(r))                         break;           // restart                     r = q.right;         // reread r                     continue;                 }                 //比较当前索引中结点的键k与key的大小                 //若key大于k,则继续查找比较下个索引                 if (cpr(cmp, key, k) > 0) {                     q = r;                     r = r.right;    //下个右边索引                     continue;                 }             }             //若是key小于k,则判断是够到达跳表的最底层的上一层,             //若是到达最底层的上一层,则说明当前索引就是key的前驱索引,node就是对应的结点             //若不是最底层的上一层,则继续比较查找             if ((d = q.down) == null)                 return q.node;             q = d;             r = d.right;         }     } } //Index中的方法,断开两个索引之间的联系 final boolean unlink(Index<K,V> succ) {     return node.value != null && casRight(succ, succ.right); } //CAS方式更新当前索引的右边索引