Java线程池使用和源码分析

时间:2021-12-18 05:39:55

1.为什么使用线程池

在多线程编程中一项很重要的功能就是执行任务,而执行任务的方式有很多种,为什么一定需要使用线程池呢?下面我们使用Socket编程处理请求的功能,分别对每种执行任务的方式进行分析。

1.1串行执行任务

当Socket监听到客户端有连接,通过handleSocket方法顺序的处理每一个客户端连接,当处理完成后,继续监听。代码如下:

ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
handleSocket(socket);
}

这种方式的缺点非常明显:当我有多个客户端请求时,在server处理一个请求的过程中,其他请求都需要等待前一个请求处理完毕。这种在高并发情况下几乎不可用。

1.2为每个任务创建一个线程

针对上面的问题进行优化:为每一个客户端请求创建一个线程来处理请求,主线程只需要创建线程,之后即可继续坚挺客户端请求.流程图如下:

Java线程池使用和源码分析

代码如下:

ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start();
}

这种方式有以下优点:

1.将处理客户端连接的操作从主线程中分离出去,使得主循环可以更快的响应下一次请求。

2.处理客户端连接的操作是并行的,提高了程序的吞吐量。

但是这种方式有有以下几个缺点:

1.处理请求的线程必须是线程安全的

2.线程的创建和销毁都需要开销,当大量创建线程的时候,将会消耗大量计算机资源

3.当可用的CPU数量小于可运行的线程的时候,那么多出来的线程会占用内存资源,给垃圾回收带来压力,并且在大量线程竞争CPU资源的时候会有很大的性能开销

4.JVM中可创建的线程数存在一个上限,这个上限随着平台的不同而不同,并且受多个因素的限制,包括JVM的启动参数,每个线程所占用的内存大小等,如果超出这些限制,将会抛出OOM异常。

1.3 使用线程池处理客户端请求

对于1.2中出现的问题,最好的解决方案就是使用线程池来执行task,这样可以对创建的线程总数做限制,从而避免1.2中的问题。流程图如下:

Java线程池使用和源码分析

处理方式如下:

ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++));
}

此中方式有以下几个优点:

1.任务提交和任务执行分离开

2.执行任务的线程可以重用,减少了线程创建和销毁的开销,同时当任务到达时可以直接使用创建好的线程执行任务,也提高了程序的响应速度。

2.java中线程池介绍

在java中线程池的实现是基于生产者-消费者模式的,线程池的功能将任务的提交和任务的执行分离,任务提交的过程为生产者,执行任务的过程为消费过程。具体的分析见源码分析。java线程池的顶层接口为Executor,源码如下:

public interface Executor {
void execute(Runnable command);
}

此接口为所有线程池实现的顶层接口,其规定了可以接受的task类型为Runnable实现类,但是具体的执行task的逻辑由线程池实现类自己定义,比如:

可以使用主线程串行执行任务,

也可以为每个任务创建一个新的线程

或者提前创建好一组线程,每次执行任务的时候从一组线程中取,等等

对于线程池的执行策略主要有以下几个方面:

1.在什么线程中执行任务

2.按照什么顺序执行任务(FIFO、LIFO、优先级?)

3.有多少个任务可以并发执行

4.最多可以有多少个任务在队列中等待执行

5.当等待队列中达到最大值的时候,怎么样拒绝新提交的task

6.在执行一个任务之前或者之后需要做哪些操作?

应该根据具体的业务选择不同的执行策略。在java类库中提供了Executors工具类来常见默认策略的线程池。主要有以下几个接口:

public static ExecutorService newFixedThreadPool(int nThreads)
将会创建一个固定大小的线程池,每当有新任务提交的时候,当线程总数没有达到核心线程数的时候,为每个任务创建一个新线程,当线程的个数到达最大值后,重用之前创建的线程,当线程因为未知异常而停止时候,将会重现创建一个线程作为补充。 public static ExecutorService newCachedThreadPool()
根据需求创建线程的个数,当线程数大于任务数的时候,将会注销多余的线程 public static ExecutorService newSingleThreadExecutor()
创建一个单线程的线程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个可执行定时任务的线程池

在以上的例子中,所有提交的task在提交到线程池后其执行状态是不可见的,即主线程无法知道提交的task是否执行结束或者执行结果。针对这个问题,java提供了可以返回数据的task接口Future和Callable接口。

其中Callable接口提供了任务返回数据以及抛出异常的功能,定义如下:

public interface Callable<V> {

    V call() throws Exception;
}

在ExecutorService中所有的submit方法都会返回一个Future对象,其接口定义如下:

public interface Future<V> {

    取消任务执行,当mayInterruptIfRunning为true,interruptedthisthread
boolean cancel(boolean mayInterruptIfRunning);
返回此任务是否在执行完毕之前被取消执行
boolean isCancelled();
返回此任务是否已经完成,包括正常结束,异常结束以及被cancel
boolean isDone();
返回执行结果,当任务没有执行结束的时候,等待
V get() throws InterruptedException, ExecutionException;
}

3.使用线程池可能出现的问题

1.线程饥饿死锁

