ConcurrentHashMap源码阅读

时间:2025-02-14 15:56:09
  • //调用该扩容方法的地方有:
  • //juc.C13Map#addCount 向集合中插入新数据后更新容量计数时发现到达扩容阈值而触发的扩容
  • //juc.C13Map#helpTransfer 扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点时触发的扩容
  • //juc.C13Map#tryPresize putAll批量插入或者插入后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容
  • private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  • int n = , stride;
  • //计算每条线程处理的桶个数,每条线程处理的桶数量一样,如果CPU为单核,则使用一条线程处理所有桶
  • //每条线程至少处理16个桶,如果计算出来的结果少于16,则一条线程处理16个桶
  • if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  • stride = MIN_TRANSFER_STRIDE; // subdivide range
  • if (nextTab == null) { // 初始化新数组(原数组长度的2倍)
  • try {
  • @SuppressWarnings("unchecked")
  • Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  • nextTab = nt;
  • } catch (Throwable ex) { // try to cope with OOME
  • sizeCtl = Integer.MAX_VALUE;
  • return;
  • }
  • nextTable = nextTab;
  • //将 transferIndex 指向最右边的桶,也就是数组索引下标最大的位置
  • transferIndex = n;
  • }
  • int nextn = ;
  • ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  • boolean advance = true;//该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
  • boolean finishing = false;//该标识用于控制扩容何时结束,最后一个扩容线程会负责重新检查一遍数组查看是否有遗漏的桶
  • //这个循环用于处理一个 stride 长度的任务,i : stride 内最大下标, bound : stride 内最小下标
  • //通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束本次长度为 stride 的迁移任务
  • //结束这次任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
  • for (int i = 0, bound = 0;;) {
  • Node<K,V> f; int fh;
  • while (advance) {
  • int nextIndex, nextBound;
  • //每处理完一个hash桶就将 i 进行减 1 操作
  • if (--i >= bound || finishing)
  • advance = false;
  • else if ((nextIndex = transferIndex) <= 0) {
  • //transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
  • i = -1;
  • advance = false;
  • }
  • //首次for循环进入,设置 bound 和 i 的值,领取迁移任务的数组区间
  • else if ((this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
  • bound = nextBound;
  • i = nextIndex - 1;
  • advance = false;
  • }
  • }
  • if (i < 0 || i >= n || i + n >= nextn) {
  • int sc;
  • //扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
  • if (finishing) {
  • nextTable = null;
  • table = nextTab;
  • sizeCtl = (n << 1) - (n >>> 1);
  • return;
  • }
  • //每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
  • if ((this, SIZECTL, sc = sizeCtl, sc - 1)) {
  • //(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 成立,
  • // 说明该线程不是扩容大军里面的最后一条线程,直接return回到上层
  • // addCount/helpTransfer/tryPresize的while循环
  • if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  • return;
  • //(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 说明这条线程是最后一条扩容线程
  • //之所以能用这个来判断是否是最后一条线程,因为第一条扩容线程进行了如下操作:
  • // (this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
  • //除了修改结束标识之外,还得设置 i = n; 以便重新检查一遍数组,防止有遗漏未成功迁移的桶
  • finishing = advance = true;
  • i = n; // recheck before commit
  • }
  • }
  • else if ((f = tabAt(tab, i)) == null)
  • //遇到数组上空的位置直接放置一个ForwardingNode占位对象,以便查询操作的转发和标识当前处于扩容状态
  • advance = casTabAt(tab, i, null, fwd);
  • else if ((fh = ) == MOVED)
  • //数组上遇到hash值为MOVED=-1 的位置,说明该位置已经被其他线程迁移过了,将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
  • advance = true; // already processed
  • else {
  • synchronized (f) {
  • if (tabAt(tab, i) == f) {
  • Node<K,V> ln, hn;
  • //该节点为链表结构
  • if (fh >= 0) {
  • int runBit = fh & n;
  • Node<K,V> lastRun = f;
  • //遍历整条链表,找出 lastRun 节点
  • for (Node<K,V> p = ; p != null; p = ) {
  • int b = & n;
  • if (b != runBit) {
  • runBit = b;
  • lastRun = p;
  • }
  • }
  • //根据 lastRun 节点的高位标识(0 或 1),首先将 lastRun设置为 ln 或者 hn 链的末尾部分节点,后续的节点使用头插法拼接
  • if (runBit == 0) {
  • ln = lastRun;
  • hn = null;
  • }
  • else {
  • hn = lastRun;
  • ln = null;
  • }
  • //使用高位和低位两条链表进行迁移,使用头插法拼接链表
  • for (Node<K,V> p = f; p != lastRun; p = ) {
  • int ph = ; K pk = ; V pv = ;
  • if ((ph & n) == 0)
  • ln = new Node<K,V>(ph, pk, pv, ln);
  • else
  • hn = new Node<K,V>(ph, pk, pv, hn);
  • }
  • //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
  • //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
  • //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
  • setTabAt(nextTab, i, ln);
  • //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
  • setTabAt(nextTab, i + n, hn);
  • //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
  • setTabAt(tab, i, fwd);
  • //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
  • advance = true;
  • }
  • //该节点为红黑树结构
  • else if (f instanceof TreeBin) {
  • TreeBin<K,V> t = (TreeBin<K,V>)f;
  • //lo 为低位链表头结点,loTail 为低位链表尾结点,hi 和 hiTail 为高位链表头尾结点
  • TreeNode<K,V> lo = null, loTail = null;
  • TreeNode<K,V> hi = null, hiTail = null;
  • int lc = 0, hc = 0;
  • //同样也是使用高位和低位两条链表进行迁移
  • //使用for循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
  • for (Node<K,V> e = ; e != null; e = ) {
  • int h = ;
  • //这里面形成的是以 TreeNode 为节点的链表
  • TreeNode<K,V> p = new TreeNode<K,V>
  • (h, , , null, null);
  • if ((h & n) == 0) {
  • if (( = loTail) == null)
  • lo = p;
  • else
  • = p;
  • loTail = p;
  • ++lc;
  • }
  • else {
  • if (( = hiTail) == null)
  • hi = p;
  • else
  • = p;
  • hiTail = p;
  • ++hc;
  • }
  • }
  • //形成中间链表后会先判断是否需要转换为红黑树:
  • //1、如果符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
  • //2、如果不符合条件则将 TreeNode 转换为普通的 Node 节点,再将该普通链表设置到新数组中去
  • //(hc != 0) ? new TreeBin<K,V>(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
  • ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  • (hc != 0) ? new TreeBin<K,V>(lo) : t;
  • hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  • (lc != 0) ? new TreeBin<K,V>(hi) : t;
  • //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
  • //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
  • //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
  • setTabAt(nextTab, i, ln);
  • //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
  • setTabAt(nextTab, i + n, hn);
  • //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
  • setTabAt(tab, i, fwd);
  • //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
  • advance = true;
  • }
  • }
  • }
  • }
  • }
  • }