Java并发 - (无锁)篇6

时间:2021-12-25 06:01:47

, 摘录自葛一鸣与郭超的 [Java高并发程序设计].

本文主要介绍了死锁的概念与一些相关的基础类, 摘录自葛一鸣与郭超的 [Java高并发程序设计].

无锁是一种乐观的策略, 它假设对资源的访问是没有冲突的, 所有的线程都可以不停顿地持续执行, 它采用一种叫做比较交换的技术 (CAS Compare And Swap) 来鉴别线程冲突, 如果出现冲突, 重试操作, 直到没有冲突为止.

CAS

CAS算法包含三个参数 CAS(V, E, N). V表示要更新的变量, E表示预期值, N表示新值.

如果 V==E 成立, 就将 V 设置为 N, 如果 V 于 E 值不同, 说明有别的线程对 V 进行了更新, 则当前线程什么都不做. CAS 的操作总是抱着乐观的态度进行的, 它总认为自己可以成功完成操作. 多个线程使用 CAS 操作一个变量时, 只有一个会胜出, 其他的被告知失败, 并允许再次尝试或者放弃.

典型的 CAS 算法包括

  • 一个死循环来控制的连续的尝试
  • 一次获取 V 保存到一个本地变量 E 的操作
  • 一个对本地变量进行处理得到 N 的方法
  • 一次对目前的 V 和之前得到的 E 的值的比较
  • 一次根据真假来确定是否要将 N 覆盖当前 V 的的操作
  • 对应的尝试失败的处理方法

例如, 这是一个下文中的一段涉及到CAS的代码的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   new Thread() {

   	for(;;){
// 一次获取 V 保存到一个本地变量 E 的操作
Integer m = money.get();
if(m<20){
// 这里进行了 三个步骤的合并
// 一个对本地变量进行处理得到 N 的方法
// 一次对目前的 V 和之前得到的 E 的值的比较
// 一次根据真假来确定是否要将 N 覆盖当前 V 的的操作
money.compareAndSet(m, m+20);
break;
} else{
// 对应的尝试失败的处理方法
break;
}
}
}.start();

正是由于这种机制 (额外给出一个期望值来判断操作是否成功), 所以即使CAS操作没有锁, 也可以发现别的线程的干扰, 并进行恰当的处理.

在硬件层面, 大部分的现代化处理器都已经支持原子化的 CAS 指令, 虚拟机可以使用这个指令来实现操作与数据结构

无锁的线程安全整数: AtomicInteger

这个类是一个可变的, 线程安全的整数, 也就是一个原子类, 这里展示它的一些主要方法

1
2
3
4
5
6
7
8
9
10
public final int ()
public final void set(int newValue)
public final int getAndSet(int newValue)
public final boolean compareAndSet(int expect, int update)
public final int getAndIncrement()
public final int getAndDecrement()
public final int getAndAdd(int delta)
public final int incrementAndGet()
public final int decrementAndGet()
public final int addAndGet(int delta)

其他的原子类的操作也非常类似.

AtomicInteger的核心字段是

1
private volatile int value;

incrementAndGet 中 CAS 的实现 JDK 1.7

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
for(;;){ // 在完全未优化的情况下, for(;;)不需要用到寄存器, while(true)需要
int current = get(); // 获取当前值
int next = current + 1; // 得到加一后的值
if (compareAndSet(current, next)) // 如果当前值改变, 说明别的线程产生了干扰, 需要重新进行操作
return next;
}
}

incrementAndGet 中 CAS 的实现 JDK 1.8

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

代码中使用到了sun.misc.Unsafe类, JKD1.7 的对应实现中使用的compareAndSet()方法也来自于Unsafe类, 这个类封装了一些类似指针的操作, 例如 “比较并设置int值” 方法

1
2
// Object o, long offeset, int expected, int x
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

其中第一个为给定的对象, 第二个为一个字段到对象头部的偏移量, 通过它来快速定位字段, 第三个为期望值, 第四个为要设置的值,

无法由程序员自己实现

实际上, Unsafe类中类似指针的操作不能被我们自己的应用程序使用, 为了获得Unsafe类, 我们就需要调用其工厂方法, 而它做了相关的检测来静止用户调用.

1
2
3
4
5
6
7
8
9
10
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
// 对调用类的类加载器进行检测, 如果不是系统加载器的, 就抛出错误, 拒绝工作
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

无锁的对象: AtomicReference

一个泛型类, 利用这个类可以实现普通对象的原子性, 一般使用它三种基本的方法即可, 与我们自己去实现的不同, 它的方法都保证了原子性.

1
2
3
public final V ()
public final void 大专栏  Java并发 - (无锁)篇6set(V newValue)
public final boolean compareAndSet(V expect, V update)

