Synchronizer
Synchronizer是一个对象,可以根据本身的状态调整线程的控制流- 结构特性:
- 封装状态,而这些状态决定着线程执行到某一点时是通过还是*等待
- 提供操作状态的方法,以及高效的等待Synchronizer进入到期望状态的方法
- 理解:第一句话~ 就阻塞队列来讲,它封装了 队列已满,队列为空 等状态; 队列为空这个状态决定了消费者从队列中中获取数据时,是顺利获取,还是等待 同样的,队列已满这个状态就决定了生产者生产的数据能不能顺利放到队列中去 第二句话~ 还没弄明白
- 包括
- 阻塞队列 BlockingQueue
- 信号量 semaphore
- 关卡 barrier
- 闭锁 latch
闭锁(latch)
可以延迟线程的进度,直到闭锁达到终止状态; 闭锁工作起来就像一道门,在闭锁到达终止状态之前,门是关闭的,没有线程能通过;一旦到达终止状态,门开了,所有线程都能通过,闭锁的状态会一直保持在敞开状态 可以用来确保特定的活动知道其他活动完成之后才发生- 确保一个计算不会执行,直到他需要的资源被初始化
- 一个二元闭锁(只有两个状态)来表达资源是否已经初始化,所有需要用到该资源的线程都在闭锁中等待
- 确保一个服务s不会开始,直到他依赖的服务(t1-tn)都已经开始
- 还不明白怎么回事,大概就是在s中等待t1-tn的闭锁开启
CountDownLatch -- 一个闭锁的实现
CountDownLatch通过一个整数实例化对象,该对象每调用一次countDown方法,该整数减一,若该数字为0,则闭锁打开 示例:public class TestHarness{
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
final CountDownLatch startGate = new CountDownLatch(1);//全部线程开始的闭锁
final CountDownLatch endGate = new CountDownLatch(nThread);//全部线程结束的闭锁
for(int i = 0 ; i < nThreads ; i++){
Thread t = new Thread(){
public void run(){
try{
startGate.await();//所有的线程都在这里等待闭锁打开
try{
task.run();//实际要调用的方法
}finally{
endGate.countDown(); //每一个线程执行完毕,调用countDown()方法,直到全部的线程(nThreads个)执行完毕,闭锁打开
}
}catch(InterruptedException e){}
}
};
t.start(); //t.start()不会立即开始执行task.run()方法,线程一开始执行startGate.await();会等待闭锁startGate打开
}
long start = System.nanoTime();//记录开始时间
startGate.countDown();//闭锁startGate初始化为1,执行一次countDown()方法,闭锁就打开了,所有的线程可以继续执行
endGate.await();//当前线程在这里等待所有的线程执行完毕
long end = System.nonaTIme();//记录结束时间
return end - start;
}
}
上面的示例是用来统计 nThread 个线程,从第一个线程启动到全部的线程终止所花费的时间FutureTask --另一种闭锁的实现
描述了一个可以携带结果的计算,通过callable实现的 有三个状态 等待 运行 完成,完成之后永远停留在这个状态 Future.get() 依赖于任务完成的状态,如果任务完成,则返回结果或者抛出异常。若没有完成,则阻塞直到任务完成,然后返回 为什么说FatureTask是一种闭锁?FutureTask有一个状态标志,即任务是否完成。 线程T1--Tn调用其get()方法,如果状态标志为未完成,会阻塞全部的线程,等于关闭了一道门,所有的线程都过不去了,直到状态标志改为任务完成,才会有返回值,所有的线程继续执行。所以FutureTask类似于一个二元闭锁。。。不知道这样理解对不对~~ 使用FutureTask预加载数据public class Preloader{
private final FutureTask<ProducInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>(){
public ProductInfo call() throws DataLoadException{//Callable接受的任务代码中抛出的Throwable会被封装成一个ExecutionException,并被他的调用者future.get()方法重新抛出
return loadProductInfo();//比较耗时的加载操作
}
}
);
private final Thread thread = new Thread(future);
public void start(){ thread.start();}//这里启动新线程进行预加载工作
public ProductInfo get() throws DataLoadException,InterruptedException{
try{
return future.get(); //调用的时候如果FutureTask 的任务完成状态为未完成,则阻塞直到状态改变
}catch(ExecutionException e){//ExecutionException包含三种情况1.受检查异常;2.RuntimeException;3.Error~必须分别处理
Throwable cause = e.getCause();
if(cause instanceof DataLoadException) throw (DataLoadException)cause; //这里处理一下已知的异常
else throw launderThrowable(cause); //通过launderThrowable()方法处理其他情况的异常1.如果cause是一个Error,抛出;2.如果是一个RuntimeException,返回该异常;3.其他情况下抛出IllegalStateException
}
}
}
计数信号量(Semaphore)
用来控制访问某特定资源的活动的数量,或者同时执行某一给定操作的数量。 例如数据库连接池中数据链接的访问,阻塞队列中的有界队列,等多个线程竞争的有限资源,可以用计数信号量来处理 计数信号量管理一个有效的许可集,许可的初始量通过构造函数传给Semaphore acquire方法获取一个许可,如果剩余许可为0,此方法会阻塞直到有可用的许可为止 release方法释放一个许可 二元信号量(初始值为1的Semaphore)可以用作 互斥 锁。 意思是 -- 谁拥有这唯一的许可,就拥有了互斥锁 示例:有界的阻塞容器,使用信号量来约束容器public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
//构造方法,初始化边界
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);//初始化许可集的大小,即该有界阻塞队列的大小
}
public boolean add(T o) throws InterruptedException{
sem.acquire();//添加元素时获取一个许可,如果剩余许可为0,则说明此有界阻塞队列已满
boolean wasAdded = false; //返回值,默认为添加元素失败。
try{
wasAdded=set.add(o);//向队列中添加元素,如果成功,则返回成功
return wasAdded;
}finally{
if(!wasAdded) sem.release(); //如果添加失败,把方法一开始获取的许可释放掉
}
}
public boolean remove (Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved) sem.release();//如果删除成功,则释放一个许可
return wasRemoved;
}
}
关卡(barrier)
它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。 一定数量的线程只有都运行到关卡的屏障点时,才会同时继续往下执行。也可以向构造函数传递一个关卡行为,当所有线程成功通过关卡,会在其中一个线程执行事先定义好的关卡行为 示例(来自jdk文档) 有一个矩阵 float[][] ,现在对其进行分行并行计算,每个线程计算一行,当全部行计算完毕,即所有的线程都到达一个关卡时,开始汇总计算的结果。class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow); //每一个线程负责处理一行数据
try {
barrier.await(); //处理完行数据之后在这个关卡等待其他线程,当所有的线程都处理完毕,才继续向下执行
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(...); //汇总计算的结果语句
}//关卡CyclicBarrier 初始化时参数一为:关卡需要等待的线程的数量n,只有当n个线程全部到达此关卡,才能顺利通过;参数二为:一个关卡行为,当关卡顺利通过之后,在一个子任务线程中执行这个行为。Runnable类型~
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start(); //每一行一个线程处理,分别启动
waitUntilDone();
}
}
关卡的实际应用远远没有这么简单,可以看一下jdk文档,线程在关卡处等待的时候会有很多种意外情况,这些都要分别去处理,等把其他地方的知识点也研究过之后在回过够来看这里jdk中的内容吧