在单线程的Executor中,如果Executor中执行的一个任务中,再次提交任务到同一个Executor中,并且等待这个任务执行完毕,那么就会发生死锁问题。如下demo中所示:

public class ThreadDeadLock {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
System.out.println("Main Thread start.");
EXECUTOR_SERVICE.submit(new DeadLockThread());
System.out.println("Main Thread finished."); } private static class DeadLockThread extends Thread{ @Override
public void run() {
try {
System.out.println("DeadLockThread start.");
Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2());
future.get();
System.out.println("DeadLockThread finished.");
} catch (Exception e) { }
}
} private static class DeadLockThread2 extends Thread { @Override
public void run() {
try {
System.out.println("DeadLockThread2 start.");
Thread.sleep(1000 * 10);
System.out.println("DeadLockThread2 finished.");
} catch (Exception e) { }
}
}
}

输出结果为:

Main Thread start.
Main Thread finished.
DeadLockThread start.

对于多个线程的线程池,如果所有正在执行的线程都因为等待处于工作队列中的任务执行而阻塞,那么就会发生线程饥饿死锁。

当往线程池中提交有依赖的任务时,应清楚的知道可能会出现的线程饥饿死锁风险。应考虑是否将依赖的task提交到不同的线程池中

或者使用*的线程池。

只有当任务相对独立时,设置线程池大小和工作队列的大小才是合理的,否则有可能会出现线程饥饿死锁

2.任务运行时间过长

任务执行时间过长会影响线程池的响应时间,当运行时间长的任务远大于线程池线程的个数时,会出现所有线程都在执行运行时间长的任务,从而影响对其他任务的响应。

解决办法:

1.通过限定任务等待的时长,而不要无限期等待下去,当等待超时的时候,可以将任务标记为失败,或者重新放到线程池中。

2.当线程池中阻塞任务过多的时,应该考虑扩大线程池的大小

4.线程池大小的设置

线程池的大小依赖于提交任务的类型以及服务器的可用资源,线程池的大小应该避免设置过大或者过小,当线程设置过打的时候可能会有资源耗尽的风险,线程池设置过小会有可用cpu空闲从而影响系统吞吐量。

影响线程池大小的资源有很多,比如CPU、内存、数据库链接池等,只需要计算资源可用总资源 / 每个任务需要的资源,取最小值,即可得出线程池的上限。

线程池的最小值应该大于可用的CPU数量。

4.java中常用线程池源码分析-ThreadPoolExecutor

ThreadPoolExecutor线程池是比较常用的一个线程池实现类,通过Executors工具类创建的线程池中,其具体实现类是ThreadPoolExecutor。首先我们可以看下ThreadPoolExecutor的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

下面分别对构造函数中的各个参数对应的策略进行分析:

1.线程的创建与销毁

首先构造函数中corePoolSize、maximumPoolSize、keepAliveTime和unit参数影响线程的创建和销毁。其中corePoolSize为核心线程数,当第一次提交任务的时候如果正在执行的线程数小于corePoolSize,则新建一个线程执行task,如果已经超过corePoolSize,则将任务放到任务队列中等待执行。当任务队列的个数到达上限的时候,并且工作线程数量小于maximumPoolSize,则继续创建线程执行工作队列中的任务。当任务的个数小于maximumPoolSize的时候,将会把空闲的线程标记为可回收的垃圾线程。对于以下代码段测试此功能:

public class ThreadPoolTest {

    private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));

    public static void main(String[] args) throws Exception {
for (int i = 0; i< 9; i++) {
executorService.submit(new Task());
System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size());
}
} private static class Task extends Thread { @Override
public void run() {
try {
Thread.sleep(1000 * 100);
} catch (Exception e) { }
}
} }

输出结果为:

Active thread:1.Task count:1.TaskQueue size:0
Active thread:2.Task count:2.TaskQueue size:0
Active thread:3.Task count:3.TaskQueue size:0
Active thread:3.Task count:4.TaskQueue size:1
Active thread:3.Task count:5.TaskQueue size:2
Active thread:3.Task count:6.TaskQueue size:3
Active thread:4.Task count:7.TaskQueue size:3
Active thread:5.Task count:8.TaskQueue size:3
Active thread:6.Task count:9.TaskQueue size:3

2.任务队列

在ThreadPoolExecutor的构造函数中可以传入保存任务的队列,当新提交的任务没有空闲线程执行时候,会将task保存到此队列中。保存的顺序是根据插入的顺序或者Comparator来排序的。

3.饱和策略

ThreadPoolExecutor.AbortPolicy
抛出RejectedExecutionException ThreadPoolExecutor.CallerRunsPolicy
将任务的执行交给调用者,即将本该异步执行的任务变成同步执行。

4.线程工厂

当线程池需要创建线程的时候,默认是使用线程工厂方法来创建线程的,通常情况下我们通过指定线程工厂的方式来为线程命名,便于出现线程安全问题时候来定位问题。

6.线程池最佳实现

1.项目中所有的线程应该都有线程池来提供,不允许自行创建线程

2.尽量不要用Executors来创建线程,而是使用ThreadPoolExecutor来创建

Executors有以下问题:

1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。