并发编程之深入理解CAS

时间:2022-11-03 12:01:08

并发编程之深入理解CAS

什么是 CAS

CAS,compare and swap的缩写,中文翻译成比较并交换。

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。

为什么要使用 CAS

在并发中,我们需要对一个数据进行更改,如果使用锁来保证原子性,首先在性能方面会设计到底层操作系统内核线程切换,这个开销是很大的,如果使用 CAS 实现的话,相比较之下不会设计到内核切换,开销比较轻。

使用伪代码表示

if (value == expectedValue) {
value = newValue;
}

一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体 ——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。

CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操 作就通过CAS自旋实现的。 CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。

并发编程之深入理解CAS

  • 1 首先读取内存中的实际值v 如果和自己的值E相等,计算赋值给更新的值 U 成功则返回 U
  • 2 上面更新失败,则继续重试,成功则返回,失败则继续重试。

CAS应用

在Java中使用 CAS

CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操 作

并发编程之深入理解CAS

可以看到是本地方法,是由 JVM 实现的。 接收四个参数分别是

对象的实例,内存偏移量,字段期望值,字段新值。

CAS缺陷

CAS 虽然解决了原子性,但是有一些问题

  • ABA 问题
  • 只能保证一个共享变量的原子操作
  • CAS 不成功,长时间会造成 CPU 空转,影响 CPU 开销

ABA问题

并发编程之深入理解CAS

当有多个线程对一个原子类进行操作的时候,某个线程在短时间内将原子类的值A修改为B,又马 上将其修改为A,此时其他线程不感知,还是会修改成功。

并发编程之深入理解CAS

main 线程不清楚另一个线程对这个变量进行了修改,可能误认为没有更改过。 [图片上传失败...(image-637329-1667441369607)]

ABA问题的解决

可以在每一次修改时给一个版本号,这也就是乐观锁的由来。

比如 Java中​​AtomicStampedReference​​类就提供了这样的功能

三个线程 t1,t2,t3 分别对变量进行修改,最后t3在更改为初始值。

AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);

new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t1 修改成功 ");
return;
} else {
System.out.println(" t1 修改失败 ");
}
}
}, "t1 ").start();

new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(2, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t2 修改成功 ");
return;
} else {
System.out.println(" t2 修改失败 ");
}
}

}, "t2 ").start();

new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(3, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
while (b) {
if (b) {
System.out.println(" t3 修改成功 ");
return;

} else {
System.out.println(" t3 修改失败 ");
}
}

}, "t3 ").start();

Thread.sleep(100);

System.out.println("版本号: " + atomicStampedReference.getStamp());
System.out.println("值: " + atomicStampedReference.getReference());

可以看到结果最后值还是 1 但是版本号已经是 4 了,已经变了三个版本了。

并发编程之深入理解CAS

reference即我们实际存储的变量,stamp是版本,每次修改可以通过+1保证版本唯一性。这样就可以保证每次修改后的版本也会往上递增。

另外还有​​AtomicMarkableReference​​类 是 AtomicStampedReference 类的补充版,不关心修改了几次,只关心有没有修改过。

AtomicMarkableReference<Integer> atomicStampedReference = new AtomicMarkableReference<Integer>(1,false);

new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(1, 2,false,true);
while (b) {
if (b) {
System.out.println(" t1 修改成功 ");
return;
} else {
System.out.println(" t1 修改失败 ");
}
}
}, "t1 ").start();

new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(2, 1,true,false);
while (b) {
if (b) {
System.out.println(" t2 修改成功 ");
return;
} else {
System.out.println(" t2 修改失败 ");
}
}

}, "t2 ").start();

Thread.sleep(100);

System.out.println("版本号: " + atomicStampedReference.isMarked());
System.out.println("值: " + atomicStampedReference.getReference());

并发编程之深入理解CAS

Atomic原子操作类

Java 中已经有了 CAS 为什么还需要提供Atomic原子操作类操作。

并发编程之深入理解CAS

在 Java 并发编程中很容易出现一些并发安全问题,比如i++操作,多个贤臣根治性就有可能获取不到正确的值,解决这个问题最基本的操作室加锁保证线程安全,但是锁这个操作太重了,不是一种很高效的解决方案。

基于 CAS 操作,不需要底层一些线程内核的切换,可以使用 CAS 比较后更新,或者再进行重试。

如: 在有了 CAS 之后,synchronized 性能也变得提升了,不断的进行重试,重试一定次数失败后,实在不成功,进行内核的线程切换操作。

JDK 中为我们提供了Atomic原子操作类,保证线程对基本类型变量的操作,底层也是基于 CAS 来实现。

即使JDK 中不提供这些原子类,我们也可以自己去封装实现,区别点在于可能没 JDK 实现的那么优雅以及稳定。

在​​java.util.concurrent.atomic​​包里提供了一组原子操作类,可以分为这么几种类型:

  • 基本类型:AtomicInteger、AtomicLong、AtomicBoolean
  • 引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
  • 数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  • 对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicReferenceFieldUpdater
  • 原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、 LongAccumulator、LongAdder、Striped64

基本类型以AtomicInteger为例

