Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。
一、创建线程池
Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
例子
1.固定线程创建线程池执行任务
Executor executor = Executors.newFixedThreadPool(10);
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("task over");
}
};
Runnable task2 = new Runnable() {
@Override
public void run() {
System.out.println("task over2");
}
};
executor.execute(task);
executor.execute(task2);
2.定时周期执行任务代替timer类例子
Executor executor = Executors.newScheduledThreadPool(10);
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
/**
* command:执行线程
initialDelay:初始化延时
period:两次开始执行最小间隔时间
unit:计时单位
*/
scheduler.scheduleAtFixedRate(task2, 10, 10, TimeUnit.SECONDS);
二.使用Callable,Future返回结果
Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。(一般Future结合callable使用)
futureTask的使用
单独使用Runnable时:
无法获得返回值
单独使用Callable时:
无法在新线程中(new Thread(Runnable r))使用,只能使用ExecutorService
Thread类只支持Runnable
FutureTask:
实现了Runnable和Future,所以兼顾两者优点
既可以使用ExecutorService,也可以使用Thread
- Callable pAccount = new PrivateAccount();
- FutureTask futureTask = new FutureTask(pAccount);
- // 使用futureTask创建一个线程
- Thread thread = new Thread(futureTask);
- thread.start();
=================================================================
public interface Future<V> Future 表示异步计算的结果。
Future有个get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
FutureTask是为了弥补Thread的不足而设计的,它可以让程序员准确地知道线程什么时候执行完成并获得到线程执行完成后返回的结果(如果有需要)。
FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,它等价于可以携带结果的Runnable,并且有三个状态:等待、运行和完成。完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。
Future 主要定义了5个方法:
1) boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。 (取消当前任务或终止线程)
2)boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。
3)boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
4)V get()throws InterruptedException,ExecutionException:如有必要,等待计算完成,然后获取其结果(futuretask获取Callable返回的结果)。
5)V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
- public class FutureTask<V> extends Object
- implements Future<V>, Runnable
FutureTask类是Future 的一个实现,并实现了Runnable,所以可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
futureTask和Callable的综合使用
例子1
Callable<Integer> func = new Callable<Integer>(){
public Integer call() throws Exception {
System.out.println("inside callable");
Thread.sleep(1000);
return new Integer(8);
}
};
FutureTask<Integer> futureTask = new FutureTask<Integer>(func);
Thread newThread = new Thread(futureTask);
newThread.start();
try {
System.out.println("blocking here");
Integer result = futureTask.get();
System.out.println(result);
} catch (InterruptedException ignored) {
} catch (ExecutionException ignored) {
}
例子2
1.多处理单元计算总和
ConcurrentCalculato.java为计算类
public class ConcurrentCalculato {
private ExecutorService exec;
private int cpuCoreNumber;
private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
// 内部类
class SumCalculator implements Callable<Long> {
private int[] numbers;
private int start;
private int end;
public SumCalculator(final int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
/* (non-Javadoc)任务结束
* @see java.util.concurrent.Callable#call()
*/
public Long call() throws Exception {
Long sum = 0l;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
public ConcurrentCalculato() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
}
public Long sum(final int[] numbers) {
// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
for (int i = 0; i < cpuCoreNumber; i++) {
int increment = numbers.length / cpuCoreNumber + 1;
int start = increment * i;
int end = increment * i + increment;
if (end > numbers.length)
end = numbers.length;
SumCalculator subCalc = new SumCalculator(numbers, start, end);
FutureTask<Long> task = new FutureTask<Long>(subCalc);
tasks.add(task);
if (!exec.isShutdown()) {
exec.submit(task);
}
}
return getResult();
}
//
/**
* 迭代每个只任务,获得部分和,相加返回
*
* @return
*/
public Long getResult() {
Long result = 0l;
for (Future<Long> task : tasks) {
try {
// 如果计算未完成则阻塞
Long subSum = task.get();
result += subSum;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
public void close() {
exec.shutdown();
}
2.main方法里面的代码
public static void main(String[] args) {
int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
ConcurrentCalculato calc = new ConcurrentCalculato();
Long sum = calc.sum(numbers);
System.out.println(sum);
calc.close();
}
//待续这里。。。。
三、CompletionService
在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞(阻塞当前统计的线程)。修改刚才的例子使用CompletionService:
Java代码
public class ConcurrentCalculator2 {
private ExecutorService exec;
private CompletionService<Long> completionService;
private int cpuCoreNumber;
// 内部类
class SumCalculator implements Callable<Long> {
private int[] numbers;
private int start;
private int end;
public SumCalculator(final int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
/* (non-Javadoc)任务结束
* @see java.util.concurrent.Callable#call()
*/
public Long call() throws Exception {
Long sum = 0l;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
public ConcurrentCalculator2() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
completionService = new ExecutorCompletionService<Long>(exec);
}
public Long sum(final int[] numbers) {
// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
for (int i = 0; i < cpuCoreNumber; i++) {
int increment = numbers.length / cpuCoreNumber + 1;
int start = increment * i;
int end = increment * i + increment;
if (end > numbers.length)
end = numbers.length;
SumCalculator subCalc = new SumCalculator(numbers, start, end);
if (!exec.isShutdown()) {
completionService.submit(subCalc);
}
}
return getResult();
}
/**
* 迭代每个只任务,获得部分和,相加返回
*
* @return
*/
public Long getResult() {
Long result = 0l;
for (int i = 0; i < cpuCoreNumber; i++) {
try {
Long subSum = completionService.take().get();
result += subSum;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
public void close() {
exec.shutdown();
}
}
四.ExecutorService
ExecutorService接口继承了Executor接口,定义了一些生命周期的方法
原型
- public interface ExecutorService extends Executor {
- void shutdown();
- List<Runnable> shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- }
本文,我们逐一分析里面的每个方法。
首先,我们需要创建一个任务代码,这段任务代码主要是随机生成含有10个字符的字符串
- /**
- * 随机生成10个字符的字符串
- * @author dream-victor
- *
- */
- public class Task1 implements Callable<String> {
- @Override
- public String call() throws Exception {
- String base = "abcdefghijklmnopqrstuvwxyz0123456789";
- Random random = new Random();
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < 10; i++) {
- int number = random.nextInt(base.length());
- sb.append(base.charAt(number));
- }
- return sb.toString();
- }
- }
然后,我们还需要一个长任务,这里我们默认是沉睡10秒,
- /**
- * 长时间任务
- *
- * @author dream-victor
- *
- */
- public class LongTask implements Callable<String> {
- @Override
- public String call() throws Exception {
- TimeUnit.SECONDS.sleep(10);
- return "success";
- }
- }
OK,所有前期准备完毕,下面我们就来分析一下ExecutorService接口中和生命周期有关的这些方法:
1、shutdown方法:这个方法会平滑地关闭ExecutorService,当我们调用这个方法时,ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。这里我们先不举例在下面举例。
2、awaitTermination方法:这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。例如:
- ExecutorService service = Executors.newFixedThreadPool(4);
- service.submit(new Task1());
- service.submit(new Task1());
- service.submit(new LongTask());
- service.submit(new Task1());
- service.shutdown();
- while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
- System.out.println("线程池没有关闭");
- }
- System.out.println("线程池已经关闭");
这段代码中,我们在第三次提交了一个长任务,这个任务将执行10秒沉睡,紧跟着执行了一次shutdown()方法,假设:这时ExecutorService被立即关闭,下面调用service.awaitTermination(1, TimeUnit.SECONDS)方法时应该返回true,程序执行结果应该只会打印出:“线程池已经关闭”。但是,真实的运行结果如下:
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池没有关闭
- 线程池已经关闭
这说明我们假设错误,service.awaitTermination(1, TimeUnit.SECONDS)每隔一秒监测一次ExecutorService的关闭情况,而长任务正好需要执行10秒,因此会在前9秒监测时ExecutorService为未关闭状态,而在第10秒时已经关闭,因此第10秒时输出:线程池已经关闭。这也验证了shutdown方法关闭ExecutorService的条件。
3、shutdownNow方法:这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。例如:
- ExecutorService service = Executors.newFixedThreadPool(3);
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- service.submit(new LongTask());
- List<Runnable> runnables = service.shutdownNow();
- System.out.println(runnables.size());
- while (!service.awaitTermination(1, TimeUnit.MILLISECONDS)) {
- System.out.println("线程池没有关闭");
- }
- System.out.println("线程池已经关闭");
这段代码中,我们限制了线程池的长度是3,提交了5个任务,这样将有两个任务在工作队列中等待,当我们执行shutdownNow方法时,ExecutorService被立刻关闭,所以在service.awaitTermination(1, TimeUnit.MILLISECONDS)方法校验时返回的是false,因此没有输出:线程池没有关闭。而在调用shutdownNow方法时,我们接受到了一个List,这里包含的是在工作队列中等待执行的任务,由于线程池长度为3,且执行的都是长任务,所以当提交了三个任务后线程池已经满了,剩下的两次提交只能在工作队列中等待,因此我们看到runnables的大小为2,结果如下:
4、isTerminated方法:这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为“TERMINATED”时返回true否则返回false。例如:
- ExecutorService service = Executors.newFixedThreadPool(3);
- service.submit(new Task1());
- service.submit(new Task1());
- service.submit(new LongTask());
- service.shutdown();
- System.out.println(System.currentTimeMillis());
- while (!service.isTerminated()) {
- }
- System.out.println(System.currentTimeMillis());
这段代码我们执行了两个正常的任务和一个长任务,然后调用了shutdown方法,我们知道调用shutdown方法并不会立即关闭ExecutorService,这时我们记录一下监测循环执行前的时间,在没有关闭前我们一直进入一个空循环中,直到 ExecutorService关闭后退出循环,这里我们知道长任务执行时间大约为10秒,我们看一下上述程序运行结果:
- 1303298818621
- 1303298828634
- 相差:10013毫秒,转换一下除以1000,得到相差大约10秒
这10秒正好是长任务执行的时间,因此在 ExecutorService正常关闭后isTerminated方法返回true。
5、isShutdown方法:这个方法在ExecutorService关闭后返回true,否则返回false。方法比较简单不再举例。
以上讨论是基于ThreadPoolExecutor的实现,不同的实现会有所不同需注意。