本章是提高教程可能对于刚入门同学来说会有些难度,读懂本章你需要了解以下知识点:
一、 【Java基础提高】深入分析final关键字(一)
二、 【Java并发编程】深入分析volatile(四)
三、 【Java基础提高】HashTable源码分析(六)
一、Concurrent源码分析
ConcurrentHashMap是由Segment(桶)、HashEntry(节点)2大数据结构组成。如下图所示:
1.1 Segment类和属性
//Segment内部维护了一个链表数组static final class Segment<K,V> extends ReentrantLock implements Serializable {
//链表数组,数组中的每一个元素代表了一个链表的头部
transient volatile HashEntry<K,V>[] table;
//Segment中元素的数量
transient int count;
//对table的大小造成影响的操作的数量(比如put或者remove操作)
transient int modCount;
//阈值,Segment里面元素的数量超过这个值会对Segment进行扩容,扩容后大小=old*2*负载因子
transient int threshold;
//负载因子,用于确定threshold
final float loadFactor;
}
Segment继承了ReentrantLock,这意味着每个segment都可以当做一个锁,每一把锁只锁住整个容器中的部分数据,这样不影响线程访问其它数据,当然如果是对全局改变时会锁定所有的segment段。比如:size()和containsValue(),注意的是要按顺序锁定所有段,操作完毕后,再按顺序释放所有段的锁。如果不按顺序的话,有可能会出现死锁。
1.2 HashEntry类和属性
//HashEntry是一个单向链表 static final class HashEntry<K,V> { //哈希值 final int hash; //存储的key和值value final K key; volatile V value; //指向的下一个HashEntry,即链表的下一个节点 volatile HashEntry<K,V> next;}
类似与HashMap节点Entry,HashEntry也是一个单向链表,它包含了key、hash、value和下一个节点信息。HashEntry和Entry的不同点:
不同点一:使用了多个final关键字(final class 、final hash) ,这意味着不能从链表的中间或尾部添加或删除节点,后面删除操作时会讲到。
不同点二:使用volatile,是为了更新值后能立即对其它线程可见。这里没有使用锁,效率更高。
1.3 类的初始化
/** * * @param initialCapacity 初始容量 * @param loadFactor负载因子 * @param concurrencyLevel 代表ConcurrentHashMap内部的Segment的数量, * ConcurrentLevel 并发级别 */public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;//ssize左移一位也就是每次ssize=2*ssize。}//主要使用于put()和segmentForHash()方法,结合hash计算出元素在哪一个Segment中。 //假如concurrencyLevel是16,那么sshift=4、segmentShift=28、segmentMask=15;this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;// create segments and segments[0]Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;}
ConcurrencyLevel默认情况下内部按并发级别为16来创建。对于每个segment的容量,默认情况也是16。其中concurrentLevel和segment的初始容量都是可以通过构造函数设定的。要注意的是ConcurrencyLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。
1.4 ensureSegment()方法
该方法返回给定索引位置的Segment,如果Segment不存在,则参考Segment表中的第一个Segment的参数创建一个Segment并通过CAS操作将它记录到Segment表中去。private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
1.5 entryAt()方法
entryAt()方法是从链表中查找节点。在方法参数里注意到有传入tab链表数组和index索引,那为什么还要调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取?那我们从源头(put)开始分析,见1.6put()操作。static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
1.6 put()操作
1.6.1 锁分离技术
大家知道HashTable是使用了synchronized来保证线程安全,但是其效率非常差。它效率非常差的原因是多个线程访问HashTable时需要竞争同一把锁,如果我们有多把锁,每一把锁只锁住一部分数据,那么多线程在访问不同的数据时也就不会存在竞争,能提高访问效率。这种做法我们称为锁分离技术。在《Java并发编程实战》一书中作者提到过分拆锁和分离锁技术:分拆锁(lock spliting)就是若原先的程序中多处逻辑都采用同一个锁,但各个逻辑之间又相互独立,就可以拆(Spliting)为使用多个锁,每个锁守护不同的逻辑。
分拆锁有时候可以被扩展,分成可大可小加锁块的集合,并且它们归属于相互独立的对象,这样的情况就是分离锁(lock striping)。
而ConcurrentHashMap就是使用了分离锁技术,对每个Segment配置一把锁,如下图所示:
1.6.2 源码分析
Segment的put操作原理如下图所示,图中展示的不是很详细,其中关于加锁的步骤没有加上去,原因是加了几次觉得加锁后看着很复杂。用图片展示是为了更加简单和明了,如果看着复杂也就没有意义了,我尽量用文字说清楚它的步骤。
步骤一:进入Segment的put操作时先进行加锁保护。如果加锁没有成功,调用scanAndLockForPut方法(详细步骤见下面scanAndLockForPut()源码分析)进入自旋状态,该方法持续查找key对应的节点链中是已存在该机节点,如果没有找到,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入加锁等待状态,自旋结束并返回节点(如果返回了一个非空节点,则表示在链表中没有找到相应的节点)。对最大尝试次数,目前的实现单核次数为1,多核为64。
步骤二:使用(tab.length - 1) & hash计算第一个节点位置,再通过entryAt()方法去查找第一个节点。如果节点存在,遍历链表找到key值所在的节点,如果找到了这个节点则直接更新旧value,结束循环。其中value使用了volatile,它更新后的值立马对其它线程可见。如果节点不存在,将步骤一预创建的新节点(如果没有则重新创建)添加到链表中,添加前先检查添加后节点数量是否超过容器大小,如果超过了,则rehash操作。没有的话调用setNext或setEntryAt方法添加新节点;
要注意的是在更新链表时使用了Unsafe.putOrderedObject()方法,这个方法能够实现非堵塞的写入,这些写入不会被Java的JIT重新排序指令(instruction reordering),使得它能更加快速的存储。
解决1.5问题:为什么还要调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取?
虽然在开始时volatile table将引用赋值给了变量tab,但是多线程下table里的值可能发生改变,使用tab[index]并不能获得最新的值。。为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头.
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);//计算Segment的位置,在初始化的时候对segmentShift和segmentMask做了解释int j = (hash >>> segmentShift) & segmentMask;//从Segment数组中获取segment元素的位置if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j);//return s.put(key, hash, value, false);}//往Segment的HashEntry中添加元素,使用了分锁机制final V put(K key, int hash, V value, boolean onlyIfAbsent) {//tryLock 仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true。否则是false//scanAndLockForPut 下面单独说scanAndLockForPutHashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;}
1.5 segmentForHash()方法
/** * 查找Segment对象,这里Unsafe的主要作用是提供原子操作。 */@SuppressWarnings("unchecked")private Segment<K,V> segmentForHash(int h) { long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);}
1.6 scanAndLockForPut()方法
在下面代码中,它先获取key对应的头节点,进入链表循环。如果链表中不存在要插入的节点,则预创建一个新节点,否则retries值递增,直到操作最大尝试次数而进入等待状态。这个方法要注意的是:当在自旋过程中发现链表链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {//第一步:先找到HashEntry链表中的头节点 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //第一次一定执行该条件内代码 if (retries < 0) { //第二步:分三种情景 //情景一:没有找到,创建一个新的节点。 if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } //情景二:找到相同key的节点 else if (key.equals(e.key)) retries = 0; else //情景三:没找到key值对应的节点,指向下一个节点继续 e = e.next; } //尝试次数达到限制进入加锁等待状态。 对最大尝试次数,目前的实现单核次数为1,多核为64: else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } //retries是偶数并且不是头节点。在自旋中链头可能会发生变化 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node;}
1.7 get()操作
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }从代码可以看出get方法并没有调用锁,它使用了volatile的可见性来实现线程安全的。
参考资料 http://blog.csdn.net/fw0124/article/details/43308193
http://ifeve.com/concurrenthashmap/
http://www.jdon.com/performance/java-performance-optimizations-queue.html
作者:小毛驴,一个游戏人
梦想:世界和平
原文地址:http://blog.csdn.net/liulongling若有错误之处,请多多谅解并欢迎批评指正。 本博客中未标明转载的文章归作者小毛驴所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。