Java并发编程之线程池任务监控
当我们提交runnable或者callable<?>到ThreadPoolExecutor时,我们是无法知道这些任务是在什么时候才真正的执行的,为了实现这个需求,我们需要扩展ThreadPoolExecutor,重写beforeExecute和afterExecute,在这两个方法里分别做一些任务执行前和任务执行后的相关监控逻辑,还有个terminated方法,是在线程池关闭后回调,,另外,我们可以通过getLargestPoolSize()和getCompletedTaskCount()来分别获取线程池数的峰值和线程池已完成的任务数。
下面就一个完整的例子来说明如何进行:
自定义MonitorHandler接口,把before和after抽象出来:
- package cc.lixiaohui.demo.concurrent;
- /**
- * 监控处理器, 目的是把before和after抽象出来, 以便在{@link MonitorableThreadPoolExecutor}中形成一条监控处理器链
- *
- * @author lixiaohui
- * @date 2016年10月11日 下午7:18:38
- *
- */
- public interface MonitorHandler {
- /**
- * 改监控任务是否可用
- *
- * @return
- */
- boolean usable();
- /**
- * 任务执行前回调
- *
- * @param thread 即将执行该任务的线程
- * @param runnable 即将执行的任务
- */
- void before(Thread thread, Runnable runnable);
- /**
- * <pre>
- * 任务执行后回调
- * 注意:
- * 1.当你往线程池提交的是{@link Runnable} 对象时, 参数runnable就是一个{@link Runnable}对象
- * 2.当你往线程池提交的是{@link java.util.concurrent.Callable<?>} 对象时, 参数runnable实际上就是一个{@link java.util.concurrent.FutureTask<?>}对象
- * 这时你可以通过把参数runnable downcast为FutureTask<?>或者Future来获取任务执行结果
- *
- * @param runnable 执行完后的任务
- * @param throwable 异常信息
- */
- void after(Runnable runnable, Throwable throwable);
- /**
- * 线程池关闭后回调
- *
- * @param largestPoolSize
- * @param completedTaskCount
- */
- void terminated(int largestPoolSize, long completedTaskCount);
- }
扩展ThreadPoolExecutor,增加监控的逻辑,如果监控比较耗时的话,为了不影响业务线程池的执行效率,我们应该将before,after和terminated方法的调用封装为统一的Runnable交给非业务线程池内的Thread来跑(新建个Thread或者线程池):
- package cc.lixiaohui.demo.concurrent;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- /**
- * 可监控的线程池, 可有多个监控处理器,如果监控的逻辑是比较耗时的话, 最好另起个线程或者线程池专门用来跑MonitorHandler的方法.
- *
- * @author lixiaohui
- * @date 2016年10月11日 下午7:15:16
- *
- */
- public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {
- /**
- * 可有多个监控处理器
- */
- private Map<String, MonitorHandler> handlerMap = new HashMap<String, MonitorHandler>();
- private final Object lock = new Object();
- public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
- }
- public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- }
- public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- }
- public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- // 依次调用处理器
- for (MonitorHandler handler : handlerMap.values()) {
- if (handler.usable()) {
- handler.before(t, r);
- }
- }
- }
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- // 依次调用处理器
- for (MonitorHandler handler : handlerMap.values()) {
- if (handler.usable()) {
- handler.after(r, t);
- }
- }
- }
- /*
- * @see java.util.concurrent.ThreadPoolExecutor#terminated()
- */
- @Override
- protected void terminated() {
- super.terminated();
- for (MonitorHandler handler : handlerMap.values()) {
- if (handler.usable()) {
- handler.terminated(getLargestPoolSize(), getCompletedTaskCount());
- }
- }
- }
- public MonitorHandler addMonitorTask(String key, MonitorHandler task, boolean overrideIfExist) {
- if (overrideIfExist) {
- synchronized (lock) {
- return handlerMap.put(key, task);
- }
- } else {
- synchronized (lock) {
- return handlerMap.putIfAbsent(key, task);
- }
- }
- }
- public MonitorHandler addMonitorTask(String key, MonitorHandler task) {
- return addMonitorTask(key, task, true);
- }
- public MonitorHandler removeMonitorTask(String key) {
- synchronized (lock) {
- return handlerMap.remove(key);
- }
- }
- }
测试程序:
- package cc.lixiaohui.demo.concurrent;
- import java.io.IOException;
- import java.util.Map;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CancellationException;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.FutureTask;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- import cc.lixiaohui.util.RandomUtils;
- /**
- * @author lixiaohui
- * @date 2016年10月11日 下午8:11:39
- *
- */
- public class Tester {
- static volatile boolean stop = false;
- public static void main(String[] args) throws InterruptedException, IOException {
- // fixed size 5
- final MonitorableThreadPoolExecutor pool = new MonitorableThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- pool.addMonitorTask("TimeMonitorTask", newTimeMonitorHandler());
- // 起一个线程不断地往线程池丢任务
- Thread t = new Thread(new Runnable() {
- public void run() {
- startAddTask(pool);
- }
- });
- t.start();
- // 丢任务丢20 ms
- Thread.sleep(50);
- stop = true;
- t.join();
- pool.shutdown();
- // 等线程池任务跑完
- pool.awaitTermination(100, TimeUnit.SECONDS);
- }
- private static MonitorHandler newTimeMonitorHandler() {
- return new MonitorHandler() {
- // 任务开始时间记录map, 多线程增删, 需用ConcurrentHashMap
- Map<Runnable, Long> timeRecords = new ConcurrentHashMap<Runnable, Long>();
- public boolean usable() {
- return true;
- }
- public void terminated(int largestPoolSize, long completedTaskCount) {
- System.out.println(String.format("%s:largestPoolSize=%d, completedTaskCount=%s", time(), largestPoolSize, completedTaskCount));
- }
- public void before(Thread thread, Runnable runnable) {
- System.out.println(String.format("%s: before[%s -> %s]", time(), thread, runnable));
- timeRecords.put(runnable, System.currentTimeMillis());
- }
- public void after(Runnable runnable, Throwable throwable) {
- long end = System.currentTimeMillis();
- Long start = timeRecords.remove(runnable);
- Object result = null;
- if (throwable == null && runnable instanceof FutureTask<?>) { // 有返回值的异步任务,不一定是Callable<?>,也有可能是Runnable
- try {
- result = ((Future<?>) runnable).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // reset
- } catch (ExecutionException e) {
- throwable = e;
- } catch (CancellationException e) {
- throwable = e;
- }
- }
- if (throwable == null) { // 任务正常结束
- if (result != null) { // 有返回值的异步任务
- System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond, result: %s", time(), Thread.currentThread(), runnable, end - start, result));
- } else {
- System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond", time(), Thread.currentThread(), runnable, end - start));
- }
- } else {
- System.err.println(String.format("%s: after[%s -> %s], costs %d millisecond, exception: %s", time(), Thread.currentThread(), runnable, end - start, throwable));
- }
- }
- };
- }
- // 随机runnable或者callable<?>, 任务随机抛异常
- private static void startAddTask(MonitorableThreadPoolExecutor pool) {
- int count = 0;
- while (!stop) {
- if (RandomUtils.randomBoolean()) {// 丢Callable<?>任务
- pool.submit(new Callable<Boolean>() {
- public Boolean call() throws Exception {
- // 随机抛异常
- boolean bool = RandomUtils.randomBoolean();
- // 随机耗时 0~100 ms
- Thread.sleep(RandomUtils.randomInt(100));
- if (bool) {
- throw new RuntimeException("thrown randomly");
- }
- return bool;
- }
- });
- } else { // 丢Runnable
- pool.submit(new Runnable() {
- public void run() {
- // 随机耗时 0~100 ms
- try {
- Thread.sleep(RandomUtils.randomInt(100));
- } catch (InterruptedException e) {}
- // 随机抛异常
- if (RandomUtils.randomBoolean()) {
- throw new RuntimeException("thrown randomly");
- }
- };
- });
- }
- System.out.println(String.format("%s:submitted %d task", time(), ++count));
- }
- }
- private static String time() {
- return String.valueOf(System.currentTimeMillis());
- }
- }
一个较短的结果:
- 1476253228222: before[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979]
- 1476253228222:Thread[Thread-0,5,main], submitted 1 task
- 1476253228253:Thread[Thread-0,5,main], submitted 2 task
- 1476253228264: before[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d]
- 1476253228264:Thread[Thread-0,5,main], submitted 3 task
- 1476253228265: before[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc]
- 1476253228271: after[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d], costs 7 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
- 1476253228295: after[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979], costs 42 millisecond
- 1476253228347: after[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc], costs 82 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
- 1476253228347:Thread[pool-1-thread-3,5,main], largestPoolSize=3, completedTaskCount=3