一个简单的单例示例
123456 | public class UnsafeLazyInitiallization { private static UnsafeLazyInitiallization instance; private UnsafeLazyInitiallization() { } public static UnsafeLazyInitiallization getInstance(){ if (instance== null ){ //1:A线程执行 instance= new UnsafeLazyInitiallization(); //2:B线程执行 } return instance; } } |
123456 | public class SafeLazyInitiallization { private static SafeLazyInitiallization instance; private SafeLazyInitiallization() { } public synchronized static SafeLazyInitiallization getInstance(){ if (instance== null ){ instance= new SafeLazyInitiallization(); } return instance; } } |
123456789101112 | public class DoubleCheckedLocking { //1 private static DoubleCheckedLocking instance; //2 private DoubleCheckedLocking() { } public static DoubleCheckedLocking getInstance() { //3 if (instance == null ) { //4:第一次检查 synchronized (DoubleCheckedLocking. class ) { //5:加锁 if (instance == null ) //6:第二次检查 instance = new DoubleCheckedLocking(); //7:问题的根源出在这里 } //8 } //9 return instance; //10 } //11} |
1 | memory=allocate(); //1:分配对象的内存空间ctorInstance(memory); //2:初始化对象instance=memory; //3:设置instance指向刚分配的内存地址 |
1 | memory=allocate(); //1:分配对象的内存空间instance=memory; //3:设置instance指向刚分配的内存地址,注意此时对象还没有被初始化ctorInstance(memory); //2:初始化对象 |
不允许2和3重排序
允许2和3重排序,但不允许其他线程“看到”这个重排序
123456789101112 | public class SafeDoubleCheckedLocking { private volatile static SafeDoubleCheckedLocking instance; private SafeDoubleCheckedLocking() { } public static SafeDoubleCheckedLocking getInstance() { if (instance == null ) { synchronized (SafeDoubleCheckedLocking. class ) { if (instance == null ) instance = new SafeDoubleCheckedLocking(); //instance为volatile,现在没问题了 } } return instance; } } |
12345678 | public class InstanceFactory { private InstanceFactory() { } private static class InstanceHolder { public static InstanceFactory instance = new InstanceFactory(); } public static InstanceFactory getInstance() { return InstanceHolder.instance; //这里将导致InstanceHolder类被初始化 } } |
JMM
lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独立的状态。
unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的本地内存中,以便随后的load动作使用。
load(载入):作用于本地内存的变量,它把read操作从主内存中得到变量值放入本地内存的变量副本中。
use(使用):作用于本地内存的变量,它把本地内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。
assign(赋值):作用于本地内存的变量,它把一个从执行引擎接收到的值赋给本地内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
store(存储):作用于本地内存的变量,它把本地内存中的一个变量的值传送到主内存中,以便随后的write操作使用。
write(写入):作用于主内存的变量,它把store操作从本地内存中提到的变量的值放入到主内存的变量中。
一个线程中的所有操作必须按照程序的顺序来执行
(不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性的内存模型中,每个操作必须原子执行并且立刻对所有线程可见。
程序次序法则:线程中的每个动作 A 都 happens-before 于该线程中的每一个动作 B,其中,在程序中,所有的动作 B 都出现在动作 A 之后。(注:此法则只是要求遵循 as-if-serial语义)
监视器锁法则:对一个监视器锁的解锁 happens-before 于每一个后续对同一监视器锁的加锁。(显式锁的加锁和解锁有着与内置锁,即监视器锁相同的存储语意。)
volatile变量法则:对 volatile 域的写入操作 happens-before 于每一个后续对同一域的读操作。(原子变量的读写操作有着与 volatile 变量相同的语意。)(volatile变量具有可见性和读写原子性。)
线程启动法则:在一个线程里,对 Thread.start 的调用会 happens-before 于每一个启动线程中的动作。 线程终止法则:线程中的任何动作都 happens-before 于其他线程检测到这个线程已终结,或者从 Thread.join 方法调用中成功返回,或者 Thread.isAlive 方法返回false。
中断法则法则:一个线程调用另一个线程的 interrupt 方法 happens-before 于被中断线程发现中断(通过抛出InterruptedException, 或者调用 isInterrupted 方法和 interrupted 方法)。
终结法则:一个对象的构造函数的结束 happens-before 于这个对象 finalizer 开始。
传递性:如果 A happens-before 于 B,且 B happens-before 于 C,则 A happens-before 于 C。
编译器优化的重排序。编译器在不改变单线程程序语义(as-if-serial )的前提下,可以重新安排语句的执行顺序。
指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对机器指令的执行顺序。
内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
程序员对内存模型的使用。程序员希望内存模型易于理解,易于编程。程序员希望基于一个强内存模型(程序尽可能的顺序执行)来编写代码。
编译器和处理器对内存模型的实现。编译器和处理器希望内存模型对它们的束缚越少越好,这样它们就可以做尽可能多的优化(对程序重排序,做尽可能多的并发)来提高性能。编译器和处理器希望实现一个弱内存模型。
对于会改变程序执行结果的重排序,JMM要求编译器和处理器必须禁止这种重排序。
对于不会改变程序执行结果的重排序,JMM对编译器和处理器不作要求(JMM允许这种重排序)。
原子性:由Java内存模型来直接保证的原子性操作就是我们前面介绍的8个原子操作指令,其中lock(lock指令实际在处理器上原子操作体现对总线加锁或对缓存加锁)和unlock指令操作JVM并未直接开放给用户使用,但是却提供了更高层次的字节码指令monitorenter和monitorexit来隐式使用这两个操作,这两个字节码指令反映到Java代码中就是同步块——synchronize关键字,因此在synchronized块之间的操作也具备原子性。除了synchronize,在Java中另一个实现原子操作的重要方式是自旋CAS,它是利用处理器提供的cmpxchg指令实现的。至于自旋CAS后面J.U.C中会详细介绍,它和volatile是整个J.U.C底层实现的核心。
可见性:可见性是指一个线程修改了共享变量的值,其他线程能够立即得知这个修改。而我们上文谈的happens-before原则禁止某些处理器和编译器的重排序,来保证了JMM的可见性。而体现在程序上,实现可见性的关键字包含了volatile、synchronize和final。
有序性:谈到有序性就涉及到前面说的重排序和顺序一致性内存模型。我们也都知道了as-if-serial是针对单线程程序有序的,即使存在重排序,但是最终程序结果还是不变的,而多线程程序的有序性则体现在JMM通过插入内存屏障指令,禁止了特定类型处理器的重排序。通过前面8个操作指令和happens-before原则介绍,也不难推断出,volatile和synchronized两个关键字来保证线程之间的有序性,volatile本身就包含了禁止指令重排序的语义,而synchronized则是由监视器法则获得。
J.U.C
ABA问题。因为CAS需要在操作值的时候,检查值有没有变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有变化,但是实际上发生变化了。ABA解决的思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1。JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。不过目前来说这个类比较“鸡肋”,大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用原来的互斥同步可能会比原子类更高效。
循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。所以说如果是长时间占用锁执行的程序,这种方案并不适用于此。
只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用自旋CAS来保证原子性,但是对多个共享变量的操作时,自旋CAS就无法保证操作的原子性,这个时候可以用锁。
123456789101112131415161718192021222324252627282930313233343536373839 | public class CASTest { public static void main(String[] args){ final Counter cas= new Counter(); List ts= new ArrayList( 600 ); long start=System.currentTimeMillis(); for ( int j= 0 ;j< 100 ;j++){ Thread t= new Thread( new Runnable() { @Override public void run() { for ( int i= 0 ;i< 10000 ;i++){ cas.count(); cas.safeCount(); } } }); ts.add(t); } for (Thread t:ts){ t.start(); } for (Thread t:ts){ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(cas.i); System.out.println(cas.atomicI.get()); System.out.println(System.currentTimeMillis()-start); } } public class Counter { public AtomicInteger atomicI= new AtomicInteger( 0 ); public int i= 0 ; /** * 使用CAS实现线程安全计数器 */ public void safeCount(){ for (;;){ int i=atomicI.get(); boolean suc=atomicI.compareAndSet(i,++i); if (suc){ break ; } } } /** * 非线程安全计数器 */ public void count(){ i++; } } |
12 | public final boolean compareAndSet( int expect, int update) { return unsafe.compareAndSwapInt( this , valueOffset, expect, update); } |
123456789101112131415161718192021222324 | public class Mutex implements Lock { private static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -4387327721959839431L; protected boolean isHeldExclusively() { return getState() == 1 ; } public boolean tryAcquire( int acquires) { assert acquires == 1 ; // Otherwise unused if (compareAndSetState( 0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease( int releases) { assert releases == 1 ; // Otherwise unused if (getState() == 0 ) throw new IllegalMonitorStateException(); setExclusiveOwnerThread( null ); setState( 0 ); return true ; } Condition newCondition() { return new ConditionObject(); } } private final Sync sync = new Sync(); public void lock() { sync.acquire( 1 ); } public boolean tryLock() { return sync.tryAcquire( 1 ); } public void unlock() { sync.release( 1 ); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly( 1 ); } public boolean tryLock( long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos( 1 , unit.toNanos(timeout)); } } |
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 | public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire( int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState( 0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) // overflow throw new Error( "Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease( int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread( null ); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0 ; } final boolean isLocked() { return getState() != 0 ; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState( 0 ); // reset to unlocked state } } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { //todo sth... } //todo sth...} |
1234 | public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
12345678910111213141516171819 | private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq( final Node node) { for (;;) { Node t = tail; if (t == null ) { // Must initialize if (compareAndSetHead( new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
123456789101112 | final boolean acquireQueued( final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } |
ConcurrentHashMap : 一个高效的线程安全的HashMap。
CopyOnWriteArrayList : 在读多写少的场景中,性能非常好,远远高于vector。
ConcurrentLinkedQueue : 高效并发队列,使用链表实现,可以看成线程安全的LinkedList。
BlockingQueue : 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用作数据共享 。
ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找 。
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
DelayQueue:一个使用优先级队列实现的*阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
123456789101112131415161718192021 | private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue( int capacity, boolean fair) { //省略其他代码 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } |
12345678 | public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park( this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } |
12345 | public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park( false , 0L); setBlocker(t, null ); } |
1 | public native void park( boolean isAbsolute, long time); |
与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,然后再执行的park。
线程被中断时。
如果参数中的time不是零,等待了指定的毫秒数时。
发生异常现象时。这些异常事先无法确定。
123456789101112131415161718192021222324 | void os::PlatformEvent::park() { int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v- 1 , &_Event, v) == v) break ; } guarantee (v >= 0 , "invariant" ) ; if (v == 0 ) { // Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0 , status, "mutex_lock" ); guarantee (_nParked == 0 , "invariant" ) ; ++ _nParked ; while (_Event < 0 ) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait" ); } -- _nParked ; // In theory we could move the ST of 0 into _Event past the unlock(), // but then we'd need a MEMBAR after the ST. _Event = 0 ; status = pthread_mutex_unlock(_mutex); assert_status(status == 0 , status, "mutex_unlock" ); } guarantee (_Event >= 0 , "invariant" ) ; } } |
12345 | public static ExecutorService newFixedThreadPool( int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0 , Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } |
newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize一样的,并使用了LinkedBlockingQueue任务队列(*队列)的线程池。当任务提交非常频繁时,该队列可能迅速膨胀,从而系统资源耗尽。
newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1。
newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它总是破事线程池增加新的线程来执行任务。当任务执行完后由于corePoolSize为0,因此空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),如果有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样做法可能会很快耗尽系统的资源,因为它会增加无穷大数量的线程。
12345678 | public ThreadPoolExecutor( //核心线程池,指定了线程池中的线程数量 int corePoolSize, //基本线程池,指定了线程池中的最大线程数量 int maximumPoolSize, //当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。 long keepAliveTime, //keepAliveTime的单位 TimeUnit unit, //任务队列,被提交但尚未被执行的任务。 BlockingQueue workQueue, //线程工厂,用于创建线程,一般用默认的即可 ThreadFactory threadFactory, //拒绝策略,当任务太多来不及处理,如何拒绝任务。 RejectedExecutionHandler handler) |
首先线程池判断基本线程池是否已满,如果没满,创建一个工作线程来执行任务。满了,则进入下个流程。
其次线程池判断工作队列是否已满,如果没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
最后线程池判断整个线程池是否已满,如果没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
123456789101112131415161718192021 | public void execute(Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf(c)获取当前线程池线程总数 * 当前线程数小于corePoolSize核心线程数时,会将任务通过addWorker(command, true)方法直接调度执行。 * 否则进入下个if,将任务加入等待队列 **/ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } /** * workQueue.offer(command) 将任务加入等待队列。 * 如果加入失败(比如有界队列达到上限或者使用了synchronousQueue)则会执行else。 * **/ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker( null , false ); } /** * addWorker(command, false)直接交给线程池, * 如果当前线程已达到maximumPoolSize,则提交失败执行reject()拒绝策略。 **/ else if (!addWorker(command, false )) reject(command); } |
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
如果运行的线程等于或多于corePoolSize,则将任务加入到BlockingQueue。
如果无法将任务假如BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。