jdk源码->集合->ConcurrentHashMap

时间:2022-02-05 17:19:30

类的属性

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
// 表的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认表的大小
private static final int DEFAULT_CAPACITY = 16;
// 最大数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发数
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 装载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转化为红黑树的阈值
static fial int TREEIFY_THRESHOLD = 8;
// 红黑树转化为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
//当数组长度还未超过64,优先数组的扩容,否则将链表转为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容时任务的最小转移节点数
private static final int MIN_TRANSFER_STRIDE = 16;
// sizeCtl中记录stamp的位数
private static int RESIZE_STAMP_BITS = 16;
// 进行扩容所允许的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 记录sizeCtl中的大小所需要进行的偏移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 一系列的标识
static final int MOVED = -1; // 结点扩容时,设置的占位结点的hash值
static final int TREEBIN = -2; // 链表已经转化为了红黑树,那么桶中第一个元素的hash值(也就是数组中的结点)设置为TREEBIN
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // 即为int的最大值:2的31次幂-1
//
/** Number of CPUS, to place bounds on some sizings */
// 获取可用的CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//
/** For serialization compatibility. */
// 进行序列化的属性
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
}; // volatile修饰table,保证对table[i]的修改对其他线程是可见的
transient volatile Node<K,V>[] table;
// 下一个表
private transient volatile Node<K,V>[] nextTable;
//
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
// 基本计数
private transient volatile long baseCount;
/**
sizeCtl是一个控制标志符:
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的
**/
private transient volatile int sizeCtl; /**
* The next table index (plus one) to split while resizing.
*/
// 扩容时用到,初始时为table.length,表示从索引 0 到transferIndex的节点还未转移
private transient volatile int transferIndex; /**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
// 旋转锁
private transient volatile int cellsBusy; /**
* Table of counter cells. When non-null, size is a power of 2.
*/
// counterCell表
private transient volatile CounterCell[] counterCells; // views
// 视图
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet; // Unsafe 静态块
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE; //node数组第一个结点的地址
private static final int ASHIFT; //node数组结点相对第一个结点的偏移地址 static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
//获取ConcurrentHashMap这个对象字段sizeCtl在内存中的偏移量
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
//可以获取数组第一个元素的偏移地址
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}

内部结点类

Node结点

    // The basic node type; the three node types below all extend it.
    // hash, key, val and next can all be set at construction time.
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// volatile so an updated value is visible to other threads
volatile V val;
// volatile so an updated next link is visible to other threads,
// e.g. when put appends a node at the tail of a bin's list
volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
// Subclasses override find(); each structure (linked list, red-black
// tree, forwarding node) has its own lookup implementation.
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
// match on hash first, then reference or equals() equality
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}

ForwardingNode结点

    // Placeholder node installed in a bin while a resize is in progress.
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
// key/val cannot be specified; a ForwardingNode's hash is always MOVED
super(MOVED, null, null, null);
this.nextTable = tab;
}
// find() forwards the lookup into the new table (nextTable)
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// negative hash: either another ForwardingNode or a tree bin
if (eh < 0) {
if (e instanceof ForwardingNode) {
// chase the forward into the next table instead of recursing
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

TreeNode结点

    // When a bin's linked list is treeified, its Nodes are first converted to
    // TreeNodes. A TreeNode holds the key-value pair of a red-black tree
    // entry, but at that point the tree is not yet built -- TreeBin does that.
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
// all properties can be specified
super(hash, key, val, next);
this.parent = parent;
} Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
} // the actual red-black-tree lookup
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
// navigate by hash; on equal hashes try equals(), then a
// Comparable ordering, and finally search both subtrees
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

TreeBin结点

    // TreeBin does not itself wrap a key-value pair; it stands in for the root
    // of the red-black tree. In other words, the element actually stored in
    // the ConcurrentHashMap table array for a treeified bin is a TreeBin.
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
// The constructor receives the head of a TreeNode list; constructing the
// TreeBin links the still list-shaped TreeNodes into a red-black tree.
TreeBin(TreeNode<K,V> b) {
// key/val cannot be specified; a TreeBin's hash is always TREEBIN
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
// walk the TreeNode list, inserting each node into the tree rooted at r
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
// the first node becomes the (black) root
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
// order by hash, then Comparable, then tie-break
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// rebalance after the insertion
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
// TreeBin's find mainly handles synchronization; the actual lookup is
// delegated to TreeNode.findTreeNode.
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// a writer holds (or waits for) the lock: fall back to a linear
// scan along the next links, which is always safe
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
// otherwise acquire a read lock (CAS lockState += READER) and
// search the tree
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
// release the read lock; if we were the last reader and a
// writer is waiting, wake it up
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
}

构造函数

ConcurrentHashMap()

// Creates a new, empty map with the default initial capacity (16).
// The table itself is allocated lazily, on the first insertion.
public ConcurrentHashMap() {
}

ConcurrentHashMap(int)

    public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0) // negative initial capacity: reject
throw new IllegalArgumentException();
// table size = smallest power of two >= 1.5 * initialCapacity + 1
// (capped at MAXIMUM_CAPACITY)
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// positive sizeCtl: table not yet initialized; value is the capacity
this.sizeCtl = cap;
}