通过这三种方法, 我们可以完成一些更具体的 CAS 操作

例如, 下面使得一个值小于 20 时对其加 20

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static AtomicReference<Integer> money = new AtomicReference<>();
money.set(19);
for(int i = 0; i < 3 ; i++){
new Thread() {
for(;;){
Integer m = money.get();
if(m<20){
money.compareAndSet(m, m+20);
break;
} else{
// 更改失败, 为了避免死循环, 这里简单地跳出
break;
}
}
}.start();
}

这里启动了三个线程, 并且采用了 CAS 策略 . 明显的是, 只有一个线程能成功地更改值, 其他两个线程会失败, ,这里简单地取消处理, 我们也可以让这两个线程继续尝试, 直到别的线程修改了 money 的值, 使其低于20.

带时间戳的无锁对象: AtomicStampedReference

假设有这样的一种情况, 我们维护一组用户的账户余额数据, 现在进行一次刺激充值的活动, 所有余额低于20元的用户都能收到20元余额, 为了加速操作, 我们启动了若干个后台线程, 持续扫描用户数据, 用来完成足够快的增值服务响应.
存在这样一种异常情况, 一个账户的余额是15元, 两个线程同时get()到了目前的余额, 发现它需要被充值, 一个线程优先完成了充值操作, 余额变为35元. 而在另外一个线程检测到余额已被更改前, 用户消费了20元, 余额又变回了15元, 第二个线程又将余额更改为35元. 这样就给用户累积充值了40元, 虽然发生的概率非常小, 但是仍然可能出现

上述情况的原因是因为AtomicRefernce类将对象的值与本身状态划上了等号, 这在一些涉及到时间戳的应用中是不符合现实模型的. AtomicStampedReference在内部不仅维护了对象值还维护了一个时间戳 (final int stamp), 只有当对象值与时间戳都满足期望值时, 才会写入成功.

这里列出几个主要的 API

1
2
3
4
5
6
7
8
9
public V getReference()
public int getStamp() public void set(V newReference, int newStamp) public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp)

每次对对象的更改都需要对时间戳进行更改, 下面提供一个示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static AtomicStampedReference<Integer> money = new AtomicStampedReference<>();
money.set(19, 0);
final int timeStamp = money.getStamp();
for(int i = 0; i < 3 ; i++){
new Thread() {
for(;;){
Integer m = money.getReference();
if(m<20){
money.compareAndSet(m, m+20, timeStamp, timeStamp+1);
break;
} else{
// 更改失败, 为了避免死循环, 这里简单地跳出
break;
}
}
}.start();
}

无锁的数组: Atomic??Array

JDK 的Unsafe类通过 CAS 的方式也为我们实现了一些无锁的数组, 它们的名字都是 Atomic??Array的格式.

这里以AtomicIntegerArray为例, 展示原子数组的使用

AtomicIntegerArray实际上是利用Unsafe的 CAS 方式来完成对int[]的封装. 下面是它的核心 API

1
2
3
4
5
6
7
8
9
10
11
public final int (int i)
public final void set(int i, int newValue)
public final int getAndSet(int i, int newValue)
public final boolean compareAndSet(int i, int expect, int update) public final int length() // 自增和自减都不是原子的, Java为所有这种可以自增自减的原子类封装了原子操作
public final int getAndIncrement(int i)
public final int getAndDecrement(int i)
public final int getAndAdd(int i, int delta)

让普通变量也能享受到原子操作: Atomic??FieldUpdater 接口

有时候, 由于初期考虑不周或者需求变化, 一些普通变量也会存在需要线程安全的需求. 这时候, 我们可以简单地利用synchronized或者ReentrantLock来解决, 但是如果需要用到这个变量的地方很多, 这样的作法明显违反了开闭原则 (系统对功能的增加应该是开放的, 但对修改应该是保守的).

事实上 JDK 在原子包中为我们提供了相关的实用类, 来使得只要修改极少的代码就能获得线程安全的保证. 这更像是一种高效的补救的措施.

根据数据类型Updater类有三种选择

  • AtomicReferenceFieldUpdater
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

为了使用这个接口, 我们需要调用它的newUpdatar()方法, 这里以AtomicIntegerFieldUpdater为例

1
2
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName)

AtomicInteger一样, 它也提供了incrementAndGet()这种更新方法, 详细资料请自己查阅JDK文档.

注意事项

  • 他们都是基于反射 而制作的工具类! 只能修改它可见的范围内的变量.
  • 修改的变量必须为volatile类型.
  • 由于使用了Unsafe类, 利用偏移量修改值, 所以不支持静态类型的修改.