以原子的方式将实例中的原值加1,返回的是自增前的旧值;

public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

getAndSet(int newValue):将实例中的值更新为新值,并返回旧值

public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

先比较计算,成功则返回,失败则继续重试

并发编程之深入理解CAS

以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果

public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果

public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

可以看到底层是使用unsasfe cpu指令级原子操作,并且值用​​volatile​​修饰对每个线程可见,保证可见性。

并发编程之深入理解CAS

原子更新数组类型

//1 初始化 
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(2);

// 2 初始化数组
//int array [] = {1,2};
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(array);

//使用第一种初始化
//下表0 值设置 1
atomicIntegerArray.set(0,1);

System.out.println(atomicIntegerArray.get(0));;

//下表1 值设置 2
atomicIntegerArray.set(1,2);

System.out.println(atomicIntegerArray.get(1));;

//更新失败
System.out.println(atomicIntegerArray.compareAndSet(1, 1, 2));
//更新成功
System.out.println(atomicIntegerArray.compareAndSet(1, 2, 3));

结果

并发编程之深入理解CAS

可以看到基本方法一直,只不过数组是对 Index 操作的

并发编程之深入理解CAS

原子更新引用类型

AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性

还有 AtomicMarkableReference AtomicStampedReference 上面有例子

对象属性原子修改器

对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下: (1)字段必须是volatile类型的,​​在线程之间共享变量时保证立即可见.eg​​:volatile int value = 3 (2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的 关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父 类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。 (3)只能是实例变量,不能是类变量,也就是说不能加static关键字。 (4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和 volatile是有冲突的,这两个关键字不能同时存在。 (5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字 段,不能修改其包装类型(Integer/Long)。

AtomicIntegerFieldUpdater<Test> atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Test.class,"i");

// AtomicLongFieldUpdater 类型是long 和 AtomicIntegerFieldUpdater 一样
// AtomicLongFieldUpdater<Test> atomicLongFieldUpdater = AtomicLongFieldUpdater.newUpdater(Test.class,"i");

Test test = new Test();
//设置字段值
atomicIntegerFieldUpdater.set(test,10);

IntStream.rangeClosed(1,2).forEach(x->{
new Thread(()->{
for (int i = 0; i < 20; i++) {
Integer andDecrement = atomicIntegerFieldUpdater.getAndIncrement(test);
System.out.println(Thread.currentThread().getName() + " : " + andDecrement);

}
}).start();
});

如果要修改包装类型就需要使用 AtomicReferenceFieldUpdater。

Test2 test2 = new Test2();
//反省擦除 会报错 必须制定类型
// AtomicReferenceFieldUpdater atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test2.class,Integer.class,"i");
AtomicReferenceFieldUpdater<Test2,Integer> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test2.class,Integer.class,"i");
test2.i=1;
atomicReferenceFieldUpdater.compareAndSet(test2,1,2);
System.out.println(atomicReferenceFieldUpdater.get(test2));

LongAdder/DoubleAdder详解

AtomicLong 、 AtomicInteger 是利用了底层的CAS操作来提供并发性的。 我们都知道CAS 会在cpu上不断的重试,当线程数量少还可以接收,大量并发情况下,会出现大量失败并且不断的自旋重试,导致 CPU 空转,严重影响性能。

为了解决高并发下 AtomicLong 、 AtomicInteger 自旋瓶颈问题,引入LongAdder/DoubleAdder类。

内部原理

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变 量进行。

也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争 一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不 同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽 中的变量值累加返回。

并发编程之深入理解CAS

LongAdder内部有一个base变量,一个Cell[]数组: base变量:非竞态条件下,直接累加到该变量上 Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中。

定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过 Unsafe来CAS操作它的值 [图片上传失败...(image-9ea409-1667441369606)]

LongAdder中add方法

并发编程之深入理解CAS

Cell[] 是否初始化,cells是null 则没有则初始化,casBase 方法 b=base值,这个是cas原子操作,表明没有竞争。 如果出现了冲突,则再判断 cells 是否初始化,出现冲突更新base 更新base冲突则查看 一个线程base原子操作,另一个失败则进行 cells初始化 初始化了则 定位hash 指定到 Cell 数组指定曹中,使用cas更新每个槽里的值

最后将每个槽和base相加起来

并发编程之深入理解CAS

由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进 行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是 一个调用sum方法时的原子快照值。

longAccumelate 这个方法很长

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

大致的逻辑是

只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作 都只针对Cell[]数组中的单元Cell。

如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[], 如果Cell[]已经初始化 但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容 了。

这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。

LongAccumulator

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算。 LongAccumulator内部原理和LongAdder几乎完全 一样,都是使用了父类Striped64的longAccumulate方法。

//自定义算法
LongAccumulator longAccumulator = new LongAccumulator((x,y)-> x + y,10);
for (int i = 1; i <= 100; i++) {
int finalI = i;
new Thread(()->{
longAccumulator.accumulate(finalI);

}).start();
}

try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(longAccumulator.get());

最后

CAS 和加锁相比,CAS 更加轻量,但也带来了两个问题一个是ABA问题、一个是自旋时间问题。