ConcurrentHashMap(int, float, int)型构造函数

    public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // argument validation
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
// use at least as many bins as estimated concurrent writers
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

核心函数分析

putVal(K key, V value, boolean onlyIfAbsent)

final V putVal(K key, V value, boolean onlyIfAbsent) {
//合法性检验
if (key == null || value == null) throw new NullPointerException();
//hash = (key.hashCode ^ (key.hashCode >>> 16)) & HASH_BITS
int hash = spread(key.hashCode());
int binCount = 0;
//不断自旋,因为在initTable和casTabAt用到了compareAndSwapInt、compareAndSwapObject等CAS操作,同步线程竞争的时候,一些尝试就会失败,所以这边要加一个for循环,在失败后继续put结点
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 表为空或者表的长度为0
if (tab == null || (n = tab.length) == 0)
// 初始化表
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 表中key对应的桶为空
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // CAS操作,实例化一个结点放在该桶中
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) //说明该结点正在扩容中
// 去帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 有点分段锁的味道,只是锁住了tab[i]
//这个判断是非常重要的,有可能在获得该桶的锁之前,正在进行在扩容,获得该桶的锁之后,桶中只有ForwardingNode,如果是这种情况,if判断失败,上述for循环继续put
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 该table表中该结点的hash值大于0
// binCount赋值为1
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 无限循环
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 判断key是否相等
// 记住该结点的val值
oldVal = e.val;
if (!onlyIfAbsent) // 进行判断
// 将指定的value保存至结点,即进行了结点值的更新
e.val = value;
break;
}
// 保存当前结点
Node<K,V> pred = e;
if ((e = e.next) == null) { // 移向链表中的下一个结点,如果为null,就实例化一个结点尾插 pred.next = new Node<K,V>(hash, key,
value, null);
// 退出循环
break;
}
}
}
else if (f instanceof TreeBin) { // 结点为红黑树结点类型
Node<K,V> p;
// binCount赋值为2
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 将hash、key、value放入红黑树
// 保存结点的val
oldVal = p.val;
if (!onlyIfAbsent) // 判断
// 赋值结点value值
p.val = value;
}
}
}
}
if (binCount != 0) { // binCount不为0
if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值
// 进行转化
treeifyBin(tab, i);
if (oldVal != null) // 旧值不为空
// 返回旧值
return oldVal;
break;
}
}
}
// 增加binCount的数量
addCount(1L, binCount);
return null;
}

put流程:

  1. 计算当前key的hash值,根据hash值计算索引 i (i=(table.length - 1) & hash);
  2. 如果当前table为null,说明是第一次进行put操作,调用initTable()初始化table;
  3. 如果索引 i 位置的节点 f 为空,则直接把当前值作为新的节点直接插入到索引 i 位置;
  4. 如果节点 f 的hash为-1(f.hash == MOVED(-1)),说明当前节点处于移动状态(或者说是其他线程正在对 f 节点进行转移/扩容操作),此时调用helpTransfer(tab, f)帮助转移/扩容;
  5. 如果不属于上述条件,说明已经有元素存储到索引 i 处,此时需要对索引 i 处的节点 f 进行 put or update 操作,首先使用内置锁 synchronized 对节点 f 进行加锁:

    如果f.hash>=0,说明 i 位置是一个链表,并且节点 f 是这个链表的头节点,则对 f 节点进行遍历,此时分两种情况:

    --如果链表中某个节点e的hash与当前key的hash相同,则对这个节点e的value进行修改操作。

    --如果遍历到链表尾都没有找到与当前key的hash相同的节点,则把当前K-V作为一个新的节点插入到这个链表尾部。
  6. 如果节点 f 是TreeBin节点(f instanceof TreeBin),说明索引 i 位置的节点是一个红黑树,则调用putTreeVal方法找到一个已存在的节点进行修改,或者是把当前K-V放入一个新的节点(put or update)。
  7. 完成插入后,如果索引 i 处是一个链表,并且在插入新的节点后节点数>8,则调用treeifyBin把链表转换为红黑树。

    最后,调用addCount更新元素数量

Node<K,V>[] initTable()

    private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 不断循环
i ((sc = sizeCtl) < 0f) // sizeCtl小于0,正在初始化,则进行线程让步等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS操作,比较sizeCtl的值与sc是否相等,相等则用-1替换
try {
if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0
// sc的值即容量设定值是否大于0,若是,则n为sc,否则,n为默认初始容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 实例化结点数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 内部table引用指向该数组
table = tab = nt;
// sc为n * 0.75
sc = n - (n >>> 2);
}
} finally {
// 设置sizeCtl的值
sizeCtl = sc;
}
break;
}
}
// 返回table表
return tab;
}

tabAt(Node<K,V>[] tab, int i)

/*
 * Why the offset ((long)i << ASHIFT) + ABASE?
 * ABASE is the memory offset of the first array element, and ASHIFT scales
 * an index to a byte offset (log2 of the element size), so
 * ((long)i << ASHIFT) + ABASE is the address of tab[i].
 * getObjectVolatile performs a volatile read, so tabAt returns the most
 * recent value of tab[i].
 */
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

