
线程复用:线程池
一、为什么需要线程池
为了避免系统频繁地创建和销毁线程,使用线程池让线程进行复用。(即创建线程变成了从线程池中获取空闲线程,销毁线程变成了把线程放回线程池中。)
二、JDK对线程池的支持:Executor框架
一)Excutor框架简介
public static ExecutorService newFixedThreadPool(int Threads) //返回一个固定线程数量的线程池,当新任务提交时无空闲线程,则任务进入等待队列,等待空闲线程。 public static ExecutorService newSingleThreadExecutor() //返回一个只有一个线程的线程池,多余的任务被提交到线程池,会进入等待队列,按先入先出顺序执行。 public static ExecutorService newCahedThreadPool() //返回一个可根据实际情况调整线程数量的线程池 public static ScheduledExecutorService newSingleThreadScheduledExecutor() //线程池中的线程可根据时间执行任务。 public static ScheduledExecutorService newScheduledThreadPool()
public class SESDemo { public static void main(String[] args){
ScheduledExecutorService p = Executors.newScheduledThreadPool(10);
p.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try{
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000+" and name is "+
Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
/*1504236959 and name is pool-1-thread-1
1504236961 and name is pool-1-thread-1
1504236963 and name is pool-1-thread-2
1504236965 and name is pool-1-thread-1
1504236967 and name is pool-1-thread-3
1504236969 and name is pool-1-thread-2
.......
1504237017 and name is pool-1-thread-7
1504237019 and name is pool-1-thread-7
1504237021 and name is pool-1-thread-7
1504237023 and name is pool-1-thread-7
1504237025 and name is pool-1-thread-7
*/
注意:任务本身不能抛出异常,不然后续所有执行都会被中断。
二)框架内部实现:
以上方法具体实现都使用了ThreadPoolExecutor,它们都封装了ThreadPoolExecutor
其中workQueue指被提交但未被执行的任务队列。
拒绝策略:线程池中的线程已经用完,队列也已经排满。
JDK内置的四种拒绝策略:
AbortPolicy:直接抛出异常,阻止系统正常执行。
CallerRunsPolicy:直接在任务提交的线程内执行线程。
DiscardOledestPolicy:丢弃最老的一个任务,并再次提交当前任务。
DiscardPolicy:丢弃无法处理的当前任务,不做其他处理。
以上策略都实现了RejectedExecutionHandler接口,如果以上策略无法满足我们的需求,我们可以扩展该接口。
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+" :Thread ID:"+
Thread.currentThread().getId()); try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
} public static void main(String[] args) throws InterruptedException {
MyTask myTask=new MyTask();
ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10)
, Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is discard");
}
}); for (int i=0;i<Integer.MAX_VALUE;i++){
es.submit(myTask);
Thread.sleep(10);
}
}
}
三)自定义线程创建
线程池中线程的来源:ThreadFactory接口。override该接口中的
public Thread newThread(Runnable r);
方法即可。
四)扩展线程池
ThreadPoolExecutor是个可以扩展的线程池接口,它提供了beforeExecutor(),afterExecutor()和terminated三个接口对线程池进行控制。
public class ExtThreadPool { public static class Mytask implements Runnable{
private String name; public Mytask(String name) {
this.name = name;
} @Override
public void run() {
System.out.println("正在执行:线程ID"+Thread.currentThread().getId()+
"任务名:"+name);
}
} public static void main(String[] args) throws InterruptedException {
ExecutorService es=new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()){ @Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:"+((Mytask)r).name);
} @Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完毕:"+((Mytask)r).name);
} @Override
protected void terminated() {
System.out.println("线程池退出");
}
}; for (int i=0;i<5;i++){
Mytask mytask=new Mytask("task"+i);
es.execute(mytask);
Thread.sleep(10);
} es.shutdown(); //所有任务执行完毕后,关闭线程池。
}
}
五)线程池中寻找异常堆栈
解决多线程中幽灵般的异常:
方法一:放弃submit()改用execute()
方法二:extends ThreadPoolExecutor
六)分而治之,Fork/Join框架
分阶段对大量数据进行处理,然后对结果进行整合。
互助精神:
重要接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
ForkJoinTask两个重要子类: