一、Java中的原子性操作概述
所谓原子操作,就是指执行一系列操作的时候,要么全部执行要么全部不执行,不存在只执行一部分的情况。在设置计数器的时候一般是读取当前的值,然后+1在更新(读-改-写的过程),如果不能保证这这几个操作的过程的原子性就可能出现线程安全问题,比如下面的代码示例,++value在没有任何额外保证的前提下不是原子操作。
1 public class ThreadUnSafe{ 2 private Long value; 3 4 public Long getValue() {return value;} 5 6 public void increment() {++value;} 7 }
使用Javap -c XX.class查看汇编代码如下
这是个复合操作,是不具备原子性的。而保证这个操作原子性的方法最简单的就是加上synchronized关键字(当然也可以是其他的加锁操作,参考Java中的锁——Lock和synchronized实现原理),使用synchronized可以实现线程安全性,但是这是个独占锁,没有获取内部锁的线程会被阻塞住(即便是这里的getValue操作,多线程访问也会阻塞住),这对于并发性能的提高是不好的(而这里也不能简单的去掉getValue上的synchronized,因为读操作需要保证value的读一致性,即需要获得主内存中的值而不是线程工作内存中的可能是旧的副本值)。那么除了加锁之外其他安全的方法?后面讲到的原子类(使用CAS实现)就可以作为一个选择。
二、从硬件原语上真正理解什么是同步(不特指Java)
同步机制是多处理机系统的重要组成部分,其实现方式除了关系到计算的正确性之外还有效率的问题。同步机制的实现通常是在硬件提供的同步指令的基础上,在通过用户级别软件例程实现的。
1、基本硬件原语
在多处理机中实现同步,所需的主要功能是一组能以原子操作读出并修改存储单元的硬件原语。如果没有这种操作,建立基本的同步原语的代价会非常大。基本硬件原语有几种形式提供选择,他们都能以原子操作的方式读改存储单元,并指出进行的操作是否能以原子形式进行,这些原语作为基本构建提供构造各种各样的用户及同步操作。
一个典型的例子就是原子交换(Atomic Exchange),他的功能是将一个存储单元中的值和一个寄存器的值进行交换。我们看看这个原语怎样构造一个我们通常意义上说的简单的锁。
假设现在我们构造这样一个简单的锁:其值为0表示锁是开的(锁可用),为1表示上锁(不可用)。当处理器要给该锁上锁的时候,将对应于该锁的存储单元的值与存放在某个寄存器中的1进行交换。如果别的处理器已经上了锁,那么交换指令返回的值为1否则为0。返回0的时候,因为是原子交换,锁的值就会从0变为1表示上锁成功;返回1,原子交换锁的值还是1,但是返回1表示已经被上了锁。 我们考虑使用这个锁:假设两个处理器同时进行交换操作(原子交换),竞争的结果就是,只有一个处理器会先执行成功而得到返回值0,而另一个得到的返回值为1表示已经被上锁。从这些我们可以看出,采用原子交换指令是实现同步的关键:这个原子交换操作的不可再分的,两个交换操作将由写顺序机制确定先后顺序,这也保证了两个线程不能同时获取同步变量锁。
除此之外,还有别的原语可以实现同步(关键都在于能以原子的方式读-改-写存储单元的值)。例如:测试并置定(test_and_set)(先测试一个存储单元的值,如果符合条件就修改其值),另一个同步原语是读取并加1(fetch_and_increment))(返回存储单元的值并自动增加该值)。(看到这里可以回忆一下Java中的CAS操作的实现以及在Java中的实现原理)
那么,上面的基本原语操作又是怎样实现的呢,这在一条指令中完成上述操作显然是困难的(在一条不可中断的指令中完成一次存储器的读改写,而且要求不允许其他的访存操作还要避免死锁)。现在的计算机上采用一对指令来实现上述的同步原语。该指令对由两条特殊的指令组成,一条是特殊的load指令(LL指令),另一条是特殊的store指令(SC)。指令的执行顺序是:如果LL指令指明的存储单元的值在SC对其进行写之前被其他的指令改写过,则第二条指令执行失败,如果在两条指令之间进行切换也会导致执行SC失败,而SC指令将通过返回一个值来指出该指令操作是否成功(如果返回的1表示执行成功,返回0表示失败)。为什么说这对指令相当于原子操作呢,这指的是是所有其他处理器进行的操作或者在这对指令之前执行或者在其后执行,不存在两条指令之间进行,所以在这一对指令之间不存在任何其他处理器改变相应存储单元的值。
下面是一段实现对R1指出的存储单元进行的原子交换操作
1 try:OR R3,R4,R0 //R4中为交换值,将该值送入R3 2 LL R2,0(R1) //将0(R1)中的值取到R2 3 SC R3,0(R1) //若0(R1)中的值与R3中的值相同,则置R3的值为1,否则为0 4 BEQZ R3,try //R3的值为0表示存失败,转移重新尝试 5 MOV R4,R2 //成功,将取出的值送往R4
最终R4和由R1指向的存储单元值进行了原子交换,在LL和SC之间如果有别的处理器插入并且修改了存储单元的值则SC都会返回0并存入R3中从而重新执行交换操作。下面是实现各个讲到的读取并加1(fetch_and_increment)原语的实现
1 try:LL R2,0(R1) //将0(R1)中的值送入R2 2 DADDIU R2,R2,#1 //加1操作(R2+1->R2) 3 SC R2,0(R1) //如果0(R1)中的值和R2中的值相同就置R2的值为1,否则为0 4 BEQZ R2,try //R2的值为0表示存失败,转移到开始出重新执行
上面的指令的执行需要跟踪地址,通常LL指令指定一个寄存器,该寄存器中存放着目的存储单元的地址,这个寄存器称为连接寄存器,如果发生中断切换或者与连接寄存器中的地址匹配的cache块被作废(被别的SC指令访问),则将连接寄存器清零,SC指令则检查它的存储地址和连接寄存器汇中的内容是够匹配,如果匹配则
三、Java中的Unsafe类
四、JUC中原子操作类AtomicLong的原理探究
1、原操作类概述
JUC包中提供了很多原子操作类,这些类都是通过上面说到的非阻塞CAS算法来实现的,相比较使用锁来实现原子性操作CAS在性能上有很大提高。由于原子操作类的原理都大致相同,所以下面分析AtomicLong类的实现原理来进一步了解原子操作类。
2、AtomicLong的源码
下面是AtomicLong原子类的部分源码,其中主要包含其成员变量以及一些静态代码块和构造方法
1 public class AtomicLong extends Number implements java.io.Serializable { 2 3 //(1)获取Unsafe实例 4 private static final Unsafe unsafe = Unsafe.getUnsafe(); 5 6 //(2)保存value值的偏移量 7 private static final long valueOffset; 8 9 //(3)判断当前JVM是否支持Long类型的无锁CAS 10 static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); 11 private static native boolean VMSupportsCS8(); 12 13 static { 14 try { 15 //(4)获取value值在AtomicLong中的偏移量 16 valueOffset = unsafe.objectFieldOffset 17 (AtomicLong.class.getDeclaredField("value")); 18 } catch (Exception ex) { throw new Error(ex); } 19 } 20 21 //(5)实际存的变量值value 22 private volatile long value; 23 24 //构造方法 25 public AtomicLong(long initialValue) { 26 value = initialValue; 27 } 28 }
在上面的部分代码中,代码(1)通过Unsafe.getUnsafe()方法获取到Unsafe类的实例(AtomicLong类也是rt.jar包下面的,所以AtomicLong也是通过启动类加载器进行类加载的)。(2)(4)两处是计算并保存AtomicLong类中存储的变量value的偏移量。(5)中的value被声明为volatile的(关于volatile的内存语义以及实现原理参考前面写到的Java并发编程基础之volatile),这是为了在多线程下保证内存的可见性,而value就是具体存放计数的变量。下面我们看看AtomicLong中的主要几个函数
(1)递增和递减的源码
1 //使用unsafe的方法,原子性的设置value值为原始值+1,返回值为递增之后的值 2 public final long getAndIncrement() { 3 return unsafe.getAndAddLong(this, valueOffset, 1L); 4 } 5 public final long incrementAndGet() { 6 return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; 7 } 8 //使用unsafe的方法,原子性的设置value值为原始值-1,返回值为递减之后的值 9 public final long getAndDecrement() { 10 return unsafe.getAndAddLong(this, valueOffset, -1L); 11 } 12 public final long decrementAndGet() { 13 return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; 14 }
在上面的代码中都是通过调用Unsafe类的getAndAddLong方法来实现操作的,我们来看看这个方法,这个方法是个原子性操作:其中的第一个参数是AtomicLong实例的引用,第二个参数是value变量在AtomicLong中的偏移量,第三个参数是要设置为第二个变量的值。下面就是getAndAddLong方法的实现,以及一些分析
1 public final long getAndAddLong(Object var1, long var2, long var4) { 2 long var6; 3 do { 4 //public native long getLongVolatile(Object var1, long var2); 5 //该方法就是获取var1引用指向的内存地址中偏移量为var2位置的值,然后赋给var6 6 var6 = this.getLongVolatile(var1, var2); 7 /**public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); 8 * var1:AtomicXXX类型的一个引用,指向堆内存中的一块地址 9 * var2:AtomicXXX源码中的valueOffset,表示AtomicXXX源码中实际存储的值value在原子类型内存中的地址偏移量 10 * var4:要比较的目标值expectValue,如果从内存指定地址处(var1和var2决定的那块地址)的值和该值相等,则CAS成功 11 * var6:CAS成功后向该内存中写进的新值 12 */ 13 //该方法就是使用CAS的方式,比较指定内存地址处(var1指向的内存地址块中偏移量为var2处)的值和上面同一块地址处取出的var6是否相等, 14 //相等就将var6+var4(这里可以看成var6+1)和指定内存地址处(var2引用指向的地址块中偏移量为var2处)的值交换,并返回true,然后就会结束循环 15 //CAS失败返回false,然后继续执行循环体内部的代码,直到成功(也就是自增运算成功就会跳出循环并返回自增后的值) 16 } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); 17 18 return var6; 19 }
(2)CompareAndSet方法
下面是compaerAndSet方法的实现,主要还是调用unsafe类的compareAndSwapLong方法,其原理和上面分析的差不多,都是通过CAS的方式进行比较交换值。
1 public final boolean compareAndSet(long expect, long update) { 2 //public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); 3 return unsafe.compareAndSwapLong(this, valueOffset, expect, update); 4 }
(3)扩展,下面是compareAndSwapInt的底层实现,实际上是通过硬件同步原语来实现的CAS,下面的cmpxchg就是基于硬件原语实现的
1 UNSAFE_ENTRY(jboolean,Usafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) 2 UsafeWrapper("Usafe_CompareAndSwapInt"); 3 oop p = JNIHasdles::resolve(obj); 4 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); 5 return (jint)(Atomic::cmpxchg(x,addr,e)) == e; 6 UNSAFE_END
(4)下面是一个例子,使用AtomicLong来进行技术运算
1 package test; 2 3 import java.util.concurrent.atomic.AtomicLong; 4 5 public class TestAtomic1 { 6 7 //创建AtomicLong类型的计数器 8 private static AtomicLong atomicLong = new AtomicLong(); 9 // private static Long atomicLong = 0L; 10 //创建两个数组,计算数组中的0的个数 11 private static Integer[] arr1 = {0,1,2,3,0,5,6,0,56,0}; 12 private static Integer[] arr2 = {10,1,2,3,0,5,6,0,56,0}; 13 14 public static void main(String[] args) throws InterruptedException { 15 16 //线程1统计arr1中0的个数 17 Thread t1 = new Thread(new Runnable() { 18 @Override 19 public void run() { 20 int size = arr1.length; 21 for (int i = 0; i < size; i++) { 22 if(arr1[i].intValue() == 0) { 23 // atomicLong.getAndIncrement(); 24 atomicLong++; 25 } 26 } 27 } 28 }); 29 30 Thread t2 = new Thread(new Runnable() { 31 @Override 32 public void run() { 33 int size = arr2.length; 34 for (int i = 0; i < size; i++) { 35 if(arr2[i].intValue() == 0) { 36 // atomicLong.getAndIncrement(); 37 atomicLong++; 38 } 39 } 40 } 41 }); 42 43 t1.start(); 44 t2.start(); 45 46 t1.join(); 47 t2.join(); 48 49 System.out.println("两个数组中0出现的次数为: " + atomicLong);//两个数组中0出现的次数为: 7 50 } 51 }
如果没有使用原子类型进行计数运算,那么可能就是下面的结果
五、JDK8中新增的LongAddr