jdk源码->并发->Unsafe&Atomic

时间:2022-01-02 00:48:06

synchronized关键字类似于java中的悲观锁机制,接下来介绍一种java的乐观锁机制Unsafe类

CAS

CAS简介

CAS全称是Compare And Swap,即比较交换,它是在并发执行时的一种无锁思想,其主要包含三个参数:

/**
*V主内存中的值
*E表示线程中旧的预期值
*N表示新值
**/
CAS(V,E,N)

操作过程可以描述为:将主内存中的值与当前线程中变量副本值进行比较,如果相等的,说明在这期间没有线程修改主内存中的变量值,那么主内存值改为N,但是如果不相等,说明其他线程已经操作过了,这时需要重新读取主内存中的值到线程中来置为预期值,重新进行这个操作

流程图如下:

jdk源码->并发->Unsafe&Atomic

jvm对CAS的支持

jvm中的源码

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

Linux的X86,Atomic::cmpxchg方法对的实现如下:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

Unsafe

CAS在java中的实现是Unsafe类,类如其名,不安全,因为Unsafe类提供了直接操作内存的方式

源码分析

  • 内存管理,Unsafe类中存在直接操作内存的方法
//分配指定bytes的内存
public native long allocateMemory(long bytes);
//根据给定address重新分配指定bytes内存
public native long reallocateMemory(long address, long bytes);
//释放内存
public native void freeMemory(long address);
//将指定对象的给定offset偏移量内存块中的所有字节设置为固定值
public native void setMemory(Object o, long offset, long bytes, byte value);
//设置给定内存地址的值
public native void putAddress(long address, long x);
//获取指定内存地址的值
public native long getAddress(long address);
//设置给定内存地址的long值
public native void putLong(long address, long x);
//获取指定内存地址的long值
public native long getLong(long address);
//设置给定内存地址的byte值
public native void putByte(long address, byte x);
//获取指定内存地址的byte值
public native byte getByte(long address);
//其他基本数据类型(long,char,float,double,short等)的操作与putByte及getByte相同
//操作系统的内存页大小
public native int pageSize();
  • 获取类的对象
//传入一个对象的class并创建该实例对象,但不会调用构造方法
public native Object allocateInstance(Class cls) throws InstantiationException;
  • 类和对象以及变量的操作
//获取字段f在实例对象中的偏移量
public native long objectFieldOffset(Field f);
//获取静态变量f在class对象中的偏移量
public native long staticFieldOffset(Field f);
//根据静态变量获取class对象
public native Object staticFieldBase(Field f);
//获得给定对象偏移量上的int值,所谓的偏移量可以简单理解为指针指向该变量的内存地址,
//通过偏移量便可得到该对象的变量,进行各种操作
public native int getInt(Object o, long offset);
//设置给定对象上偏移量的int值
public native void putInt(Object o, long offset, int x);
//获得给定对象偏移量上的引用类型的值
public native Object getObject(Object o, long offset);
//设置给定对象偏移量上的引用类型的值
public native void putObject(Object o, long offset, Object x);
//其他基本数据类型(long,char,byte,float,double)的操作与getInthe及putInt相同
//设置给定对象的int值,使用volatile语义,即设置后立马更新到内存对其他线程可见
public native void putIntVolatile(Object o, long offset, int x);
//获得给定对象的指定偏移量offset的int值,使用volatile语义,总能获取到最新的int值。
public native int getIntVolatile(Object o, long offset); //其他基本数据类型(long,char,byte,float,double)的操作与putIntVolatile及getIntVolatile相同,引用类型putObjectVolatile也一样。
//与putIntVolatile一样,但要求被操作字段必须有volatile修饰
public native void putOrderedInt(Object o,long offset,int x);
  • Unsafe实例的获取方式:
    Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
Unsafe unsafe = (Unsafe) field.get(null);
/**
*会抛出SecurityException异常的方式:
*Unsafe un = Unsafe.getUnsafe();
**/
  • 一个实例:
public class test {

