一、概述
1.ConcurrentHashMap是HashMap的线程安全且高效的实现
2.ConcurrentHashMap采用分段锁技术,只有在同一个分段内才会存在竞争关系,不同的分段锁之间没有锁竞争。
3.ConcurrentHashMap的主干是Segment数组,Segment继承了ReentrantLock,是一种可重入锁,一个Segment就是一个子哈希表,在每个Segment中维护一个HashEntry数组,对于不同的Segment的数据进行操作不用考虑所竞争。
二、ConcurrentHashMap中的segment分析
Segment是ConcurrentHashMap中的主*分,一个Segment就是一个哈希表,一个Segment类似于一个HashMap。
1.HashEntry的定义
//HashEntry是ConcurrentHashMap中的一个内部类 static final class HashEntry<K, V>{ final int hash; final K key; volatile V value; volatile HashEntry<K, V> next; static final Unsafe UNSAFE; static final long nextOffset; HashEntry(int paramInt, K paramK, V paramV, HashEntry<K, V> paramHashEntry){ this.hash = paramInt; this.key = paramK; this.value = paramV; this.next = paramHashEntry; } //使用UNSAFE的putOrderedObject设置next指针 final void setNext(HashEntry<K, V> paramHashEntry){ UNSAFE.putOrderedObject(this, nextOffset, paramHashEntry); } static{ try{ UNSAFE = Unsafe.getUnsafe(); HashEntry localHashEntry = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset(localHashEntry.getDeclaredField("next")); }catch (Exception localException) { throw new Error(localException); } } }
可以看到在ConcurrentHashMap中对HashEntry的定义与HashMap中的定义是相似的,区别在于对value和next的定义类型,在ConcurrentHashMap中的对于next的的写入是使用UNSAFE.putOrderedObject方法,而这个方法只对volatile才有效,他能够保证写操作之间的顺序性。普通的volatile保证写操作的结果能立马被其他线程看到,不论其他的线程是读操作还是写操作,而putOrderedObject是putObjectVolatile的内存非立即可见版本,这个方法对低延迟的代码很有用,他能够实现非堵塞的写入,写后的结果并不会被立即被其他的线程看到,甚至是自己的线程。
2.Segment的定义
Segment也是ConcurrentHashMap中的一个内部类,他继承了ReentrantLock,这意味着每个Segment都可以当做一个锁,每把锁只锁住整个容器中的部分数据,这样不影响线程访问其他的数据,当然如果是对全局改变时会锁定所有的Segment段。
下面看一下Segment的具体代码:
下面是在Segement中定义的主要的变量,对于一个Segment就相当于一个HashMap,其中主要的变量是一个HashEntry的数组,同时会根据Java中虚拟机的数量设置进入加锁等待的尝试次数。
private static final long serialVersionUID = 2249069246763182397L; //尝试次数达到限制进入加锁等待状态。 对最大尝试次数,目前的实现单核次数为1,多核为64 static final int MAX_SCAN_RETRIES = (Runtime.getRuntime().availableProcessors() > 1) ? 64 : 1; //segment内部的哈希表,访问HashEntry,通过具有volatie的entryAt、setEntryAt方法 volatile transient ConcurrentHashMap.HashEntry<K, V>[] table; //Segment的哈希表中的HashEntry的数量 transient int count; //在Segment上的所有的修改糙制作数,可能会溢出,但是为isEnpty和size方法,提供了有效准确稳定的检查或校验 transient int modCount; //哈希表中需要rehash的阈值 transient int threshold; //哈希表中的负载因子,所有的Segments中的这个值相等,这么做是为了避免需要连接到外部Object final float loadFactor; Segment(float paramFloat, int paramInt, ConcurrentHashMap.HashEntry<K, V>[] paramArrayOfHashEntry){ this.loadFactor = paramFloat; this.threshold = paramInt; this.table = paramArrayOfHashEntry; }
三、ConcurrentHashMap的初始化
1.ConcurrentHashMap中部分初始化变量
//默认初始化Map的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认负载因子 static final float DEFAULT_LOAD_FACTOR = 0.75F; //默认并发数 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1073741824; //每个segment中的最小容量,至少为2.避免当next指针使用时,需要立即resize static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大分段数 static final int MAX_SEGMENTS = 65536; //在重排序锁定之前,非同步化尝试调用size等方法的次数,避免无限制的尝试 static final int RETRIES_BEFORE_LOCK = 2; //与实例相关联的一个随机值,用于哈希键的哈希码,使哈希冲突更难找到 private final transient int hashSeed = randomHashSeed(this); //段掩码:假如segment的长度为16,那么段掩码为15,保证二进制所有位置上都是1,可以更好的保证散列的均匀性 final int segmentMask; //用于与段掩码进行位运算来定位segment final int segmentShift; //ConcurrentHashMap的主干 final Segment<K, V>[] segments; //key集合 transient Set<K> keySet; //entry集合 transient Set<Map.Entry<K, V>> entrySet; //value集合 transient Collection<V> values;
2.ConcurrentHashMap的初始化的方法
在ConcurrentHashMap中主要有四种初始化的方法,但是最终都会归结到下面代码中的第一种方法中,在这个方法中主要注意的有几个步骤:
(1)分段数segment是由并发数的个数来决定的,并且必须是2的次幂,所有segment数可能可能大于并发数;
(2)每个segment中平均放置的元素个数是由初始化容量个数和segment数计算得到,并且是向上取整;
(3)每个segment中HashEntry数组的长度必须是2的次幂
//第一种初始化方法:指定初始化容量paramInt1、负载因子paramFloat和并发级别paramInt2 public ConcurrentHashMap(int paramInt1, float paramFloat, int paramInt2){ //校验传递的参数是否合法 if ((paramFloat <= 0.0F) || (paramInt1 < 0) || (paramInt2 <= 0)) { throw new IllegalArgumentException(); } //验证并发数,不能超过最大的segment数,如果超过了则设置为最大值 if (paramInt2 > 65536) { paramInt2 = 65536; } //调整segment数,必须为2的次幂,i为移动的次数,j为segment的个数 int i = 0; int j = 1; while (j < paramInt2) { i++; j <<= 1; } //segmentShift默认情况下是28,因为默认情况下并发数是16,则i为4 this.segmentShift = (32 - i); //段掩码:默认情况下是15,各个二进制为1,目的是之后可以通过key的hashcode与这个值做&运算确定segment的索引 this.segmentMask = (j - 1); //检查初始化的容量是否大于最大值,否则设置为最大值 if (paramInt1 > 1073741824) { paramInt1 = 1073741824; } //计算每个segment平均应该放置的元素个数,向上取整 int k = paramInt1 / j; if (k * j < paramInt1) { k++; } //m为每个segment中HashEntry数组的长度,必须为2的次幂 int m = 2; while (m < k) { m <<= 1; } //初始化一个Segment Segment localSegment = new Segment(paramFloat, (int)(m * paramFloat), (HashEntry[])new HashEntry[m]); //初始化Segment数组,其中segment的个数为j Segment[] arrayOfSegment = (Segment[])new Segment[j]; UNSAFE.putOrderedObject(arrayOfSegment, SBASE, localSegment); this.segments = arrayOfSegment; } //第二种初始化方法:指定初始化容量和负载因子,使用默认并发数 public ConcurrentHashMap(int paramInt, float paramFloat) { this(paramInt, paramFloat, 16); } //第三种初始化方法:指定初始化容量,使用默认并发数和负载因子 public ConcurrentHashMap(int paramInt) { this(paramInt, 0.75F, 16); } //第四种初始化方法:所有的都使用默认值 public ConcurrentHashMap() { this(16, 0.75F, 16); }
四、put操作
1.ConcurrentHashMap的put操作
在concurrentHashMap中插入一个键值对有两种方法,当直接调用put方法时,当key在map中存在时,会进行覆盖,新的value值会覆盖旧的value值,当调用putIfAbsent方法时,如果key值已经存在,不会替换旧的value值。
在这两种put方法中,前面所进行的操作都是一样的,首先都需要对key值进行null校验,在concurrentHashMap中key值不能为null,然后根据key值计算得到其在segment数组中的索引,得到对应的分段,最后都是调用Segement类中的put方法对元素进行插入。
//put操作:向concurrentHashMap中放入一个k-v,原有key存在时,会用新的value替换旧的value public V put(K paramK, V paramV){ //在ConcurrentHashMap中key值不能为null,这个与HashMap中不同 if (paramV == null) { throw new NullPointerException(); } //根据key值得到hashcode值 int i = hash(paramK); //根据hashcode和初始化时计算得到的两个值进行位运算得到在segment数组中的索引值 int j = i >>> this.segmentShift & this.segmentMask; //根据索引值获取对应的segment Segment localSegment; if ((localSegment = (Segment)UNSAFE.getObject(this.segments, (j << SSHIFT) + SBASE)) == null) { localSegment = ensureSegment(j); } //调用segment中的put方法插入k-v值 return localSegment.put(paramK, i, paramV, false); } //put操作:向concurrentHashMap中放入一个k-v,原有key存在时,不会替换旧的value public V putIfAbsent(K paramK, V paramV){ if (paramV == null) { throw new NullPointerException(); } int i = hash(paramK); int j = i >>> this.segmentShift & this.segmentMask; Segment localSegment; if ((localSegment = (Segment)UNSAFE.getObject(this.segments, (j << SSHIFT) + SBASE)) == null) { localSegment = ensureSegment(j); } return localSegment.put(paramK, i, paramV, true); }
(2)Segment中的put操作
在ConcurrentHashMap中进行put操作时,最终都是调用Segment中的put方法,接下来看一下Segment中的put操作,主要包含四个参数,key值,key对应的hash值,value值,booleanIfAbsent值。
//segment中的put方法:参数依次为key、hashcode、value、onlyIfAbsent:如果key存在的情况下,true时不会修改原有值,而false时则会修改成新的值 final V put(K paramK, int paramInt, V paramV, boolean paramBoolean){ //尝试获取锁,获取失败返回null,否则调用scanAndLockForPut方法返回一个HashEntry实体 ConcurrentHashMap.HashEntry localHashEntry1 = tryLock() ? null : scanAndLockForPut(paramK, paramInt, paramV); Object localObject1; try{ //得到当前segment下的hashEntry数组 ConcurrentHashMap.HashEntry[] arrayOfHashEntry = this.table; //根据hashcode和数组长度得到key在数组中的索引值 int i = arrayOfHashEntry.length - 1 & paramInt; //根据索引值得到在数组中的hashEntry链表localHashEntry2 ConcurrentHashMap.HashEntry localHashEntry2 = ConcurrentHashMap.entryAt(arrayOfHashEntry, i); //初始化一个HashEntry实体localHashEntry3,赋值为localHashEntry2 ConcurrentHashMap.HashEntry localHashEntry3 = localHashEntry2; //对HashEntry链表进行遍历 while (localHashEntry3 != null){ Object localObject2; //如果当前节点的key为需要进行put操作的key值,如果onlyIfAbsent为true,直接跳出循环,如果为false,将该结点的value值赋值为新值,跳出循环 if (((localObject2 = localHashEntry3.key) == paramK) || ((localHashEntry3.hash == paramInt) && (paramK.equals(localObject2)))){ localObject1 = localHashEntry3.value; if (paramBoolean) { break label218; } localHashEntry3.value = paramV; this.modCount += 1; break label218; } //如果结点的key值不是要进行put的值,取下一个节点 localHashEntry3 = localHashEntry3.next; } //如果创建的结点不为空,将创建的结点放在table的i索引对应的HashEntry链的头部,否则创建新的HashEntry,放在链头,next指向链的原始头部 if (localHashEntry1 != null) { localHashEntry1.setNext(localHashEntry2); } else { localHashEntry1 = new ConcurrentHashMap.HashEntry(paramInt, paramK, paramV, localHashEntry2); } //检查是否需要对table进行rehash操作,如果不需要,将结点添加到索引对应的链表 int j = this.count + 1; if ((j > this.threshold) && (arrayOfHashEntry.length < 1073741824)) { rehash(localHashEntry1); } else { ConcurrentHashMap.setEntryAt(arrayOfHashEntry, i, localHashEntry1); } this.modCount += 1; this.count = j; localObject1 = null; }finally{ label218: unlock(); } return localObject1; }
在put操作中,一开始就会去获取锁,如果获取失败,需要调用scanAndLockForPut方法自旋重新获取锁,后面讲这个步骤,假如put一开始就拿到锁,那么他就会执行以下逻辑:
(1)根据hashcode找到table数组中对应的索引位置:在segment中数组的长度是2的次幂,因此索引只需要使用&运算即可;
(2)根据索引值得到在当前segment的HashEntry数组所对应的链表头结点;
(3)因为table字段是一个volatile变量,因而在开始时将table赋值给arrayOfHashEntry变量,可以减少在直接引用table变量时因为该字段是volatile而不能做优化带来的损失;
(4)因为在开始时已经将volatile的table字段引用赋值给arrayOfHashEntry变量,但为了保证每次读取table中的数据都是最新的值,因此调用entryAt()方法来获取对应的数组项,这是因为在put更新结点时,是采用UNSAFE.poyOrderedObject()操作,此时他对链头的更新只局限于当前线程,为了保证put操作能够读取到上一次更新的结果,需要使用volatile语法去读取结点的链头;
(5)拿到对应的HashEntry链表之后,对链表进行遍历,如果在其中找到对应的key值,并且使用的是put操作而不是putIfAbsent操作时,记录旧值,用新值替换旧值,退出循环,释放锁,返回旧值。
(6)如果在链表中没有找到对应的key值,创建一个新的节点,并将该结点作为当前链表的表头,将count+1,如果数组中的数据超过阈值,需要对数组进行rehash操作。
3.Segment中的scanAndLockForPut方法
在进行put操作时,第一步就是去获取锁,如果没有获取到需要调用scanAndLockForPut方法,在这段代码中持续查找key所对应的链表中是否已经存在key对应的结点,如果没有找到,则创建一个新结点,并且尝试n次,直到尝试的次数遭到限制才真正进入等到状态,这就是所谓的自旋等待。这里需要注意的是在最后一个else中,当在自旋的过程中发现结点链表的链表头发生了变化,那么需要更新结点链的链头,然后重新重新赋值i为-1,重新开始进行遍历。
//paramK为key值,paramInt为hash值,paramV:value值 private ConcurrentHashMap.HashEntry<K, V> scanAndLockForPut(K paramK, int paramInt, V paramV){ //1.根据paramInt获取到ConcurrentHashMap中对应的segment段,找到HashEntry链表中的头结点 Object localObject1 = ConcurrentHashMap.entryForHash(this, paramInt); Object localObject2 = localObject1; ConcurrentHashMap.HashEntry localHashEntry1 = null; int i = -1; while (!tryLock()) { //第一次一定会进入该语句 if (i < 0){ //第一种情况:如果没有找到,创建一个新的结点 if (localObject2 == null){ if (localHashEntry1 == null) { localHashEntry1 = new ConcurrentHashMap.HashEntry(paramInt, paramK, paramV, null); } i = 0; }else if (paramK.equals(((ConcurrentHashMap.HashEntry)localObject2).key)){ //第二种情况:找到相同key的结点 i = 0; }else{ //第三种情况:没有找到key对应的结点,指向下一个节点数据 localObject2 = ((ConcurrentHashMap.HashEntry)localObject2).next; } }else{ i++; //如果尝试次数已经达到最大值进入加锁等待状态,最大值的设定在Segemnt的定义中已经有讲到 if (i > MAX_SCAN_RETRIES){ lock(); break; } ConcurrentHashMap.HashEntry localHashEntry2; //如果尝试的次数是偶数,并且不是链表的头结点,链表头可能会发生变化 if (((i & 0x1) == 0) && ((localHashEntry2 = ConcurrentHashMap.entryForHash(this, paramInt)) != localObject1)){ localObject2 = localObject1 = localHashEntry2; i = -1; } } } return localHashEntry1; }
4.Segment中的rehash操作
在进行put操作时,如果数组中的元素超过阈值时,需要进行rehash操作,对数组进行扩容,下面是在ConcurrentHashMap中对一个table进行扩容的过程。
在源码的注释中就可以看到,他是创建一个原来数组两倍容量的数组,然后遍历数组中的每个链表,对每个结点重新计算新的数组索引,然后创建一个新的结点插入到新的数组中。而在ConcurrentHashMap中每次创建新的结点而不是修改原有结点的next指针是为了在进行rehash操作时其他的线程在原有的立案表上仍然可以进行get操作。
而为了优化这段逻辑,减少重新创建结点的开销,主要有几个步骤:
(1)当链表中只有一个结点时,直接将该结点赋值给新的数组对应的位置即可,这么做是因为在Segment中数组的长度是2的次幂,扩大两倍后,新结点在新的数组中的位置只能是相同的索引号或者是原来的索引位置加上一个2的次幂,因此可以保证在每个链表上进行rehash操作时可以互不干扰。
(2)对有多个结点的链表,遍历找到第一个后面所有结点的索引值都不变的结点,将这个结点赋值到新的数组中,然后重新遍历链表中这个结点之前的所有结点,将他们映射到对应的数组位置上。
每次扩容都是按照2的次幂进行扩展的,那么扩容前在一个链表中的元素,现在要么还在原来的序号的链表中,或者是原来的序号再加上一个2的次幂方,所以原链表中只有一部分元素需要移动,基于这种特性,为了提高效率,就是针对链表中的元素进行以下处理:如果链表中没有元素,那么
//rehash操作 private void rehash(ConcurrentHashMap.HashEntry<K, V> paramHashEntry){ //将数组table赋值给arrayOfHashEntry1 ConcurrentHashMap.HashEntry[] arrayOfHashEntry1 = this.table; //获取原数组的长度i,并进行扩容翻倍j=i*2 int i = arrayOfHashEntry1.length; int j = i << 1; //计算得到新的阈值 this.threshold = ((int)(j * this.loadFactor)); //初始化一个新的数组arrayOfHashEntry2,其容量时j ConcurrentHashMap.HashEntry[] arrayOfHashEntry2 = (ConcurrentHashMap.HashEntry[])new ConcurrentHashMap.HashEntry[j]; int k = j - 1; //遍历数组中的每一个位置 for (int m = 0; m < i; m++){ //得到数组中每个位置的链表 ConcurrentHashMap.HashEntry localHashEntry1 = arrayOfHashEntry1[m]; if (localHashEntry1 != null){ ConcurrentHashMap.HashEntry localHashEntry2 = localHashEntry1.next; //得到链表头元素在新的数组中的索引位置 int n = localHashEntry1.hash & k; //如果链表中只有一个元素:直接将当前链表放到新的数组中 if (localHashEntry2 == null){ arrayOfHashEntry2[n] = localHashEntry1; }else{ //如果链表中有多个元素,遍历该链表,找到第一个在扩容后索引都保持不变的结点localObject1 Object localObject1 = localHashEntry1; int i1 = n; for (ConcurrentHashMap.HashEntry localHashEntry3 = localHashEntry2; localHashEntry3 != null; localHashEntry3 = localHashEntry3.next){ int i2 = localHashEntry3.hash & k; if (i2 != i1){ i1 = i2; localObject1 = localHashEntry3; } } //将这个HashEnry放到他在新的数组中所在的位置 arrayOfHashEntry2[i1] = localObject1; //遍历他前面的结点,将这些结点进行重拍 for (localHashEntry3 = localHashEntry1; localHashEntry3 != localObject1; localHashEntry3 = localHashEntry3.next){ Object localObject2 = localHashEntry3.value; int i3 = localHashEntry3.hash; int i4 = i3 & k; ConcurrentHashMap.HashEntry localHashEntry4 = arrayOfHashEntry2[i4]; arrayOfHashEntry2[i4] = new ConcurrentHashMap.HashEntry(i3, localHashEntry3.key, localObject2, localHashEntry4); } } } } //计算要加入的结点在新的table中的位置,并插入到表头 m = paramHashEntry.hash & k; paramHashEntry.setNext(arrayOfHashEntry2[m]); arrayOfHashEntry2[m] = paramHashEntry; this.table = arrayOfHashEntry2; }
五、get操作
相比较put操作,ConcurrentHashMap中的get操作就简单易懂的多,其操作步骤为:
(1)根据key经过两次hash得到对应的hashcode
(2)根据hashcode得到在对应的segment
(3)在根据hashcode得到在segment的table数组中对应的链表
(4)遍历链表,获取key值对应的value值,如果不存在返回null
//get:根据key值获取value值 public V get(Object paramObject){ int i = hash(paramObject); long l = ((i >>> this.segmentShift & this.segmentMask) << SSHIFT) + SBASE; Segment localSegment; HashEntry[] arrayOfHashEntry; if (((localSegment = (Segment)UNSAFE.getObjectVolatile(this.segments, l)) != null) && ((arrayOfHashEntry = localSegment.table) != null)) { for (HashEntry localHashEntry = (HashEntry)UNSAFE.getObjectVolatile(arrayOfHashEntry, ((arrayOfHashEntry.length - 1 & i) << TSHIFT) + TBASE); localHashEntry != null; localHashEntry = localHashEntry.next){ Object localObject; if (((localObject = localHashEntry.key) == paramObject) || ((localHashEntry.hash == i) && (paramObject.equals(localObject)))) { return localHashEntry.value; } } } return null; }
六、size操作
在计算ConcurrentHashMap中包含的元素的个数的时候,需要对整个Segment数组进行操作,而不像是put和get操作只对其中一个Segment进行处理。并且存在的问题是当我们在遍历其中一个Segment的时候,另一个Segment可能会被修改,那么这次计算出来的size的值可能并不是真正的结果。
那么怎么解决这个问题呢?所以最简单的方法是在计算Map的大小的时候讲所有的Segment都锁住,不能进行put和remove操作,等计算完成之后再释放锁。但是在ConcurrentHashMap中并不是一上来就使用锁,而是给了3次机会,在前三次中不对Segment使用锁,遍历每个Segment,累加每个Segment的大小得到整个Map的大小,如果相邻两次计算中所有Segment的更新次数(modCount)和是一样的,那么认为在这两次计算中并没有操作对Map的结构进行了修改,则可以直接返回这个值,但是如果3次计算过程中,每次的更新次数都不同,那么在计算开始之前对所有的segment进行加锁,在进行遍历计算得到size大小,最后释放Segment上的锁,其过程如下:
public int size(){ Segment[] arrayOfSegment = this.segments; long l2 = 0L; int k = -1; int i; int j; try{ int m; for (;;){ //第四次循环时,遍历segment数组,对每个segment加锁 if (k++ == 2) { for (m = 0; m < arrayOfSegment.length; m++) { ensureSegment(m).lock(); } } long l1 = 0L; i = 0; j = 0; //遍历segment数组,取得每个segment, for (m = 0; m < arrayOfSegment.length; m++){ Segment localSegment = segmentAt(arrayOfSegment, m); if (localSegment != null){ //l1为Segment数组结构修改的次数和 l1 += localSegment.modCount; //n为当前segment的中的个数 int n = localSegment.count; if ((n < 0) || (i += n < 0)) { j = 1; } } } //前后两次遍历时segment结构修改的次数相等,那么认为遍历过程中没有新的元素的插入,停止遍历 if (l1 == l2) { break; } l2 = l1; } //如果遍历的次数大于3,在遍历结束后,需要释放锁 if (k > 2) { for (m = 0; m < arrayOfSegment.length; m++) { segmentAt(arrayOfSegment, m).unlock(); } } }finally{ if (k > 2) { for (int i1 = 0; i1 < arrayOfSegment.length; i1++) { segmentAt(arrayOfSegment, i1).unlock(); } } } return j != 0 ? 2147483647 : i; } //返回给定索引位置的Segment,如果Segment不存在,则参考Segment表中的第一个Segment的参数创建一个Segment并通过CAS操作将它记录到Segment表中去 private Segment<K, V> ensureSegment(int paramInt){ Segment[] arrayOfSegment = this.segments; //计算在segment数组中的索引 long l = (paramInt << SSHIFT) + SBASE; Object localObject; //如果segment为空,取segment[0] if ((localObject = (Segment)UNSAFE.getObjectVolatile(arrayOfSegment, l)) == null){ Segment localSegment1 = arrayOfSegment[0]; int i = localSegment1.table.length; float f = localSegment1.loadFactor; int j = (int)(i * f); HashEntry[] arrayOfHashEntry = (HashEntry[])new HashEntry[i]; if ((localObject = (Segment)UNSAFE.getObjectVolatile(arrayOfSegment, l)) == null){ Segment localSegment2 = new Segment(f, j, arrayOfHashEntry); while ((localObject = (Segment)UNSAFE.getObjectVolatile(arrayOfSegment, l)) == null) { if (UNSAFE.compareAndSwapObject(arrayOfSegment, l, null, localObject = localSegment2)) { break; } } } } return localObject; }
七、contains方法
在ConcurrentHashMap中contains中主要有判断key值是否存在的containsKey方法和判断value值是否存在的containsValue方法。
在containsKey方法中判断key值是否存在的过程比较简单,根据key值计算所在的segment和在segment中对应的链表,遍历链表,判断key值是否存在,存在返回true,否则返回false。
在containsValue方法中判断value值是否存在的过程与size方法比较类似,他是需要对整个segment数组进行操作,也是给三次机会不进行加锁对segment数组进行遍历,同时记录每次遍历时的modcount次数和,如果相邻两次遍历中modcount和不变,认为在遍历过程中map的结构未进行变化,可以返回查询的结果,否则需要对segment进行加锁遍历。
//判断某个key值是否在ConcurrentHashMap中 public boolean containsKey(Object paramObject){ int i = hash(paramObject); long l = ((i >>> this.segmentShift & this.segmentMask) << SSHIFT) + SBASE; Segment localSegment; HashEntry[] arrayOfHashEntry; if (((localSegment = (Segment)UNSAFE.getObjectVolatile(this.segments, l)) != null) && ((arrayOfHashEntry = localSegment.table) != null)) { for (HashEntry localHashEntry = (HashEntry)UNSAFE.getObjectVolatile(arrayOfHashEntry, ((arrayOfHashEntry.length - 1 & i) << TSHIFT) + TBASE); localHashEntry != null; localHashEntry = localHashEntry.next) { Object localObject; if (((localObject = localHashEntry.key) == paramObject) || ((localHashEntry.hash == i) && (paramObject.equals(localObject)))) { return true; } } } return false; } //判断ConcurrentHashMap是否包含某个value值 public boolean containsValue(Object paramObject){ //1.校验value值是否符合要求 if (paramObject == null) { throw new NullPointerException(); } Segment[] arrayOfSegment = this.segments; boolean bool = false; long l1 = 0L; int i = -1; try{ for (;;){ //第四次及之后遍历时:遍历segments数组,对每个segment加锁 if (i++ == 2) { for (int j = 0; j < arrayOfSegment.length; j++) { ensureSegment(j).lock(); } } long l2 = 0L; int m = 0; //遍历每个segment中,在遍历每个segment中的链表,如果得到key值,结束遍历 for (int n = 0; n < arrayOfSegment.length; n++){ Segment localSegment = segmentAt(arrayOfSegment, n); HashEntry[] arrayOfHashEntry; if ((localSegment != null) && ((arrayOfHashEntry = localSegment.table) != null)){ for (int i1 = 0; i1 < arrayOfHashEntry.length; i1++) { for (HashEntry localHashEntry = entryAt(arrayOfHashEntry, i1); localHashEntry != null; localHashEntry = localHashEntry.next){ Object localObject1 = localHashEntry.value; if ((localObject1 != null) && (paramObject.equals(localObject1))){ bool = true; break label207; } } } m += localSegment.modCount; } } if ((i > 0) && (m == l1)) { break; } l1 = m; } label207: if (i > 2) { for (int k = 0; k < arrayOfSegment.length; k++) { segmentAt(arrayOfSegment, k).unlock(); } } }finally{ if (i > 2) { for (int i2 = 0; i2 < arrayOfSegment.length; i2++) { segmentAt(arrayOfSegment, i2).unlock(); } } } return bool; } //判断ConcurrentHashMap中是否包含某个value值 public boolean contains(Object paramObject){ return containsValue(paramObject); }
八、remove方法
在ConcurrentHashMap中remove操作有两种,一种是只匹配key值,只要key值满足即删除,另一种是key和value同时满足才删除,但是在内部其实现方法是一样的。
//删除某个key值所在的结点--只要key值相等就删除 public V remove(Object paramObject){ int i = hash(paramObject); Segment localSegment = segmentForHash(i); return localSegment == null ? null : localSegment.remove(paramObject, i, null); } //删除某个键为key,值为value的结点--需要key和value都匹配上才删除 public boolean remove(Object paramObject1, Object paramObject2){ int i = hash(paramObject1); Segment localSegment; return (paramObject2 != null) && ((localSegment = segmentForHash(i)) != null) && (localSegment.remove(paramObject1, i, paramObject2) != null); } private void scanAndLock(Object paramObject, int paramInt){ Object localObject1 = ConcurrentHashMap.entryForHash(this, paramInt); Object localObject2 = localObject1; int i = -1; while (!tryLock()) { if (i < 0) { if ((localObject2 == null) || (paramObject.equals(((ConcurrentHashMap.HashEntry)localObject2).key))) { i = 0; } else { localObject2 = ((ConcurrentHashMap.HashEntry)localObject2).next; } } else { i++; if (i > MAX_SCAN_RETRIES) { lock(); break; } ConcurrentHashMap.HashEntry localHashEntry; if (((i & 0x1) == 0) && ((localHashEntry = ConcurrentHashMap.entryForHash(this, paramInt)) != localObject1)) { localObject2 = localObject1 = localHashEntry; i = -1; } } } } //删除某个结点:key、hashcode、value final V remove(Object paramObject1, int paramInt, Object paramObject2){ if (!tryLock()) { scanAndLock(paramObject1, paramInt); } Object localObject1 = null; try { ConcurrentHashMap.HashEntry[] arrayOfHashEntry = this.table; int i = arrayOfHashEntry.length - 1 & paramInt; Object localObject2 = ConcurrentHashMap.entryAt(arrayOfHashEntry, i); Object localObject3 = null; //遍历key对应的链表 while (localObject2 != null) { ConcurrentHashMap.HashEntry localHashEntry = ((ConcurrentHashMap.HashEntry)localObject2).next; Object localObject4; //如果找到对应的key值 if (((localObject4 = ((ConcurrentHashMap.HashEntry)localObject2).key) == paramObject1) || ((((ConcurrentHashMap.HashEntry)localObject2).hash == paramInt) && (paramObject1.equals(localObject4)))) { Object localObject5 = ((ConcurrentHashMap.HashEntry)localObject2).value; //如果value值不相等,不做删除操作,直接返回 if ((paramObject2 != null) && (paramObject2 != localObject5) && (!paramObject2.equals(localObject5))) { break; } //如果移除的是表头结点,则更新数组项的值 if (localObject3 == null) { ConcurrentHashMap.setEntryAt(arrayOfHashEntry, i, localHashEntry); } else {//如果移除的的中间结点,那么直接将该节点的前面的结点的next结点指向该节点的next结点,这也是hashEntry中next结点未设置成final的原因 localObject3.setNext(localHashEntry); } this.modCount += 1; this.count -= 1; localObject1 = localObject5; break; } localObject3 = localObject2; localObject2 = localHashEntry; } } finally { unlock(); } return localObject1; }
在remove操作中首先根据key值找到对应的连边,然后对链表进行遍历,如果要删除的结点是链表头,那么更新数组项的值,删除表头结点,如果删除的是中间结点,那么将删除结点的前一个结点的next指向删除结点的next结点,这也是HashEntry中next指针没有设置成final的原因。
在remove中如果获取锁失败也会像put操作一样进入自旋操作,这里调用的scanAndLock方法与scanAndLockForPut方法类似,只是他不会预创建结点。
九、ConcurrentHashMap的总结
1.ConcurrentHashMap是线程安全的,可并发访问,key和value不允许为null;
2.ConcurrentHashMap使用了分段锁技术,Segment继承了可重入锁ReentrantLock,对HashTable的修改操作都要使用锁,只有在同一个Segment中才存在竞争关系,不同的Segment之间没有所竞争;相对于整个Map加锁的设计分段锁大大的提高了高并发环境下的处理能力;
3.ConcurrentHashMap的put、get、remove和replace操作都是对某一个segment进行操作,而clear是锁住整个table,size和contains方法是给一定的机会不使用锁;
4.ConcurrentHashMap中segment数和每个segment中的table的长度都是2的次幂;
5.ConcurrentHashMap中如果segment数设置的过小,会带来严重的所竞争问题,如果设置的过大,会使原本在同一个Segment中的访问扩散到不同的Segment中,命中率过低,从而引起程序性能下降;
6.ConcurrentHashMap中初始化时只会初始化第一个segment,剩下的segment采用的是延迟初始化的机制,因此在每次put操作时都需要检查key对应的segment是否为null,如果为空则需要创建;