Java并发编程实践 目录
并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 06—— CompletionService : Executor 和 BlockingQueue
并发编程 10—— 任务取消 之 关闭 ExecutorService
并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
并发编程 20—— AbstractQueuedSynchronizer 深入分析
概述
第2 部分 java.util.concurrent 同步器类中的AQS
第1 部分 锁和同步器
先看下java.util.concurrent.locks大致结构
上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为何图中没有用UML线表示呢,这是每个Lock实现类都持有自己内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为何要实现不同的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制。下文会举例说明不同同步器内的Sync与state实现。
同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。
第2 部分 java.util.concurrent 同步器类中的AQS
什么是state机制
提供 volatile 变量 state; 用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性。对应源码里的定义:
/** * 同步状态 */ private volatile int state; /** *cas */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
不同实现类的Sync与State
基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。
2.1 ReentrantLock
需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为ReetranLock的FairSync的tryAcquire实现代码解析。
1 //公平获取锁 2 protected final boolean tryAcquire(int acquires) { 3 final Thread current = Thread.currentThread(); 4 int c = getState(); 5 //如果当前重进入数为0,说明有机会取得锁 6 if (c == 0) { 7 //如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁 8 if (isFirst(current) && 9 compareAndSetState(0, acquires)) { 10 setExclusiveOwnerThread(current); 11 return true; 12 } 13 } 14 //如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁 15 else if (current == getExclusiveOwnerThread()) { 16 int nextc = c + acquires; 17 if (nextc < 0) 18 throw new Error("Maximum lock count exceeded"); 19 setState(nextc); 20 return true; 21 } 22 //以上条件都不满足,那么线程进入等待队列。 23 return false; 24 }
2.2 Semaphore
则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现
1 protected int tryAcquireShared(int acquires) { 2 Thread current = Thread.currentThread(); 3 for (;;) { 4 Thread first = getFirstQueuedThread(); 5 //如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待 6 if (first != null && first != current) 7 return -1; 8 //如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值 9 int available = getState(); 10 int remaining = available - acquires; 11 //如果remining<0,那么反馈给AQS当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,可以继续执行。 12 if (remaining < 0 ||compareAndSetState(available, remaining)) 13 return remaining; 14 } 15 }
2.3 CountDownLatch
闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。
1 //await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程 2 public int tryAcquireShared(int acquires) { 3 return getState() == 0? 1 : -1; 4 } 5 6 //countDown 时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。 7 public boolean tryReleaseShared(int releases) { 8 for (;;) { 9 int c = getState(); 10 if (c == 0) 11 return false; 12 int nextc = c-1; 13 if (compareAndSetState(c, nextc)) 14 return nextc == 0; 15 } 16 }
2.4 FutureTask
需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。
1 //get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住 2 protected int tryAcquireShared(int ignore) { 3 return innerIsDone()? 1 : -1; 4 } 5 6 //判定任务是否完成或者被Cancel 7 boolean innerIsDone() { 8 return ranOrCancelled(getState()) && runner == null; 9 } 10 11 //get时调用,对于CANCEL与其他异常进行抛错 12 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { 13 if (!tryAcquireSharedNanos(0,nanosTimeout)) 14 throw new TimeoutException(); 15 if (getState() == CANCELLED) 16 throw new CancellationException(); 17 if (exception != null) 18 throw new ExecutionException(exception); 19 return result; 20 } 21 22 //任务的执行线程执行完毕调用(set(V v)) 23 void innerSet(V v) { 24 for (;;) { 25 int s = getState(); 26 //如果线程任务已经执行完毕,那么直接返回(多线程执行任务?) 27 if (s == RAN) 28 return; 29 //如果被CANCEL了,那么释放等待线程,并且会抛错 30 if (s == CANCELLED) { 31 releaseShared(0); 32 return; 33 } 34 //如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现) 35 if (compareAndSetState(s, RAN)) { 36 result = v; 37 releaseShared(0); 38 done(); 39 return; 40 } 41 } 42 }
1.《java并发编程实战》 构建自定义的同步工具
3. AbstractQueuedSynchronizer的介绍和原理分析
4. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)