    public  static  void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
// 通过反射得到theUnsafe对应的Field对象
Field field = Unsafe.class.getDeclaredField("theUnsafe");
// 设置该Field为可访问
field.setAccessible(true);
// 通过Field得到该Field对应的具体对象,传入null是因为该Field为static的
Unsafe unsafe = (Unsafe) field.get(null);
System.out.println(unsafe); //通过allocateInstance直接创建对象
User user = (User) unsafe.allocateInstance(User.class); Class userClass = user.getClass();
Field name = userClass.getDeclaredField("name");
Field age = userClass.getDeclaredField("age");
Field id = userClass.getDeclaredField("id"); //获取实例变量name和age在对象内存中的偏移量并设置值
unsafe.putInt(user,unsafe.objectFieldOffset(age),18);
unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV"); // 这里返回 User.class,
Object staticBase = unsafe.staticFieldBase(id);
System.out.println("staticBase:"+staticBase); //获取静态变量id的偏移量staticOffset
long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id"));
//获取静态变量的值
System.out.println("设置前的ID:"+unsafe.getObject(staticBase,staticOffset));
//设置值
unsafe.putObject(staticBase,staticOffset,"SSSSSSSS");
//获取静态变量的值
System.out.println("设置后的ID:"+unsafe.getObject(staticBase,staticOffset));
//输出USER
System.out.println("输出USER:"+user.toString()); long data = 1000;
byte size = 1;//单位字节 //调用allocateMemory分配内存,并获取内存地址memoryAddress
long memoryAddress = unsafe.allocateMemory(size);
//直接往内存写入数据
unsafe.putAddress(memoryAddress, data);
//获取指定内存地址的数据
long addrData=unsafe.getAddress(memoryAddress);
System.out.println("addrData:"+addrData); /**
* 输出结果:
sun.misc.Unsafe@6f94fa3e
staticBase:class geym.conc.ch4.atomic.User
设置前的ID:USER_ID
设置前的ID:SSSSSSSS
输出USER:User{name='android TV', age=18', id=SSSSSSSS'}
addrData:1000
*/ }
} class User{
public User(){
System.out.println("user 构造方法被调用");
}
private String name;
private int age;
private static String id="USER_ID"; @Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +'\'' +
", id=" + id +'\'' +
'}';
}
}
  • 数组操作:
//获取数组第一个元素的偏移地址
public native int arrayBaseOffset(Class arrayClass);
//数组中一个元素占据的内存空间,arrayBaseOffset与arrayIndexScale配合使用,可定位数组中每个元素在内存中的位置
public native int arrayIndexScale(Class arrayClass);
  • CAS操作相关
//第一个参数o为给定对象,offset为对象内存的偏移量,通过这个偏移量迅速定位字段并设置或获取该字段的值
//expected表示当前线程期望值(旧值),x表示要设置的值,下面3个方法都通过CAS原子指令执行操作。
public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
/**
*下述是jdk8中新增加的
**/
//这是一个CAS操作过程,直到设置成功才退出循环,返回旧值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//将预期值更新为内存中最新值
v = getIntVolatile(o, offset);
//CAS操作直到为true
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
} public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
} public final int getAndSetInt(Object o, long offset, int newValue) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, newValue));
return v;
} public final long getAndSetLong(Object o, long offset, long newValue) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, newValue));
return v;
}
public final Object getAndSetObject(Object o, long offset, Object newValue) {
Object v;
do {
v = getObjectVolatile(o, offset);
} while (!compareAndSwapObject(o, offset, v, newValue));
return v;
}
  • 挂起与恢复
//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。
public native void park(boolean isAbsolute, long time); //终止挂起的线程,恢复正常.java.util.concurrent包中挂起操作都是在LockSupport类实现的,其底层正是使用这两个方法,
public native void unpark(Object thread);
  • 内存屏障
//在该方法之前的所有读操作,一定在load屏障之前执行完成
public native void loadFence();
//在该方法之前的所有写操作,一定在store屏障之前执行完成
public native void storeFence();
//在该方法之前的所有读写操作,一定在full屏障之前执行完成,这个内存屏障相当于上面两个的合体功能
public native void fullFence();

Atomic

CAS在java中的应用,主要体现在并发包中的原子操作类(Atomic系列),从jdk1.5开始提供了java.util.concurrent.atomic包。

AtomicInteger

    public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L; // 获取Unsafe
private static final Unsafe unsafe = Unsafe.getUnsafe(); //volatile修饰的value变量
private volatile int value; //用来保存value变量在AtomicInteger对象内的内存偏移量
//私有静态不可变
private static final long valueOffset; static {
try {
//通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移
//通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//初始化value的构造函数
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
//获取当前最新值,
public final int get() {
return value;
}
//设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。
public final void set(int newValue) {
value = newValue;
}
//最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
//设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//如果当前值为expect(当前值指的是value变量),则设置为update
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//当前值加1返回旧值,底层CAS操作
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//当前值减1,返回旧值,底层CAS操作
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
//当前值增加delta,返回旧值,底层CAS操作
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
//当前值加1,返回新值,底层CAS操作
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//当前值减1,返回新值,底层CAS操作
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
//当前值增加delta,返回新值,底层CAS操作
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
//省略一些不常用的方法....
}

AtomicReference

public class AtomicReference<V> implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
*和AtomicInteger类相比,value从int类型变成了范型指定的类型
**/
private volatile V value; /**
*和AtomicInteger类相似,只不过调用的是Unsafe类中操作Object的方法
**/
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
}

AtomicIntegerArray