// CAS on tab[i]: if tab[i] == c, atomically set tab[i] = v and return true;
// otherwise leave tab[i] unchanged and return false.
// (the original comment claimed "otherwise tab[i] = c", which is not what CAS does)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

helpTransfer(Node<K,V>[] tab, Node<K,V> f)

// Helps an in-progress resize when the bin head is a ForwardingNode.
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// stamp identifying a resize of a table of this length
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// no help needed: stamp mismatch, resize finishing, resizer limit
// reached, or no bins left to claim -- break out.
// NOTE(review): the "sc == rs + 1" / "sc == rs + MAX_RESIZERS"
// comparisons are a known JDK 8 bug (missing << RESIZE_STAMP_SHIFT,
// see JDK-8214427); kept as-is since this mirrors the JDK 8 source.
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // each helper increments sizeCtl by 1
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

// Moves and/or copies the nodes in each bin to the new table.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride = bins per transfer task ( ~ n / (8 * NCPU) ), at least 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            // allocate the new table at twice the current length
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // hand out bins starting from the end of the table
    }
    int nextn = nextTab.length;
    // ForwardingNode holding a reference to nextTab; installed in each
    // processed bin as an "already moved" marker for other threads
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;    // controls claiming/finishing transfer tasks
    boolean finishing = false; // set once the whole transfer looks complete
    // Spin, moving stride bins at a time starting from transferIndex.
    // i: index of the bin being processed; bound: lower bound of this task.
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            // nextIndex: next bin index to hand out; nextBound: its lower bound
            int nextIndex, nextBound;
            // still inside the claimed range (or finishing): process bin i
            if (--i >= bound || finishing)
                advance = false;
            // no tasks left to claim: set i = -1 and leave the while loop
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // claim a task: CAS transferIndex down by one stride so that no
            // two threads ever transfer the same bins
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // this thread has finished its tasks
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // exactly one thread performs the final commit
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // threshold = 2n - n/2 = 1.5n, i.e. 0.75 * the new capacity 2n
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // one helper done: CAS sizeCtl down by 1.
            // (fixed: the original text read "compareAndSapInt" and
            // "resizeStawmp" -- typos for compareAndSwapInt / resizeStamp)
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // each helper incremented sizeCtl on entry and decrements it
                // here; when it drops back to
                // (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 every thread is
                // done, and this last thread re-checks the table and commits
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // empty bin: just install the ForwardingNode to mark it processed
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // another thread already processed this bin; move on
            advance = true; // already processed
        else {
            synchronized (f) {
                // important re-check: the node may have been removed (or the
                // bin changed) before we acquired the lock
                if (tabAt(tab, i) == f) {
                    // ln goes to nextTab[i], hn goes to nextTab[i + n]
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // bin head is a plain list node
                        // split the list on bit (hash & n); see the diagram
                        // following this code
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) { // pass (1)
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) { // pass (2)
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // TreeNodes also form a linked list via next, so the
                        // split walks that list just like the list case above
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // ln: if the lo list has <= UNTREEIFY_THRESHOLD (6)
                        // elements, untreeify it back to a plain list; else if
                        // hi is empty the whole original tree moved, so reuse
                        // it (t); otherwise build a new tree from lo.
                        // hn is symmetric.
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

链表的扩容过程如下:

使用fh&n可以快速把链表中的元素区分成两类,A类是hash值的第X位(X为n的最高位)为0,B类是hash值的第X位为1,并通过lastRun记录最后需要处理的节点,A类和B类节点可以分散到新数组的槽位i和i+n中,假设链表拉平显示如下:

jdk源码->集合->ConcurrentHashMap

1.通过代码标记1处的遍历,记录runBit和lastRun,分别为1和节点6,所以设置hn为节点6,ln为null;

2.通过代码标记2处的遍历,以lastRun节点为终止条件,根据第X位的值分别构造ln链表和hn链表

jdk源码->集合->ConcurrentHashMap

jdk源码->集合->ConcurrentHashMap

Transfer过程总结:

  1. 第一个执行的线程会首先设置sizeCtl属性为一个负值,然后执行transfer(tab, null),其他晚进来的线程会检查当前扩容是否已经完成(if ((nextIndex = transferIndex) <= 0)),没完成则帮助进行扩容(U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))),完成了则直接退出。
  2. 该ConcurrentHashMap的扩容操作可以允许多个线程并发执行,那么就要处理好任务的分配工作。每个线程获取一部分桶的迁移任务(nextIndex - stride),如果当前线程的任务完成,查看是否还有未迁移的桶(if ((nextIndex = transferIndex) <= 0)),若有则继续领取任务执行,若没有则退出(i=-1,advance=false)。在退出时需要检查是否还有其他线程在参与迁移工作(if sc == resizeStamp(n) << RESIZE_STAMP_SHIFT + 2),如果有(即前面等式不成立)则自己什么也不做直接退出,如果没有了(advance = finishing = true,i=n)则执行最终的收尾工作(nextTable = null; table = nextTab;sizeCtl = (n << 1) - (n >>> 1)