在系统开发时,我们经常会遇到“池”的概念。使用池一种以空间换时间的做法,通常在内存中事先保存一系列整装待命的对象,以供后期供其他对象随时调用。常见的池有:数据库连接池,socket连接池,线程池等。今天我们就来看一下线程池的概念。
◆
Executor
◆
JDK为我们提供了一套Executor框架来方便我们来管理和使用线程池。
打开java.util.concurrent.Executors类,我们可以发现JDK为我们提供了那么多的方法来帮助我们高效快捷的创建线程池:
1 |
public static ExecutorService newFixedThreadPool(int nThreads);//创建一个固定数目的、可重用的线程池 |
上方简单列举了几个Executor框架为我们提供的创建线程池的方法,这些线程池拥有各种各样的功能,我想当你刚刚开始使用线程的时候google如何使用线程池的时候大部分文章都是教你如何使用上方的一些方法创建一个线程池。但是如果你去查看他们的源码就会发现他们最后构造的时候都调用了同一个构造方法。(除了newWorkStealingPool之外,这个我们在下篇文章再讨论)
1 |
ThreadPoolExecutor(int corePoolSize,//线程池线程数量 |
上方的4个参数我想你看到了就会明白了,现在我们着重来讲一下下面的三个参数。
◆
WorkQueue
◆
参数workQueue是用来存放已提交但还未执行的任务,JDK为我们提供了一下实现:
直接提交队列SynchronousQueue
1 |
当新任务过来的时候它是这样处理的: |
因此使用这个队列时一定要设置很大的maximumPoolSize
有界的任务队列ArrayBlockingQueue
1 |
if(当前线程数<corePoolSize){ |
*的任务队列LinkedBlockingDeque
1 |
if(当前线程数<corePoolSize){ |
◆
线程工厂
◆
第六个参数threadFactory是为线程池中创建线程的,我们使用Executor框架创建的线程就是有threadFactory提供的。我们看一下JDK提供的默认的threadFactory:
1 |
static class DefaultThreadFactory implements ThreadFactory { |
重点关注一下其中的newThread方法,看到这个我想你就明白了为什么你使用线程池创建出来的线程打印的时候名字的来源,还有是否是守护线程和优先级等属性的来源了。
◆
拒绝策略
◆
看到刚刚的几种任务队列我们发现当任务过多时是需要指定拒绝策略来进行拒绝呢,那么JDK又为我们提供了哪些拒绝策略呢。
1 |
AbortPolicy直接抛出异常。 |
◆
线程池的扩展
◆
ThreadPoolExecutor不仅仅能够创建各种各样的线程来帮助我们实行功能,它还预留了三个接口来供我们进行扩展。
在runWorker方法中调用线程进行执行之前调用了beforeExecute方法,执行之后调用了afterExecute()方法
1 |
final void runWorker(Worker w) { |
这两个方法在ThreadPoolExecutor类中是没有实现的,我们想要监控线程运行前后的数据就可以通过继承ThreadPoolExecutor类来实现这个扩展。
另外还有一个terminated()方法是在整个线程池退出的时候调用的,我们这里一并扩展。
public class ThreadPoolExecutorDemo extends ThreadPoolExecutor {
//注意这里因为ThreadPoolExecutor没有无参的构造,所以还需要重写一下构造方法。
//这里限于篇幅就不贴了
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(Thread.currentThread().getId()+"执行完成");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
}
//使用这个demo就可以验证我们扩展的结果了。
public class ThreadPoolDemo {
static class ThreadDemo extends Thread {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID is:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutorDemo threadPoolExecutorDemo= new ThreadPoolExecutorDemo(5,5,0,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
ThreadDemo threadDemo = new ThreadDemo();
for (int i = 0; i < 20; i++) {
threadPoolExecutorDemo.submit(threadDemo);
}
threadPoolExecutorDemo.shutdown();
}
}