Java多线程 -- JUC包源码分析6 -- ConcurrentHashMap

时间:2022-09-21 10:09:49

ConcurrentHashMap的源码,从JDK1.6到JDK1.7, 经历了不少变化。在JDK1.7中,好几个地方使用了sun.misc.Unsafe里面的函数,比如UNSAFE.putOrderedObject, UNSAFE.getObject..。对于这些函数的原理,笔者也不甚理解。为此,本文的讨论只针对JDK1.6。
-锁分离
-读不加锁,锁加锁
-弱一致性, happen before偏序关系


锁分离

ConcurrentHashMap首先使用了锁分离技术,即把一个HashMap拆成了多个子HashMap,每个HashMap分别加锁,从而提供并发度。

在代码中,每个子HashMap叫做一个Segment,缺省Segment的个数为 static final int DEFAULT_CONCURRENCY_LEVEL = 16。用户可以自己重新设置此值,但此值一点是2的整数次方,便于hash。

并且Segment是final的,在构造函数里面,初始化好之后,就不再改变,因此可以不加锁访问。

public class ConcurrentHashMap<K,V> ...
{
...
final Segment<K,V>[] segments; //关键点:final,构造完毕,segment的个数就不能再改变。线程安全
...
}

final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor); //构造所有segments
}

读不加锁,写加锁

Segment里关键的volatile变量

每个Segment读不加锁,写加锁。Segment结构如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {  

//关键变量:为了尽可能的保证put/get的同步,加了此volatile变量,实现个数的原子性。但put/get还是没办法完全同步,这个将在下面,详细阐释。
transient volatile int count;

transient int modCount;

transient int threshold;

transient volatile HashEntry<K,V>[] table; //hash表。table是volatile的,在rehash的时候,table会被重新赋值,就会利用到volatile这个特性。但是,对table[x]的赋值,并不是volatile的!因此会出现put/get不同步的时候,后面会讲到这个问题。

final float loadFactor;
...
}

static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value; //关键点:value是volatile的,保证当put的时候,如果此key存在,则对value的修改,对get理解可见。但当key不存在,新加Entry时,对get就不一定立即可见了。后续将详细解释此问题
final HashEntry<K,V> next;
...
}

get函数

public V get(Object key) {  
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

V get(Object key, int hash) {
if (count != 0) { //先用volatile变量判断个数
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); //key有,value = nll,重新加锁读
}
e = e.next;
}
}
return null;
}
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}

//此举并不能完全避免put/get的不同步,只是为了避免put新加结点的时候,HashEntry初始化与赋值给table的指令重排序。详见下面put源码分析
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}

put 函数

V put(K key, int hash, V value, boolean onlyIfAbsent) {  
lock(); // 加锁
try {
int c = count;
if (c++ > threshold)
rehash(); //扩容,rehash。注意,只是对某个Segment rehash,而不会rehash整个ConcurrentHashMap
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //找到了key相同的,直接改value
oldValue = e.value;
if (!onlyIfAbsent) //并且有覆盖标识
e.value = value; //value是volatile的,在这种case下,put的更改,对get立即可见
}
else { //没找到key相同的,新建结点,插在链表头部
oldValue = null;
++modCount;

//关键的2行代码
tab[index] = new HashEntry<K,V>(key, hash, first, value);
//(1)这里tab是volatile的,但tab[index]并不是!!!也就是说,这里put的新结点,对get不一定立即可见。

//(2)HashEntry的构造,可能被重排序到赋给tab[index]之后,因此,才有上面的get,读到value = null,加锁重读


count = c;
//count是volatile的,原子的。如果在上面的get中if(count != 0)之前执行,则有happen before 关系。

}
return oldValue; // 返回旧值。
} finally {
unlock();
}
}

弱一致性-happen before偏序关系

###put/get的不同步

所谓弱一致性,是指上面的put进一个元素之后,get未必立即能取到。put的时候,有以下3种case:
case 1: key存在,覆盖value。因为value是volatile的,put/get一定同步。

case 2: key不存在,新建Entry。上面那2行关键代码,执行完之后,get去取。此时因为happen before的偏序关系:
“tab[index] = ..” happen before “count = c”,
“count = c” happen before “if(count ! = 0)”,
因此 “tab[index] = .. ” happen before “if(count!=0)”,
也就happen before于”if(count !=0)”之后的代码块。

此时, put完之后,一定可以get到.

case 3: key不存在,新建Entry。在上面那2行代码,第1行执行完毕,第2行还没执行之前,此时执行了get,则可能取不到!!因为不存在上述的happen before关系。

总结一下:虽然第3种case概率比较低,但仍可能出现put进去,get取不到的情况。

###clear的不同步
因为没有全局的锁,在清除完一个segments之后,正在清理下一个segments的时候,已经清理segments可能又被加入了数据,因此clear返回的时候,ConcurrentHashMap中是可能存在数据的。因此,clear方法是弱一致的。

public void clear() {
for (int i = 0; i < segments.length; ++i)
segments[i].clear();
}

当然,除了上述函数,还有其它函数有不同步的,在此不再详述。