123456 | public class UnsafeLazyInitiallization { private static UnsafeLazyInitiallization instance; private UnsafeLazyInitiallization() { } public static UnsafeLazyInitiallization getInstance(){ if (instance== null ){ //1:A线程执行 instance= new UnsafeLazyInitiallization(); //2:B线程执行 } return instance; } } |
123456 | public class SafeLazyInitiallization { private static SafeLazyInitiallization instance; private SafeLazyInitiallization() { } public synchronized static SafeLazyInitiallization getInstance(){ if (instance== null ){ instance= new SafeLazyInitiallization(); } return instance; } } |
123456789101112 | public class DoubleCheckedLocking { //1 private static DoubleCheckedLocking instance; //2 private DoubleCheckedLocking() { } public static DoubleCheckedLocking getInstance() { //3 if (instance == null ) { //4:第一次检查 synchronized (DoubleCheckedLocking. class ) { //5:加锁 if (instance == null ) //6:第二次检查 instance = new DoubleCheckedLocking(); //7:问题的根源出在这里 } //8 } //9 return instance; //10 } //11} |
1 | memory=allocate(); //1:分配对象的内存空间ctorInstance(memory); //2:初始化对象instance=memory; //3:设置instance指向刚分配的内存地址 |
1 | memory=allocate(); //1:分配对象的内存空间instance=memory; //3:设置instance指向刚分配的内存地址,注意此时对象还没有被初始化ctorInstance(memory); //2:初始化对象 |
123456789101112 | public class SafeDoubleCheckedLocking { private volatile static SafeDoubleCheckedLocking instance; private SafeDoubleCheckedLocking() { } public static SafeDoubleCheckedLocking getInstance() { if (instance == null ) { synchronized (SafeDoubleCheckedLocking. class ) { if (instance == null ) instance = new SafeDoubleCheckedLocking(); //instance为volatile,现在没问题了 } } return instance; } } |
12345678 | public class InstanceFactory { private InstanceFactory() { } private static class InstanceHolder { public static InstanceFactory instance = new InstanceFactory(); } public static InstanceFactory getInstance() { return InstanceHolder.instance; //这里将导致InstanceHolder类被初始化 } } |
程序次序法则:线程中的每个动作 A 都 happens-before 于该线程中的每一个动作 B,其中,在程序中,所有的动作 B 都出现在动作 A 之后。(注:此法则只是要求遵循 as-if-serial语义)
监视器锁法则:对一个监视器锁的解锁 happens-before 于每一个后续对同一监视器锁的加锁。(显式锁的加锁和解锁有着与内置锁,即监视器锁相同的存储语意。)
volatile变量法则:对 volatile 域的写入操作 happens-before 于每一个后续对同一域的读操作。(原子变量的读写操作有着与 volatile 变量相同的语意。)(volatile变量具有可见性和读写原子性。)
线程启动法则:在一个线程里,对 Thread.start 的调用会 happens-before 于每一个启动线程中的动作。 线程终止法则:线程中的任何动作都 happens-before 于其他线程检测到这个线程已终结,或者从 Thread.join 方法调用中成功返回,或者 Thread.isAlive 方法返回false。
中断法则法则:一个线程调用另一个线程的 interrupt 方法 happens-before 于被中断线程发现中断(通过抛出InterruptedException, 或者调用 isInterrupted 方法和 interrupted 方法)。
终结法则:一个对象的构造函数的结束 happens-before 于这个对象 finalizer 开始。
传递性:如果 A happens-before 于 B,且 B happens-before 于 C,则 A happens-before 于 C。
编译器优化的重排序。编译器在不改变单线程程序语义(as-if-serial )的前提下,可以重新安排语句的执行顺序。
指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对机器指令的执行顺序。
123456789101112131415161718192021222324252627282930313233343536373839 | public class CASTest { public static void main(String[] args){ final Counter cas= new Counter(); List ts= new ArrayList( 600 ); long start=System.currentTimeMillis(); for ( int j= 0 ;j< 100 ;j++){ Thread t= new Thread( new Runnable() { @Override public void run() { for ( int i= 0 ;i< 10000 ;i++){ cas.count(); cas.safeCount(); } } }); ts.add(t); } for (Thread t:ts){ t.start(); } for (Thread t:ts){ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(cas.i); System.out.println(cas.atomicI.get()); System.out.println(System.currentTimeMillis()-start); } } public class Counter { public AtomicInteger atomicI= new AtomicInteger( 0 ); public int i= 0 ; /** * 使用CAS实现线程安全计数器 */ public void safeCount(){ for (;;){ int i=atomicI.get(); boolean suc=atomicI.compareAndSet(i,++i); if (suc){ break ; } } } /** * 非线程安全计数器 */ public void count(){ i++; } } |
12 | public final boolean compareAndSet( int expect, int update) { return unsafe.compareAndSwapInt( this , valueOffset, expect, update); } |
123456789101112131415161718192021222324 | public class Mutex implements Lock { private static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -4387327721959839431L; protected boolean isHeldExclusively() { return getState() == 1 ; } public boolean tryAcquire( int acquires) { assert acquires == 1 ; // Otherwise unused if (compareAndSetState( 0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease( int releases) { assert releases == 1 ; // Otherwise unused if (getState() == 0 ) throw new IllegalMonitorStateException(); setExclusiveOwnerThread( null ); setState( 0 ); return true ; } Condition newCondition() { return new ConditionObject(); } } private final Sync sync = new Sync(); public void lock() { sync.acquire( 1 ); } public boolean tryLock() { return sync.tryAcquire( 1 ); } public void unlock() { sync.release( 1 ); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly( 1 ); } public boolean tryLock( long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos( 1 , unit.toNanos(timeout)); } } |
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 | public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire( int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState( 0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) // overflow throw new Error( "Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease( int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread( null ); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0 ; } final boolean isLocked() { return getState() != 0 ; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState( 0 ); // reset to unlocked state } } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { //todo sth... } //todo sth...} |
1234 | public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
12345678910111213141516171819 | private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq( final Node node) { for (;;) { Node t = tail; if (t == null ) { // Must initialize if (compareAndSetHead( new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
123456789101112 | final boolean acquireQueued( final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } |
ConcurrentHashMap : 一个高效的线程安全的HashMap。
CopyOnWriteArrayList : 在读多写少的场景中,性能非常好,远远高于vector。
ConcurrentLinkedQueue : 高效并发队列,使用链表实现,可以看成线程安全的LinkedList。
BlockingQueue : 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用作数据共享 。
ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找 。
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
123456789101112131415161718192021 | private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue( int capacity, boolean fair) { //省略其他代码 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } |
12345678 | public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park( this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } |
12345 | public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park( false , 0L); setBlocker(t, null ); } |
1 | public native void park( boolean isAbsolute, long time); |
123456789101112131415161718192021222324 | void os::PlatformEvent::park() { int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v- 1 , &_Event, v) == v) break ; } guarantee (v >= 0 , "invariant" ) ; if (v == 0 ) { // Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0 , status, "mutex_lock" ); guarantee (_nParked == 0 , "invariant" ) ; ++ _nParked ; while (_Event < 0 ) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait" ); } -- _nParked ; // In theory we could move the ST of 0 into _Event past the unlock(), // but then we'd need a MEMBAR after the ST. _Event = 0 ; status = pthread_mutex_unlock(_mutex); assert_status(status == 0 , status, "mutex_unlock" ); } guarantee (_Event >= 0 , "invariant" ) ; } } |
12345 | public static ExecutorService newFixedThreadPool( int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0 , Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } |
12345678 | public ThreadPoolExecutor( //核心线程池,指定了线程池中的线程数量 int corePoolSize, //基本线程池,指定了线程池中的最大线程数量 int maximumPoolSize, //当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。 long keepAliveTime, //keepAliveTime的单位 TimeUnit unit, //任务队列,被提交但尚未被执行的任务。 BlockingQueue workQueue, //线程工厂,用于创建线程,一般用默认的即可 ThreadFactory threadFactory, //拒绝策略,当任务太多来不及处理,如何拒绝任务。 RejectedExecutionHandler handler) |
123456789101112131415161718192021 | public void execute(Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf(c)获取当前线程池线程总数 * 当前线程数小于corePoolSize核心线程数时,会将任务通过addWorker(command, true)方法直接调度执行。 * 否则进入下个if,将任务加入等待队列 **/ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } /** * workQueue.offer(command) 将任务加入等待队列。 * 如果加入失败(比如有界队列达到上限或者使用了synchronousQueue)则会执行else。 * **/ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker( null , false ); } /** * addWorker(command, false)直接交给线程池, * 如果当前线程已达到maximumPoolSize,则提交失败执行reject()拒绝策略。 **/ else if (!addWorker(command, false )) reject(command); } |