public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;
//获取unsafe类的实例对象
private static final Unsafe unsafe = Unsafe.getUnsafe();
//获取数组第一个元素内存起始地址
private static final int base = unsafe.arrayBaseOffset(int[].class);
/**计算数组中第i个元素内存地址增量时,可以用左移shift(i<<shift)
*来代替i*scale,换句话说scale是2的shift次幂,至于为什么要这么做
*说是移位运算的效率比乘法效率高
**/
private static final int shift;
//主要操作的数组对象
private final int[] array; static {
//获取数组中一个元素占据的内存空间
int scale = unsafe.arrayIndexScale(int[].class);
//判断是否是2的次幂,否则抛出异常
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
/**
*Integer.numberOfLeadingZeros(scale)用来计算int型变量转换为二进制后从最高位开始连续零的个数
*例如:在该类中scale=4(因为一个int类型占据4个字节)那么结果可以直观的表示为:00000000 00000000 00000000 00000100 为29,shift=2
**/
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
//先越界检查然后调用byteOffset返回计算后的第i个元素地址
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length) //越界检查
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
//返回数组第i个元素的位置
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
//初始化array的构造方法
public AtomicIntegerArray(int[] array) {
this.array = array.clone();
}
//获得数组长度
public final int length() {
return array.length;
}
//先越界检查确定偏移量然后getRow
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
//Unsafe方法获得volatile变量的值
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
//Unsafe方法设置volatile变量的值
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
/**
*下述方法均为调用Unsafe类的CAS操作,不再累述
**/
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
} public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
} public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}

AtomicIntegerFieldUpdater

public abstract class AtomicIntegerFieldUpdater<T> {
//静态工厂方法用来获得AtomicIntegerFieldUpdaterImpl对象
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
//只可以在本类和子类中实例化AtomicIntegerFieldUpdater对象
protected AtomicIntegerFieldUpdater() {
}
/**
*下述仅列出了部分抽象方法,在私有内部子类中会被重写
**/
public abstract boolean compareAndSet(T obj, int expect, int update);
public abstract void set(T obj, int newValue);
public abstract int get(T obj);
/**
*下述是私有内部子类的代码
**/
private static final class AtomicIntegerFieldUpdaterImpl<T>
extends AtomicIntegerFieldUpdater<T> {
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private final long offset;//记录Field的偏移量
private final Class<?> cclass;//classLoader的class对象
private final Class<T> tclass;//field所属类的class对象
/**
*tclass:Field所属类的class对象
*fieldName:field的名称
*caller:获得类加载器
**/
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
final String fieldName,
final Class<?> caller) {
final Field field; //要修改字段的Field对象
final int modifiers;//修饰符
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
//反射获得Field对象
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
//判断是否是int类型
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
//判断是否被volatile修饰
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;//初始化tclass
this.offset = U.objectFieldOffset(field);//调用Unsafe方法初始化offset
}
/**
*下面重写了AtomicIntegerFieldUpdater的抽象方法,利用java多态性调用
**/
public final boolean compareAndSet(T obj, int expect, int update) {
accessCheck(obj);
return U.compareAndSwapInt(obj, offset, expect, update);
}
public final void set(T obj, int newValue) {
accessCheck(obj);
U.putIntVolatile(obj, offset, newValue);
}
public final int get(T obj) {
accessCheck(obj);
return U.getIntVolatile(obj, offset);
}

CAS的ABA问题

ABA用语言描述就是:某个线程当前的期望值(旧值)为A,它准备执行CAS操作,但在它执行之前,有其它线程成功执行了两次CAS操作,第一次将主内存值从A->B,第二次将B->A,这时原来那个线程执行CAS操作,并成功,但是中间的修改过程无法得知。

该过程可用下图表示;

jdk源码->并发->Unsafe&Atomic

Atomic包中也提供了解决ABA问题的类,接下来介绍该类

AtomicStampedReference

CAS操作的成功执行依靠当前线程值和主内存中值的比较,如果相等就能更新,并且这个更新是没有方向的,也就是说多次更新的累积效果可能为0,导致这部分更新信息丢失,这就是ABA问题出现根本原因。那么能不能解决这个问题呢,很简单,既然我们要修改的字段更新是无方向的,那么我们就增加一个字段每更新一次字段朝同一方向变化一次,在cas更新操作的时候就比较这两个字段都相等才能更新成功,这样就不会出现ABA问题了,下述AtomicStampedReference类就是按照这个思路实现的:

public class AtomicStampedReference<V> {
/**
*pair:内部静态类,封装了实际操作的变量和时间戳
**/
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
//将pair作为一个对象来进行cas操作
private volatile Pair<V> pair; //构造函数,初始化变量和时间戳
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
} public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
} private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
//获得pair的偏移量
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); private boolean casPair(Pair<V> cmp, Pair<V> val) {
//CAS操作更新pair
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
} static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
//Unsafe操作获得pair的偏移量
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}