Java线程池详解

时间:2023-01-31 11:09:17
  1. 减少资源开销,不用频繁的创建和销毁线程
  2. 提高响应时长,有任务时可直接执行。
  3. 提高线程的可管理性,所有线程资源都由线程池统一管理。

二、线程池的主要参数

线程池ThreadPoolExecutor的继承关系:

Java线程池详解
想知道有哪些参数,先看参数最多的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以看到创建一个线程池需要七个参数。

  1. corePoolSize:线程池核心线程数量大小,有新任务进来时,如果线程池中的线程数小于这个值,则会创建新的线程来执行任务(不管有没有空闲线程,都会创建),直到线程数量大于等于这个数量。
  2. maximumPoolSize:线程池最大线程数量大小(当达到核心线程数,且队列任务已满,会增加线程至最大线程数)。
  3. keepAliveTime:线程的最大空余时间,大于这个时间将被回收(线程数大于核心线程数时,多余的线程空闲时长达到这个值就会被回收)。
  4. unit:空余时长的单位。
  5. workQueue:阻塞队列,当核心线程数已满,任务会被放到这个队列中。
  6. threadFactory:线程工厂,线程池中的线程都是由这个线程工厂创建的,线程池提供了默认的线程工厂。
  7. handler:拒绝策略,当队列任务已满,且线程数量达到最大线程数,新进入的任务会执行这个拒绝策略来选择丢弃哪个任务,线程池提供了四种默认的拒绝策略。

线程工厂ThreadFactor,线程池中默认给定了一个线程工厂DefaultThreadFactory,线程工厂的作用是用于创建线程,自己创建线程工厂时需要实现ThreadFactor接口,该接口中只有一个方法Thread newThread(Runnable r);,即创建线程的方法,自定义线程工厂的好处是可以自定义线程名称。

阻塞队列BlockingQueue,阻塞队列是在核心线程满了以后存放任务使用,常用的有LinkedBlockingQueueArrayBlockingQueueSynchronousQueueDelayedWorkQueue等,当阻塞队列也满时,会创建线程至最大线程数,如果队列已满,也达到最大线程数,则执行拒绝策略。

拒绝策略RejectedExecutionHandler

  • DiscardPolicy:丢弃该任务,不抛异常。
  • DiscardOldestPolicy:丢弃最早加入队列的任务,不抛异常。
  • AbortPolicy:丢弃该任务并抛出异常RejectedExecutionException,默认使用该策略。
  • CallerRunsPolicy:由调用线程池的线程来执行当前任务。

查看源码:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

如果想要实现自己的拒绝策略,那么实现RejectedExecutionHandler接口即可。

三、线程池的执行流程

如下图:

Java线程池详解

查看源码执行流程,execute()方法:

public void execute(Runnable command) {
    //传入的任务是否为空
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果运行的线程数小于corePoolSize,则创建新的线程(线程池状态为正在运行的状态)。
     *
     * 2. 如果任务排队成功,仍然需要检查线程池的状态,如果不是可运行的状态则回滚刚刚的操作。
     *
     * 3. 如果排队失败,则尝试添加新的线程,如果已经达到线程池最大数量,则执行拒绝策略。
     */
     //判断线程数是否小于核心线程数
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //如果小于,新增一个线程来执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果核心线程数已满,则向阻塞队列中添加任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果添加失败,则创建线程至最大值
    else if (!addWorker(command, false))
        //如果创建失败,则执行拒绝策略
        reject(command);
}

四、线程池的状态

查看源码,发现线程池有五种状态,如下:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  1. RUNNING,运行状态,线程池创建完成后就是运行状态。
  2. SHUTDOWN,关闭状态,执行shutdown()方法后进入此状态,继续处理队列中的任务,但是不再接收新的任务。
  3. STOP,停止状态,shutdownNow()方法后进入此状态,不处理队列中的任务,也不接收新的任务。
  4. TIDYING,整理状态,运行的线程数为0,队列中任务为空时,则进入此状态,进入此状态后会执行terminated()方法,进入销毁状态。
  5. TERMINATED,销毁状态,执行terminated()方法,进入此状态。

状态转换如下图:

Java线程池详解

五、Java提供的快速创建的线程池

Executors提供了几种快速创建的线程池:

  • newSingleThreadExecutor,只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • newFixedThreadPool,固定线程数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • newCachedThreadPool,可缓存的线程池,即不限制线程数量的线程池。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • newScheduledThreadPool,定时线程池,可周期性或延迟执行任务的线程池(使用延时队列)。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  • newSingleThreadScheduledExecutor,单个线程的定时线程池,功能和上面一样。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

六、最后

欢迎关注个人微信公众号

Java线程池详解