11 java 线程池 使用实例

时间:2023-12-06 13:30:50

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

在Java中可以通过线程池来达到这样的效果。

1 线程池做什么

网络请求通常有两种形式:

第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或者写数据,最后断开,如文件下载,网络流媒体等。

另一种形式是请求频繁,但是连接上以后读/写很少量的数据就断开连接。考虑到服务的并发问题,如果每个请求来到以后服务都为它启动一个线程,那么这对服务的资源可能会造成很大的浪费,特别是第二种情况。

因为通常情况下,创建线程是需要一定的耗时的,设这个时间为T1,而连接后读/写服务的时间为T2,当T1>>T2时,我们就应当考虑一种策略或者机制来控制,使得服务对于第二种请求方式也能在较低的功耗下完成。

通常,我们可以用线程池来解决这个问题,首先,在服务启动的时候,我们可以启动好几个线程,并用一个容器(如线程池)来管理这些线程。

当请求到来时,可以从池中取一个线程出来,执行任务(通常是对请求的响应),当任务结束后,再将这个线程放入池中备用;

如果请求到来而池中没有空闲的线程,该请求需要排队等候。最后,当服务关闭时销毁该池即可。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的

它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,

而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。

所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

合理利用线程池能够带来三个好处:

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

但是要做到合理的利用线程池,必须对其原理了如指掌。

2 线程池的继承架构

程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互。而使用线程池可以很好的提高性能,尤其是当程序中要创建大量生存期很短的线程时,更应该考虑使用线程池。

线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。

在JDK5之前,我们必须手动实现自己的线程池,从JDK5开始,Java内置支持线程池

Java里面线程池的*接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。

真正的线程池接口是ExecutorService。下面这张图完整描述了线程池的类体系结构。

11 java 线程池 使用实例

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

标记一下比较重要的类:

ExecutorService:

真正的线程池接口。

ScheduledExecutorService

能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

ThreadPoolExecutor

ExecutorService的默认实现。

ScheduledThreadPoolExecutor

继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,

因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。

如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。

线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,

当任务数增加时,此线程池又可以智能的添加新线程来处理任务。

此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

newSingleThreadExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。

这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。

它提供了如下方法来提交一个任务:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

Callable 与 Runable的相关内容参见:

03 创建线程的第3式 
02 如何创建线程 线程并发与synchornized
 
3 使用线程池步骤及案例

线程池的好处:线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。

如何实现线程的代码呢?

A:创建一个线程池对象,控制要创建几个线程对象。

public static ExecutorService newFixedThreadPool(int nThreads)

B:这种线程池的线程可以执行:

可以执行Runnable对象或者Callable对象代表的线程

做一个类实现Runnable接口。

C:调用如下方法即可

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

D:我就要结束,可以吗? 可以。

 package com.jt.thread.demo05;

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; class MyRunnable implements Runnable {
@Override
public void run() {
for (int x = 0; x < 100; x++) {
System.out.println(Thread.currentThread().getName() + ":" + x);
11 }
12 }
} public class ExecutorServiceDemo {
public static void main(String[] args) {
// 创建一个线程池对象,控制要创建几个线程对象。
// public static ExecutorService newFixedThreadPool(int nThreads)
ExecutorService pool = Executors.newFixedThreadPool(2); // 可以执行Runnable对象或者Callable对象代表的线程
pool.submit(new MyRunnable());
pool.submit(new MyRunnable()); //结束线程池
pool.shutdown();
}
}

运行结果:

pool-1-thread-1:0

pool-1-thread-1:1

pool-1-thread-1:2

pool-1-thread-2:0

pool-1-thread-2:1

pool-1-thread-2:2

pool-1-thread-2:3

。。。

说明:

(1 newFixedThreadPool

是固定大小的线程池 有结果可见 我们指定2 在运行时就只有2个线程工作

若其中有一个线程异常  会有新的线程替代他

(2 shutdown方法有2个重载:

void shutdown() 启动一次顺序关闭,等待执行以前提交的任务完成,但不接受新任务。

List<Runnable> shutdownNow() 试图立即停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

(3 submit 与 execute

3.1 submit是ExecutorService中的方法 用以提交一个任务

他的返回值是future对象  可以获取执行结果

<T> Future<T>
submit(Callable<T> task) 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。

Future<?> submit(Runnable task) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。

<T> Future<T> submit(Runnable
task, T result) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。

3.2 execute是Executor接口的方法

他虽然也可以像submit那样让一个任务执行  但并不能有返回值

void execute(Runnable command)

在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。

(4
Future

Future 表示异步计算的结果。

它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。

取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。

如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。

必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 

也就是说Future提供了三种功能:

--判断任务是否完成;

--能够中断任务;

--能够获取任务执行结果。

boolean
cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。

V get() 如有必要,等待计算完成,然后获取其结果。

V
get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。

boolean
isCancelled() 如果在任务正常完成前将其取消,则返回 true。

boolean
isDone() 如果任务已完成,则返回 true。

4 线程池简单使用案例2

java.util.concurrent.Executors类的API提供大量创建连接池的静态方法:

 import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class JavaThreadPool {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
9 Thread t2 = new MyThread();
10 Thread t3 = new MyThread();
11 Thread t4 = new MyThread();
12 Thread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
25 public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行… …");
}
}

运行效果示例:

pool-1-thread-2正在执行… …

pool-1-thread-2正在执行… …

pool-1-thread-2正在执行… …

pool-1-thread-2正在执行… …

pool-1-thread-1正在执行… …

pool-1-thread-1正在执行… …

pool-1-thread-1正在执行… …

pool-1-thread-1正在执行… …

pool-1-thread-1正在执行… …

pool-1-thread-2正在执行… …

可见线程池中有2个线程在工作,可见 newFixedThreadPool 是固定大小的线程池

5 单任务线程池:

//创建一个使用单个 worker 线程的 Executor,以*队列方式来运行该线程。

ExecutorService pool = Executors.newSingleThreadExecutor();

案例:

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class SingleThreadPollDemo { public static void main(String[] args) {
// 创建一个使用单个 worker 线程的 Executor,以*队列方式来运行该线程。
ExecutorService pool = Executors.newSingleThreadExecutor(); Runnable task1 = new SingelTask();
11 Runnable task2 = new SingelTask();
Runnable task3 = new SingelTask(); pool.execute(task3);
pool.execute(task2);
pool.execute(task1); // 等待已提交的任务全部结束 不再接受新的任务
pool.shutdown();
}
} class SingelTask implements Runnable{ @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行… …");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行完毕");
} }

运行结果:

pool-1-thread-1正在执行… …

pool-1-thread-1执行完毕

pool-1-thread-1正在执行… …

pool-1-thread-1执行完毕

pool-1-thread-1正在执行… …

pool-1-thread-1执行完毕

可见线程池中只有一个线程在执行任务

6 小结:

对于以上两种连接池,大小都是固定的,当要加入的池的线程(或者任务)超过池最大尺寸时候,则入此线程池需要排队等待。

一旦池中有线程完毕,则排队等待的某个线程会入池执行。

其他线程池示例:

固定大小线程池

import java.util.concurrent.Executors;

import java.util.concurrent.ExecutorService;

ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(t1);

pool.shutdown();

单任务线程池

ExecutorService pool = Executors.newSingleThreadExecutor();

可变尺寸线程池

ExecutorService pool = Executors.newCachedThreadPool();

延迟连接池

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

pool.schedule(t4, 10, TimeUnit.MILLISECONDS);

单任务延迟连接池

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

7 使用示例

 public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(, , , TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>()); for(int i=;i<;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
} class MyTask implements Runnable {
private int taskNum; public MyTask(int num) {
this.taskNum = num;
} @Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}

执行结果:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。

如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

 Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

下面是这三个静态方法的具体实现:

 public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。