ConcurrentHashMap JDK 1.6 源码分析

时间:2024-07-22 23:33:38

前言

前段时间把 JDK 1.6 中的 HashMap 主要的一些操作源码分析了一次。既然把 HashMap 源码分析了, 就顺便把 JDK 1.6 中 ConcurrentHashMap 的主要一些操作源码分析一下。因为其中有很多思想是值得我们去借鉴的。 ConcurrentHashMap 中的分段锁。这个思想在 JDK 1.8 中 为了优化 JUC 下的原子锁 CAS 高并发情况下导致自旋次数太多效率低下。引用 Adder 。其中就是借鉴了分段锁的思想。AtomicLong 对比 LongAdder。 有兴趣可以查看。

准备

如果有人问你了解 ConcurrentHashMap 吗? 你可以这样回答,了解。 ConcurrentHashMap 是为了 取代 HashMap非线程安全的,一种线程安全实现类。它有一个 Segment 数组,Segment 本身就是相当于一个 HashMap 对象。里面是一个 HashEntry 数组,数组中的每一个 HashEntry 都是一个键值对,也是一个链表的表头。如果别人问你,那 ConcurrentHashMap get 或者 put 一个对象的时候是怎么操作的 ,你该怎么回答。emmm..... 继续往下看。会有你要的答案。

构造函数

分析源码,先从构造函数开始。直接研究带所有参数的构造方法,其他一些重载的构造方法,最里面还是调用了该构造方法。在看构造方法之前,需要 明白 sshift 是表示并发数的2的几次方 比如并发数是16 那么他的值就是 4 。ssize 是 segment 数组的大小。

 public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 用来与 key的hashCode >>> 运算 获取HashCode的高位
segmentShift = 32 - sshift;
// 高位与 它做与运算 eg 假如 默认的创建该对象 那么 segmentShift = 28 segmentMask=15(二进制为1111) 假如现在put一个值 他的key的HashCode值为2的32次方 那么 他在segment里面的定位时 2的32次方 无符号 高位补零 右移28个 那么就等于 10000(二进制) 等于 16 与 1111 做与运算 等于0 也就是定位在 segment[0]上 。
segmentMask = ssize - 1; // segment数组大小为 16
this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// segment数组中 每个HashEntry数组的大小,
int cap = 1;
while (cap < c)
cap <<= 1;
// 为segment数组中的每个HashEntry数组初始化大小,每个semengt中只有一个HashEntry数组。如果你设置的 ConcurrentHashMap 初始化大小为16的话,则 segment数组中每个的HashEntry的大小为1,如果你初始化他的大小为28 的话。它会根据上面的运算,cap的大小为2,也就是segment数组中的每个HashEntry数组的大小为2 ,总的大小为32。
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

上面的注释应该都挺清楚了,要注意的是 ConcurrentHashMap的大小 是所有 Segment 数组中每个HashEntry数组的大小相加的和。

put 方法

ConcurrentHashMap 每次 put 的时候都是需要加锁的,只不过会锁住他所在的那个Segment数组位置。其他的不锁,这也就是分段锁,默认支持16个并发。说起put,以数组的形式存储的数据,就会涉及到扩容。这样是接下来需要好好讨论的一个事情。

 	public V put(K key, V value) {
// key value 不能为null
if (value == null)
throw new NullPointerException();
// 获取hash值
int hash = hash(key.hashCode());
// 先获取hash二进制数的高位与15的二进制做与运算,得到segment数组的位置。
return segmentFor(hash).put(key, hash, value, false);
} V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 锁住
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
// 扩容操作
rehash();
// 获取 Segment数组中的其中的HashEntry数组
HashEntry<K,V>[] tab = table;
// 获取在在HashEntry数组中的位置。
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 判断是否是该key。
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next; V oldValue;
// 如果存在该key的数据 ,那么更新该值 返回旧值
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
//头插法插入 tab[index]
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
} // 看下扩容操作的细节
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return; // HashEntry数组,新的数组为它的两倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
// 阈值
threshold = (int)(newTable.length * loadFactor);
//他的二进制添加以为 原来他的大小为3 那么二进制就是11 现在就为 7 二进制为 111
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) { // 旧的HashEntry。
HashEntry<K,V> e = oldTable[i]; if (e != null) {
// 下一个 该HashEntry数组上的 HashEntry是否为链表,有下一个值。
HashEntry<K,V> next = e.next;
// 该HashEntry的新位置 如果高位为1 那么他在HashEntry数组中的位置就是老的HashEntry数组中的加上这个倍数。举个例子
// 假如e.hash 原来的的二进制位...111 老的HashEntry数组的大小为 4 那么e.hash和 4-1 也就是3 做与运算 得到的值也就是二进制的11
// 值位3 现在新的HashEntry数组的大小为 8 也就是 e.hash 和 8-1 做与运算 得到的值 也就是二进制位 111 位 7 。
int idx = e.hash & sizeMask; // 没有的话就直接放入该位置了,如果有的话往下看:
if (next == null)
newTable[idx] = e; else {
HashEntry<K,V> lastRun = e;
// 假如idx 等于 7
int lastIdx = idx;
// 找到最后一个 HashEntry中的位置,并且后面的HashEntry的位置都是一样的。举个例子
// 假如这个链表中的所有HashEntry的Hash值为 1-5-1-5-5-5 。那么最后lastIdx = 5 也就是1-5-1后面的这个5 。lastRun 为 1-5-1后面的这个5的HashEnrty。
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
//
lastRun = last;
}
}
// 将 lastRun 复制给 这个新的Table 那么后面还有 5-5-5 这些的就不用移动了 直接带过来了。 这就是上面那个for循环做的事情
newTable[lastIdx] = lastRun; // 对前面的 1-5-1做操作了 1就是在新HashEntry书中的1的位置 5的后就是头插法 ,查到新HashEntry的头部了
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}

其实put 方法中有点难理解的就是 把查找到后面如果有所有相同的 HashEntry key 的位置是一样的话,就不用额外的进行Hash重新定位了。不知道我描述的清不清楚。如果还有不清楚的话,可以私信一下我。

get 方法

ConcurrentHashMapget 方法是不会加锁的,如果get的值为null的时候,这个时候会对这个HashEntry进行加锁。预防此时并发出现的问题。

    public V get(Object key) {
//定位Segment数组中的HashEntry数组为位置
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
} V get(Object key, int hash) {
// 曾经put进去过,也就是里面有值
if (count != 0) { // read-volatile
// 定位HashEntry数组中的HashEntry。
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}

ConcurrentHashMapget方法是比较简单的。看一看就知道了。

总结

这一遍ConcurrentHashMap源码分析,可以说是自己写了大半个月吧。好早之前就准备写了。总是写一点,然后就停笔了。加上自己换了公司的原因,又忙上了一段时间,导致一拖再落。哇,严重拖延症患者。上面自己也是全部透彻之后写下来的,如果有些表达不够清晰的还得多加包涵,如果有不同的可以下方浏览讨论一下。上面很多关键的代码我都写上了注释,可以配合着注释,然后自己对源码进行研究,查看,如果还有不是很透彻的话,自己多翻一翻其他人写的。最近一直在写LeetCode上的动态规划这些算法题。其实也就是抄一遍。等以后有了感悟再来写这一些吧。