线程池是个神器,用得好会非常地方便。本来觉得线程池的构造器有些复杂,即使讲清楚了对今后的用处可能也不太大,因为有一些Java定义好的线程池可以直接使用。但是(凡事总有个但是),还是觉得讲一讲可能跟有助于理解后面的常用线程池,所以该打脸还是打吧
因为直接结合代码看会更清楚一些,所以我把带注释的代码贴出来:
public class ThreadPoolExecutor { public ThreadPoolExecutor( /** * corePoolSize:初始化时指定的核心线程数,包括空闲线程,必须大于等于0,当有新任务提交时,会执行以下判断(workCount为当前活跃的线程数量): * 当workCount< corePoolSize:即使线程池中有空闲线程,也会创建新线程 * 当corePoolSize ≤ workCount < maximumPoolSize:只有workQueue满时才创建新线程 * 当corePoolSize < workCount < maximumPoolSize:且超过corePoolSize部分的线程空闲时间达到keepAliveTime时,就回收这些线程,当设置allowCoreThreadTimeOut(true)时, * 线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将被回收 * 当设置corePoolSize == maximumPoolSize:线程池的大小固定,此时如有新任务提交,且workQueue未满时,会将请求放入workQueue,等待有空闲的线程从workQueue中取任务并处理 * 当workCount ≥ maximumPoolSize:若workQueue满,则采取handler对应的策略 */ int corePoolSize, // maximumPoolSize:初始化时指定的最大线程数量 int maximumPoolSize, // keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是等待,直到等待的时间超过了keepAliveTime long keepAliveTime, // 空闲时间单位 TimeUnit unit, /** * workQueue:阻塞队列的类型是保存等待执行的任务的阻塞队列,主要有四种提交方式: * SynchronousQueue:同步队列,这个“队列”内部只包含了一个元素,队列的size始终为0,每执行一个put,就需要一个take来解除阻塞,反之也一样。饱和状态下,线程池能处理的最大线程数量为maximumPoolSize * 使用SynchronousQueue队列,提交的任务不会保存,而是会马上提交执行 * 需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略 * ArrayBlockingQueue:有界任务队列,饱和状态下,线程池能处理的最大线程数量为maximumPoolSize + ArrayBlockingQueue.SIZE * LinkedBlockingQueue:*任务队列,线程池的任务队列可以无限制的添加新的任务,此时线程池能够创建的最大线程数是corePoolSize, * 而maximumPoolSize就无效了,线程池饱和状态下能处理的最大线程数量只取决于系统的性能 * PriorityBlockingQueue:优先任务队列,同LinkedBlockingQueue一样,它也是一个*的任务队列,只不过需要自己实现元素的Comparable排序接口 */ BlockingQueue<Runnable> workQueue, // threadFactory:创建新线程,使新创建的线程有相同的优先级且为非守护线程,同时设置线程的名称,默认使用Executors.DefaultThreadFactory类创建 ThreadFactory threadFactory, /** * handler:表示线程池的饱和策略,意思就是如果阻塞队列满了并且没有空闲的线程,此时如果继续提交任务,就需要采取一种策略处理该任务,线程池提供了4种策略 * AbortPolicy:直接抛出异常,这是默认策略 * CallerRunsPolicy:如果线程池的线程数量达到上限,则把任务队列中的任务放在调用者的线程当运行 * DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务 * DiscardPolicy:直接丢弃任务 */ RejectedExecutionHandler handler) { // balabala… … } }
这样就清晰多了。
其中,最主要是要清楚几种workQueue,也就是BlockingQueue<Runnable>的作用。
SynchronousQueue同步队列,这个队列没有所谓的缓冲,这样做是为了排除阻塞时队列丢消息的可能。如果没有其他微服务并行执行的话,可以放心地用这个队列,不然还是小心一点为妙。它的示例代码:
/** * 同步队列 */ public class SynchronousQueueTest { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor( 1, // 当要处理的线程数超过maximumPoolSize时,抛出异常 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0; i < 10; i++) { service.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()) ); } service.shutdown(); } }
ArrayBlockingQueue,它的使用范围非常广,一般可以用于轻量级的同步锁,也就是在同一个服务中(也就是非微服务架构),如果要具有分布式锁的功能又不想部署zookeeper这么麻烦的话,ArrayBlockingQueue就是一个非常不错的选择。
/** * 有界阻塞队列 */ public class ArrayBlockingQueueTest { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor( // 要处理的线程数超过maximumPoolSize + workQueue.SIZE时,抛出异常 1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // 因为 maximumPoolSize(2) + ArrayBlockingQueue.SIZE(10) < 13,所以会抛出异常 for (int i = 0; i < 13; i++) { service.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()) ); } service.shutdown(); } }
再来看看ArrayBlockingQueue的另一个例子,可以加深印象:
public class ArrayBlockingQueueTester { public static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5); // 一个往里放 class Producer implements Runnable { @Override public void run() { try { queue.put("川菜"); System.out.println(Thread.currentThread().getName() + " 厨师做好 川菜"); } catch (InterruptedException e) { e.printStackTrace(); } } } // 一个往外拿 class Consumer implements Runnable { @Override public void run() { try { String food = queue.take(); System.out.println( Thread.currentThread().getName() + " 客人消费 " + food ); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { // 客人等着菜 for (int i = 0; i < 5; i++) { new Thread(new ArrayBlockingQueueTester().new Consumer()).start(); } // 厨师做好菜 for (int i = 0; i < 5; i++) { new Thread(new ArrayBlockingQueueTester().new Producer()).start(); } } }
ArrayBlockingQueue说白了就是一个往里放,一个往外拿:
1、往里放的,只能最多放指定个数就不能再放了(阻塞,等待,这里是5个);
2、往外拿的,如果没有可以拿的了,就等着(阻塞,等待)。
咱们点菜的时候不就是这样吗?
LinkedBlockingQueue这个就牛逼了,相当于无底洞,有多少处理多少,此时线程池能够创建的最大线程数是corePoolSize,而maximumPoolSize就成了摆设,这等于说是完全取决于系统的性能。
/** * *阻塞队列 */ public class LinkedBlockingQueueTest { public static void main(String[] args) { // 要处理的线程数过大时,是否抛出异常,取决于机器的性能 ExecutorService service = new ThreadPoolExecutor( 1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0; i < 10000; i++) { service.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()) ); } service.shutdown(); } }
最后一个队列是PriorityBlockingQueue,它是一种有优先级的*阻塞队列,默认的元素执行顺序是升序,可以通过自定义接口Comparable<T>实现compareTo()方法来指定队列中的元素执行顺序。
/** * 测试类 */ public class Test1 implements Runnable, Comparable<Test1> { private int priority; public Test1(int priority) { this.priority = priority; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } @Override public int compareTo(Test1 o) { // 返回1时为升序 // 返回-1为降序 return this.priority > o.priority ? -1 : 1; } @Override public void run() { System.out.println( "当前线程 " + Thread.currentThread().getName() + ", priority = " + this.priority ); } } /** * 有优先级的*阻塞队列 */ public class PriorityBlockingQueueTest { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor( 1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0; i < 10; i++) { service.execute(new Test1(i)); } service.shutdown(); } }
如果想在线程池的执行线程中加入一点自己希望的动作,可以通过自定义ThreadFactory实现。
/** * 测试类 */ public class Test2 implements Runnable { private String name; public Test2(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void run() { System.out.println(this.getName() + ",当前线程 " + Thread.currentThread().getName()); } } /** * 自定义ThreadFactory */ public class SelfDefineThreadPoolExecutor { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor( 1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(8), // 自定义ThreadFactory new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("线程 " + r.hashCode() + " 创建"); return new Thread(r, "thread-pool-" + r.hashCode()); } }, // 加入自定义动作 new ThreadPoolExecutor.CallerRunsPolicy() ) { public void beforeExecute(Thread thread, Runnable runnable) { System.out.println(((Test2) runnable).getName() + " 准备执行"); } public void afterExecute(Thread thread, Runnable runnable) { System.out.println(((Test2) runnable).getName() + " 执行完毕"); } public void terminated() { System.out.println("线程池关闭"); } }; for (int i = 0; i < 10; i++) { service.execute(new Test2("Test2" + i)); } service.shutdown(); } }
其实主要是把常用那几个workQueue搞搞清楚,因为这几个在今后的工作中可能会用到,尤其是ArrayBlockingQueue,它和后面会说的另两个神器,可以说是是「线程三宝」。