概述
线程安全的HashMap版本。
1)基本思想:将整个大的hash table进一步细分成小的hash table,即Segment;
2)读不用加锁;写操作在所在的Segmenet上加锁,而不是整个HashMap,Hashtable就是所有方法竞争Hashtable上的锁,导致并发效率低;
3)采用懒构造segment(除了segments[0]),以减少初始化内存。Unsafe类实现了AtomicReferenceArrays的功能,但减少了间接引用的程度。对Segment.table元素、HashEntry.next属性采用更轻量级的“lazySet”写(用Unsafe.putOrderedObject实现,确保用在lock中,其后有unlock释放),以保证table更新的顺序一致性。之前的ConcurrentHashMap类过度依赖final关键字,虽然能避免一些volatile读,但是初始化时需要较大的内存。
4)并发级别由concurrencyLevel参数确定,太大会浪费内存空间和遍历时间,太小则加强竞争;当只有一个线程写,其他线程都是读,则设置为1;
5)迭代器为其创建时或之后的某个时刻ConcurrentHashMap状态,不会抛出ConcurrentModificationException,用来在一个线程中一次使用;
6)尽量避免rehash。
数据结构
Segment<K,V>数组,每一个Segment又是特殊的hash table,包括HashEntry<K,V>数组,HashEntry为并发版的键值对:
final Segment<K,V>[] segments;// final方式
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// 在预扫描中,segment阻塞前tryLock尝试的最大数
// 在多处理器中,采用有限的尝试次数使得在查找节点时缓存
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// segment的table数组
// 采用entryAt/setEntryAt提供对其元素的volatile读写
transient volatile HashEntry<K,V>[] table;
// segment中元素数目,获取方式采用lock或其他volatile方式
transient int count;
// 所在segment的写总数
// 即使其overflows,也可以保证在isEmpty()、size()使用中的正确性
// 获取方式采用lock或其他volatile方式
transient int modCount;
// segment中table的负载阈值,超过则其table rehash
transient int threshold;
// 负载因子,对所有segment都一样。
// 这个单独作为segment的一个属性是为了避免链接到外部对象
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}
static final class HashEntry<K,V> {
// hash、key为final;value、next为volatile
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
// 用UNSAFE.putOrderedObject进行next的volatile写
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
构造器
相关概念了解:
初始容量:ConcurrentHashMap容量,其初始化完成后,segments长度*Segment容量Segment容量:Segment.table.length
负载因子:即每一个Segment的负载因子,该值在所有的Segment中是相等的
并发级别:segments.length,在ConcurrentHashMap中锁是针对Segment的
segmentMask:限定在segments的索引范围,即segments.length-1,key hashCode的高位决定segment
segmentShift:在segments索引中,hash码的移位值
采用无参构造的相关参数值:
segments.length = 16
segmentShift = 28
segmentMask = 15
Segment.table.length = 2
Segment.loadFactor = 0.75
Segment.threshold = 1
// 带初始容量、负载因子、并发级别参数构造
// initialCapacity参数:用来确定Segment容量
// loadFactor参数:Segment的负载因子
// concurrencyLevel参数:用来确定segments长度,即并发级别
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift; // 用来确定segment的移位
this.segmentMask = ssize - 1; // 用来确定segment的掩码,即Segment数组长度-1,key hashCode的高位确定segment index
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 初始容量/Segment数组长度,即用来确定Segment容量
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1; // Segment容量
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]); // 保证序列化与之前版本的兼容
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
// 带初始容量、负载因子参数构造,默认并发级别为16
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
// 带初始容量参数构造,默认负载因子0.75
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 无参构造,默认初始容量16
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* Creates a new map with the same mappings as the given map.
* The map is created with a capacity of 1.5 times the number
* of mappings in the given map or 16 (whichever is greater),
* and a default load factor (0.75) and concurrencyLevel (16).
*
* @param m the map
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
增删改查
Segment容量调整策略
1)当Segment中键值对数超过负载阈值且table长度小于MAXIMUM_CAPACITY = 1 << 30,则对其table进行容量调整,table容量翻倍;
2)table最大容量为MAXIMUM_CAPACITY = 1 << 30;
3)table容量达到MAXIMUM_CAPACITY 后,如果有put请求,则直接在相应的bucket中链接进来,不会控制键值对的添加;
4)若进行了table的容量调整,需要将旧table关联的键值对重新在新table中确定bucket,再添加进来,也就是所说的hash table rehash,这里可以重用一些旧table的节点,因为next属性是不变的。
// rehash
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
// table长度翻倍
// 由于table长度为2的幂次方,在从就table到新table的rehash过程中,
// 元素的索引要么不变,要么移位2的幂次方;
// 一些老的节点因其next属性不变可以重用,在默认的负载阈值下,
// 只有1/6的元素需要复制。Entry的读取值采用普通的数组索引,这是
// 因为其后有table的volatile写
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1; // table长度翻倍,长度依然为2的幂次方
threshold = (int)(newCapacity * loadFactor); // 负载阈值
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i]; // 采用普通的数组索引
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun; // 在新的table中,重用索引相同的老的节点,其next属性不变
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
增、改
确定segments数组中Segment索引、Segment中table数组bucket索引
基于key的hashCode再哈希产生hash码,用其高位确定Segment索引,用其低位确定bucket索引:
// 对key的hashCode进行再次哈希,对其高位、低位进一步散列
// 由于segments、Segment.table length为均2的幂次方,所以对那些高位、低位相同的hashCode,容易产生hash碰撞;
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
// 用hash码的高位在segments中确定index
int j = (hash >>> segmentShift) & segmentMask;
// 用hash码的低位确定bucket index
int index = (tab.length - 1) & hash;
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)将put操作委托给确定的Segment进行put;
4)先采用自旋获取该Segment的锁;
5)用hash码的低位确定该Segment中table的bucket;
6)先遍历该bucket中键值对,确定是否已有相同或hash码相等且key相等的键值对,有则替换新value后返回;否则用key、value、hash创建Entry,将其链接到bucket首位;
7)释放该Segment的锁。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false); // 委托给确定的Segment进行put
}
// Segment put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); // 采用自旋,获取锁与遍历bucket热身
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash; // bucketIndex
HashEntry<K,V> first = entryAt(tab, index); // table元素 volatile读
for (HashEntry<K,V> e = first;;) {
if (e != null) { // 先遍历看是否已有键值对
K k;
// key相同或hash码相等且key相等
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node); // table元素 volatile写
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
// segments元素volatile读,若不存在,则创建再用CAS写入segments
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 对segments中元素进行volatile读,若为null则
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次检查是否为null
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))// 用CAS方式写
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
// 在尝试获取Segment锁的过程中,遍历hash码所确定的bucket中的节点,
// 如果没有,则创建返回,返回时确保获取到当前Segment锁。这里只采用equals
// 来比较key,这不重要,主要是为了遍历热身。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key)) // 不是真的比较,仅仅简单遍历
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
// table元素volatile读
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
// table元素volatile写
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
删
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)将put操作委托给确定的Segment进行remove;
4)先采用自旋获取该Segment的锁;
5)用hash码的低位确定该Segment中table的bucket;
6)遍历该bucket中键值对,确定是否已有相同或hash码相等且key相等的键值对,有则删除;
7)释放该Segment的锁。
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next); // 删除的是bucket的第一个节点,volatile写
else
pred.setNext(next); // 通过改变next删除节点,next的volatile写
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
查
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)用hash码的低位确定该Segment中table的bucket;
4)遍历该bucket中键值对,确定是否已有相同或hash码相等且key相等的键值对,有则返回关联的value;否则返回null。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
其他方法
// 判断ConcurrentHashMap是否为空
// 用segment的modCount来确定调用时间段
public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L) // modCount溢不溢出都没有关系,只要sum为0L就表明没有修改
return false;
}
return true;
}
// 获取ConcurrentHashMap的键值对总数
// 用segment的modCount来确定调用时间段
// 先尝试RETRIES_BEFORE_LOCK次数获取size,获取失败则加全锁
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
迭代器
HashIterator为迭代器基类,从后往前对ConcurrentHashMap进行全遍历,性能一般,KeyIterator、ValueIterator、EntryIterator都继承于该类:
abstract class HashIterator {
int nextSegmentIndex;
int nextTableIndex;
HashEntry<K,V>[] currentTable;
HashEntry<K, V> nextEntry;
HashEntry<K, V> lastReturned;
HashIterator() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
advance();
}
/**
* Set nextEntry to first node of next non-empty table
* (in backwards order, to simplify checks).
*/
final void advance() {
for (;;) {
if (nextTableIndex >= 0) {
if ((nextEntry = entryAt(currentTable,
nextTableIndex--)) != null) // 往前找到不为null的bucket
break;
}
else if (nextSegmentIndex >= 0) {
Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);// 往前找到不为null的Segment
if (seg != null && (currentTable = seg.table) != null)
nextTableIndex = currentTable.length - 1;
}
else
break;
}
}
final HashEntry<K,V> nextEntry() {
HashEntry<K,V> e = nextEntry;
if (e == null)
throw new NoSuchElementException();
lastReturned = e; // cannot assign until after null check
if ((nextEntry = e.next) == null)
advance();
return e;
}
public final boolean hasNext() { return nextEntry != null; }
public final boolean hasMoreElements() { return nextEntry != null; }
public final void remove() {
if (lastReturned == null)
throw new IllegalStateException();
ConcurrentHashMap.this.remove(lastReturned.key);
lastReturned = null;
}
}
特性
hash特性
1)segments、Segment.table的长度均为2的幂次方;
2)用hash函数,基于key的hashCode再次哈希,对其高位、低位进一步散列 ;
3)用hash码的高位在segments中确定index,低位确定bucket index;
// 用hash码的高位在segments中确定index
int j = (hash >>> segmentShift) & segmentMask;
// 用hash码的低位确定bucket index
int index = (tab.length - 1) & hash;
ConcurrentHashMap并发设计
其并发体现在两点:
1)Segment之间的并发;
2)Segment内部的读写并发;
Segment之间的并发
1)并发读取segments中的元素;2)将所有操作委托给Segment。
int h = hash(key); // hash码
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// segment index
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)// 对segment index元素volatile读
Segment内部的读写并发
1)写线程之间竞争Segment同一把锁,读线程不加锁;
2)利用volatile、final语义保证并发写线程之间、并发读写线程之间的可见性。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) { // table volatile读
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);// table元素volatile读
e != null; e = e.next) { // next volatile读
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k))) // key、hash为final
return e.value;// value volatile读
}
}
return null;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table; // // table volatile读
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index); // table元素volatile读
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {// key、hash为final
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;// value volatile写
++modCount;
}
break;
}
e = e.next;// next volatile读
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);// table元素volatile写,弱一致性
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
segments数组的懒构造如何体现?有何作用?
在put时,如果获取的segment为null,则进行懒构造
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
这样可以减少初始化内存空间。
final关键字的作用?
final字段所在的类实例初始化完成后,在未在构造完成前暴露实例的this前提下,final字段对所有并发线程可见,可以部分减少volatile读,但是会增加初始化内存空间。