线程池的使用
一丶什么是线程池
为了避免系统频繁的创建和销毁线程, 需要将创建好的线程"存储"管理起来复用, 可以形象的理解为"池子", 当需要线程时, 则向线程池申请一个线程, 用完之后,并不会销毁, 而是将线程归还给线程池, 减少线程的创建和销毁.
二丶为什么需要线程池
1) 多线程虽然可以提高系统性能,但如果数量不加以控制, 频繁的创建和销毁线程, 会消耗系统性能
2) 线程会占用内存, 如果过多创建线程, 会出现内存溢出的问题
因此, 在实际生产环境中, 线程的数量必须得到控制, 盲目的大量创建线程对系统性能是有害的.
二丶JDK中的线程池
2.1) 三种线程池接口
a. Executor 执行器执行Runnable任务, 不可获取任务执行后的结果
Executor
b. ExecutorService 执行器服务, 继承于Executor接口, 执行Callable任务, 可以任务执行后的任务
c. ScheduledExecutorService 调度执行器服务, 继承于ExecutorService接口, 可用于定时调度执行提交的任务
2.2) Executos 线程池工厂类, 用于生产不同特性的线程池
a. Executors中的newFixedThreadPool()方法
创建固定线程数量的线程池, 提交任务到该线程池时, 若没有空闲的线程, 则会保存到队列中, 直到有空闲的线程才会执行. 由于使用了*阻塞队列, 所以该线程池不会拒绝任务, 适合执行可预估任务数量的任务
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
b. Executors中的newSingleThreadExecutor()方法
创建只有一个线程的线程池, 提交的任务若没有空闲线程, 会被保存到队列中, 直到线程空闲才会按陷入先出的顺序执行队列中的任务
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
c. Executos中的newCachedThreadPool()方法
创建可缓存复用的线程池, 该池没有固定的线程数量, 适合执行大量执行时间短的任务. 提交的任务会优先使用空闲的线程, 否则会创建新的线程执行. 这时因为SynchronousQueue不会保存任务. 空闲线程空闲时间超过60s将会被销毁
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
d. Executors中的newSingleThreadScheduledExecutor()方法:
创建只有一个线程的ScheduledExecutorService,SchduledExecutorService在ExecutorService接口之上拓展了可用于在给定时间执行某任务的功能, 如在某个固定的延时之后执行某个任务, 或者周期性执行某个任务
/** * Creates a single-threaded executor that can schedule commands * to run after a given delay, or to execute periodically. * (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newScheduledThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * @return the newly created scheduled executor */ public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
e. Executors中的newSchduledExecutorPool()方法
返回指定线程数量的SchduledExecutorService
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
f. Executors中的newWorkStealingPool()方法
创建可用于分治执行任务的线程池, 大任务分解成小任务并行执行
/** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * * @param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
四丶ThreadPoolExecutor
从上面的Executors中的多个方法来看, 许多线程池都是使用了ThreadPoolExecutor类, 这是线程池的核心类,
4.1) ThreadPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
// 省略赋值代码... }
构造参数含义如下:
1. corePoolSize: 指定线程池的核心线程数, 这是线程池最小的线程数
2. maximumPoolSize: 指定线程池的最大线程数, 当线程池没有空闲线程, 且任务塞满队列时, 就会创建额外的线程来执行任务. 总线程数小于maximumPoolSize
3. keepAliveTime: 超过核心线程数的线程空闲的最大时间, 空闲时间超过该时间后, 竟会被销毁
4. unit: keepAliveTime对应的时间单位
5. workQueue: 任务队列, 当没有空闲的核心线程时, 任务就会保存到该队列中, 等待执行
6. threadFactory: 线程工厂, 用于创建线程, 可以指定线程名字
7. handler: 拒绝执行处理器, 当没有空闲的核心线程时, 任务就会保存到任务队列中, 如果塞满队列, 且线程数未达到最大线程数, 就会创建额外线程执行任务, 如果已达到最大线程数, 就会使用拒绝执行处理器拒绝超载任务
4.2 ThreadPoolExecutor的工作逻辑
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
调度逻辑分3步走:
a. 当工作线程数< corePoolSize时, 即存在空闲线程或者线程数未满corePoolSize时, 直接分配线程执行
b. 当线程数已满corePoolSize, 且没有空闲线程时, 将任务提交到工作队列中
c. 若任务队列已满, 且线程数<maximumPoolSize, 提交的任务将会由额外创建的线程执行, 若线程数已满maximumPoolSize, 将会使用拒绝处理器拒绝任务.
4.3 几种可使用的阻塞队列
a. 直接提交队列SynchronousQueue:
SynchronousQueue没有容量,插入一个元素,需要等待移除该元素, 线程池使用该队列时, 每提交一个任务需要创建一个线程执行, 所以需要指定maximumPoolSize, 否则很容易执行拒绝策略
b. 有界任务队列ArrayBlockingQueue:
ArrayBlockingQueue是一个有界队列, 从指定容量创建之后, 容量就不会变化了. 线程池使用该队列时, 当队列满之后, 如果线程数小于最大线程数, 就会创建新线程执行, 否则执行拒绝策略
c. *任务队列LinkedBlockingQueue:
LinkedBlockingQueue是一个使用链表实现的*阻塞队列, 线程池使用该队列时, 队列永远不会满, 即永远不会执行拒绝策略, 直至内存满为止. Executors默认使用的是该队列, 如果任务数量不可预估,为防止应用程序崩溃, 需重新指定阻塞队列
d. 优先任务队列PriorityBlockingQueue:
PriorityBlockingQueue是一个优先阻塞队列, 进入该队列的任务, 可按照指定的算法排序, 按优先级被执行. 而ArrayBlockingQueue, LinkedBlockingQueue则是先入先出队列
4.4) 拒绝策略
JDK内置的拒绝策略有4种:
a. AbortPolicy策略: 该策略会直接抛出异常, 组织系统正常工作
b. CallerRunsPolicy策略: 只要线程池未关闭, 该策略会直接在调用者线程中运行当前被丢弃任务. 这样不会真正的丢弃任务,但任务提交线程性能可能会急剧下降.
c. DiscardOledestPolicy策略: 该策略将丢弃最老的一个请求, 也就是即将被执行的一个任务, 并尝试再次提交当前任务.
d. DiscardPolicy策略: 该策略默默的丢弃无法处理的任务, 不予任何处理.
4.5) 注意被"吞掉"异常
a. 放弃submit() , 改用 execute()
b. 改造submit()
捕获异常, 手动抛出异常 (可以参考<Java高并发程序设计>P114页)
学习资料:
<java高并发程序设计>