并发编程之深入理解CAS
什么是 CAS
CAS,compare and swap的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
为什么要使用 CAS
在并发中,我们需要对一个数据进行更改,如果使用锁来保证原子性,首先在性能方面会设计到底层操作系统内核线程切换,这个开销是很大的,如果使用 CAS 实现的话,相比较之下不会设计到内核切换,开销比较轻。
使用伪代码表示
if (value == expectedValue) {
value = newValue;
}
一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体 ——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。
CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操 作就通过CAS自旋实现的。 CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。
- 1 首先读取内存中的实际值v 如果和自己的值E相等,计算赋值给更新的值 U 成功则返回 U
- 2 上面更新失败,则继续重试,成功则返回,失败则继续重试。
CAS应用
在Java中使用 CAS
CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操 作
可以看到是本地方法,是由 JVM 实现的。 接收四个参数分别是
对象的实例,内存偏移量,字段期望值,字段新值。
CAS缺陷
CAS 虽然解决了原子性,但是有一些问题
- ABA 问题
- 只能保证一个共享变量的原子操作
- CAS 不成功,长时间会造成 CPU 空转,影响 CPU 开销
ABA问题
当有多个线程对一个原子类进行操作的时候,某个线程在短时间内将原子类的值A修改为B,又马 上将其修改为A,此时其他线程不感知,还是会修改成功。
main 线程不清楚另一个线程对这个变量进行了修改,可能误认为没有更改过。 [图片上传失败...(image-637329-1667441369607)]
ABA问题的解决
可以在每一次修改时给一个版本号,这也就是乐观锁的由来。
比如 Java中AtomicStampedReference
类就提供了这样的功能
三个线程 t1,t2,t3 分别对变量进行修改,最后t3在更改为初始值。
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t1 修改成功 ");
return;
} else {
System.out.println(" t1 修改失败 ");
}
}
}, "t1 ").start();
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(2, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t2 修改成功 ");
return;
} else {
System.out.println(" t2 修改失败 ");
}
}
}, "t2 ").start();
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(3, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t3 修改成功 ");
return;
} else {
System.out.println(" t3 修改失败 ");
}
}
}, "t3 ").start();
Thread.sleep(100);
System.out.println("版本号: " + atomicStampedReference.getStamp());
System.out.println("值: " + atomicStampedReference.getReference());
可以看到结果最后值还是 1 但是版本号已经是 4 了,已经变了三个版本了。
reference即我们实际存储的变量,stamp是版本,每次修改可以通过+1保证版本唯一性。这样就可以保证每次修改后的版本也会往上递增。
另外还有AtomicMarkableReference
类 是 AtomicStampedReference 类的补充版,不关心修改了几次,只关心有没有修改过。
AtomicMarkableReference<Integer> atomicStampedReference = new AtomicMarkableReference<Integer>(1,false);
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(1, 2,false,true);
while (b) {
if (b) {
System.out.println(" t1 修改成功 ");
return;
} else {
System.out.println(" t1 修改失败 ");
}
}
}, "t1 ").start();
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(2, 1,true,false);
while (b) {
if (b) {
System.out.println(" t2 修改成功 ");
return;
} else {
System.out.println(" t2 修改失败 ");
}
}
}, "t2 ").start();
Thread.sleep(100);
System.out.println("版本号: " + atomicStampedReference.isMarked());
System.out.println("值: " + atomicStampedReference.getReference());
Atomic原子操作类
Java 中已经有了 CAS 为什么还需要提供Atomic原子操作类操作。
在 Java 并发编程中很容易出现一些并发安全问题,比如i++操作,多个贤臣根治性就有可能获取不到正确的值,解决这个问题最基本的操作室加锁保证线程安全,但是锁这个操作太重了,不是一种很高效的解决方案。
基于 CAS 操作,不需要底层一些线程内核的切换,可以使用 CAS 比较后更新,或者再进行重试。
如: 在有了 CAS 之后,synchronized 性能也变得提升了,不断的进行重试,重试一定次数失败后,实在不成功,进行内核的线程切换操作。
JDK 中为我们提供了Atomic原子操作类,保证线程对基本类型变量的操作,底层也是基于 CAS 来实现。
即使JDK 中不提供这些原子类,我们也可以自己去封装实现,区别点在于可能没 JDK 实现的那么优雅以及稳定。
在java.util.concurrent.atomic
包里提供了一组原子操作类,可以分为这么几种类型:
- 基本类型:AtomicInteger、AtomicLong、AtomicBoolean
- 引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
- 数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
- 对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicReferenceFieldUpdater
- 原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、 LongAccumulator、LongAdder、Striped64
基本类型以AtomicInteger为例
以原子的方式将实例中的原值加1,返回的是自增前的旧值;
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
getAndSet(int newValue):将实例中的值更新为新值,并返回旧值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
先比较计算,成功则返回,失败则继续重试
以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
可以看到底层是使用unsasfe cpu指令级原子操作,并且值用volatile
修饰对每个线程可见,保证可见性。
原子更新数组类型
//1 初始化
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(2);
// 2 初始化数组
//int array [] = {1,2};
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(array);
//使用第一种初始化
//下表0 值设置 1
atomicIntegerArray.set(0,1);
System.out.println(atomicIntegerArray.get(0));;
//下表1 值设置 2
atomicIntegerArray.set(1,2);
System.out.println(atomicIntegerArray.get(1));;
//更新失败
System.out.println(atomicIntegerArray.compareAndSet(1, 1, 2));
//更新成功
System.out.println(atomicIntegerArray.compareAndSet(1, 2, 3));
结果
可以看到基本方法一直,只不过数组是对 Index 操作的
原子更新引用类型
AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性
还有 AtomicMarkableReference AtomicStampedReference 上面有例子
对象属性原子修改器
对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下: (1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3 (2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的 关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父 类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。 (3)只能是实例变量,不能是类变量,也就是说不能加static关键字。 (4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和 volatile是有冲突的,这两个关键字不能同时存在。 (5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字 段,不能修改其包装类型(Integer/Long)。
AtomicIntegerFieldUpdater<Test> atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Test.class,"i");
// AtomicLongFieldUpdater 类型是long 和 AtomicIntegerFieldUpdater 一样
// AtomicLongFieldUpdater<Test> atomicLongFieldUpdater = AtomicLongFieldUpdater.newUpdater(Test.class,"i");
Test test = new Test();
//设置字段值
atomicIntegerFieldUpdater.set(test,10);
IntStream.rangeClosed(1,2).forEach(x->{
new Thread(()->{
for (int i = 0; i < 20; i++) {
Integer andDecrement = atomicIntegerFieldUpdater.getAndIncrement(test);
System.out.println(Thread.currentThread().getName() + " : " + andDecrement);
}
}).start();
});
如果要修改包装类型就需要使用 AtomicReferenceFieldUpdater。
Test2 test2 = new Test2();
//反省擦除 会报错 必须制定类型
// AtomicReferenceFieldUpdater atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test2.class,Integer.class,"i");
AtomicReferenceFieldUpdater<Test2,Integer> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test2.class,Integer.class,"i");
test2.i=1;
atomicReferenceFieldUpdater.compareAndSet(test2,1,2);
System.out.println(atomicReferenceFieldUpdater.get(test2));
LongAdder/DoubleAdder详解
AtomicLong 、 AtomicInteger 是利用了底层的CAS操作来提供并发性的。 我们都知道CAS 会在cpu上不断的重试,当线程数量少还可以接收,大量并发情况下,会出现大量失败并且不断的自旋重试,导致 CPU 空转,严重影响性能。
为了解决高并发下 AtomicLong 、 AtomicInteger 自旋瓶颈问题,引入LongAdder/DoubleAdder类。
内部原理
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变 量进行。
也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争 一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不 同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽 中的变量值累加返回。
LongAdder内部有一个base变量,一个Cell[]数组: base变量:非竞态条件下,直接累加到该变量上 Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中。
定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过 Unsafe来CAS操作它的值 [图片上传失败...(image-9ea409-1667441369606)]
LongAdder中add方法
Cell[] 是否初始化,cells是null 则没有则初始化,casBase 方法 b=base值,这个是cas原子操作,表明没有竞争。 如果出现了冲突,则再判断 cells 是否初始化,出现冲突更新base 更新base冲突则查看 一个线程base原子操作,另一个失败则进行 cells初始化 初始化了则 定位hash 指定到 Cell 数组指定曹中,使用cas更新每个槽里的值
最后将每个槽和base相加起来
由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进 行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是 一个调用sum方法时的原子快照值。
longAccumelate 这个方法很长
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
大致的逻辑是
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作 都只针对Cell[]数组中的单元Cell。
如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[], 如果Cell[]已经初始化 但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容 了。
这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
LongAccumulator
LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算。 LongAccumulator内部原理和LongAdder几乎完全 一样,都是使用了父类Striped64的longAccumulate方法。
//自定义算法
LongAccumulator longAccumulator = new LongAccumulator((x,y)-> x + y,10);
for (int i = 1; i <= 100; i++) {
int finalI = i;
new Thread(()->{
longAccumulator.accumulate(finalI);
}).start();
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(longAccumulator.get());
最后
CAS 和加锁相比,CAS 更加轻量,但也带来了两个问题一个是ABA问题、一个是自旋时间问题。