java高并发编程-java线程池浅析

时间:2021-10-27 00:00:10

java线程池在多线程应用场景中被广泛使用,作此记录以便后面翻看,如发现有错误,烦请大家指正。

带着如下问题,我们来了解下java提供的线程池技术

1,为什么要用线程池?

2java提供的线程池有哪几种?API怎么用?

3jdk提供的线程池是一个什么原理,其内部是怎么实现的?

我们带着这些问题来看一下。

首先,为什么要有线程池呢?或者说传统的线程模式存在什么问题需要我们使用线程池?

传统的创建和销毁线程是需要耗费时间的,大量线程的创建和销毁会降低程序的性能;另外线程也是要消耗内存的,大量线程占用着jvm宝贵的内存资源,甚至会导致oom,即便不会导致oom,大量的线程的回收,也会导致gc的时间增加,降低gc的性能。

而运用线程池之后,线程池会预先创建好线程或者在允许的线程数量范围内,线程执行完毕后不会关闭线程而是回收线程到线程池里面,这样创建线程就变成了从池中取空闲线程,销毁线程就变成了回收线程;而且我们还能通过控制线程池的大小来避免大量线程同时存在的情况。

从上看来,貌似所有的问题都得到了解决,但是不是真的是这样的神奇?下面我们来看下jdk对线程池这一块的支持。

JDK提供了一套Excutor框架来实现线程池。核心成员如下图(摘自实战高并发程序设计):

java高并发编程-java线程池浅析

简单说一下

大家看Excutors类,里面定义着n多方法,这里其实是一个生产线程池的工厂类,真正的线程池是ThreadPoolExecutor类,它首先继承自AbstractExcutorService这个抽象类,AbstractExcutorService实现了ExcutorService这个接口,而ExcutorService最终继承Excutor接口,所以七拐八绕的一句话ThreadPoolExecutor实现了Excutor接口。

我们主要介绍Excutors里面的四种线程池,如上图中的四种,其余的有些会在后续的博客文章中用到,到时再说。

1,固定大小的线程池

public class Demo1 {
    public static void main(String[] args) throws Exception{
        ExecutorService es = Executors.newFixedThreadPool(5);
        for(int i =0 ;i<10;i++){
            es.submit(new T1());
        }
    }
    public static class T1 implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }
}
打印如下:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-5
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-5
pool-1-thread-4
pool-1-thread-2
pool-1-thread-1
从结果可以看出,后5次重用了线程池中的线程。

2,new SingleThreadExcutor

ExecutorService es = Executors.newSingleThreadExecutor();

结果:
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
只有一个线程在跑,分10次

3,new cachedThreadPool

ExecutorService es = Executors.newCachedThreadPool();

结果:
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
pool-1-thread-7
pool-1-thread-6
pool-1-thread-5
pool-1-thread-8
pool-1-thread-10
pool-1-thread-9
4,new ThreadScheduledExcutor()

有两个方法:
scheduleAtFixedRate(Runnable command,long initialdelays,long period,TimeUnit)
scheduleAtFixedDelay(Runnable command,long initialdelays,long delay,TimeUnit)
前者不管上个任务是否执行完,每隔一秒执行一次。
后者是在上个任务执行完毕后延时delay在启动执行。
注意异常的处理,异常后线程池会停止执行。
 基本使用如下:
        ScheduledExecutorService es = Executors.newScheduledThreadPool(5);
        es.scheduleAtFixedRate(new T1(),10000l,1000l, TimeUnit.MILLISECONDS) 
 以上就是简单的使用,很简单,大家可以自己动手实验一下,下面我们介绍下实现的原理:

我们知道核心的实现是ThreadPoolExcutor,其核心的构造函数如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
我们逐一介绍下各个参数的意义:

corePoolSize:线程池大小,这个参数的意义重大,决定着新提交的任务是新开线程去执行还是放到任务队列中,也是线程池的最最核心的参数。一般线程池开始时是没有线程的,只有当任务来了并且线程数量小于corePoolSize()才会创建线程。当然我们也可以通过ThreadPoolExcutor的两个方法来提前创建线程,分别是preStartAllCoreThread()、preStartCoreThread()。

maximumPoolSize:最大线程数,线程池能创建的最大线程数量。

keepAliveTime:在线程数量超过corePoolSize后,多余空闲线程(大于corePoolSize的部分线程)的最大存活时间,但是在调用了allowCoreThreadTimeOut(true)后,不管有没有超过corePoolSize,只要超过keepAliveTime,县城都会被干掉,直到线程数量为0。

TimeUnit:时间单位

workQueue:用于存放来不及处理的任务的队列,是一个BlockingQueue

threadFactory:生产线程的工厂类

handler:拒绝策略,当任务太多来不及处理的时候,如何处理。

前面只是大概介绍了下参数的含义,下面介绍下任务提交执行的主要方法:execute(Runnable command)介绍完原理相信大家就会对上面的几个参数就比较熟悉了。

ThreadPoolExcutor代码里有几个需要注意的地方:线程池的状态、任务提交的流程、阻塞队列,决绝策略、线程池关闭。下面一一介绍:

一,线程池的状态:

 ThreadPoolExcutor定义了5种线程池的状态:

 RUNNING = -1 接受新任务,执行阻塞队列中的任务

 SHUTDOWN  = 0 不接受新任务,但执行未执行完的任务包括队列中的任务

 STOP  = 不接受新任务,不执行队列中的任务,打断运行中的任务

 TIDYING = 所有任务都关闭,workercount是0时,触发执行terminated()这个钩子方法,即将关闭状态

 TERMINATED = 3 terminated()hook方法执行完毕

 这几种状态的互相的转换如下:

 RUNNING ----->SHUTDOWN :调用shutdown()方法

 RUNNING,SHUTDOWN  -----> STOP  :调用shutdownNow()方法

 SHUTDOWN ----->TIDYING : 当阻塞队列和线程池都空的时候,会转换为TIDYING状态

 STOP ----->TIDYING : 当线程池为空的时候,会转换为TIDYING状态

 TIDYING ----->TERMINATED :当钩子方法terminated执行完毕的时候

二,任务提交流程

下面我们看下ThreadPoolExcutor的excute方法,代码逻辑如上,分三种情况:

1,如果线程数量小于corePoolSize,则直接尝试创建线程执行任务,但是期间还是会去校验线程池的状态和workerCount,如果检测到状态大于SHUTDOWN或者线程数量大于corePoolSize,创建线程就会失败

2,如果workerCount大于等于corePoolSIze,RUNNING状态:则尝试将任务放入阻塞队列中,如果成功放入后,还会再次检测线程池状态,如果此时检测到状态为非RUNNING,则会将新加入的任务从阻塞队列中移除,并执行拒绝策略;还要检测当前的线程数量,如果线程数量为0,则需要启动线程去执行阻塞队列中的任务。

3,如果放入阻塞队列失败(队列满了),则尝试直接创建线程执行此任务,RUNNING状态下:线程数量>=maxmumPoolSize的大小,新建线程失败,执行拒绝策略;非RUNNING状态,新建线程失败,执行拒绝策略

代码如下:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
	int c = ctl.get();
	//获取当前线程数量,并与corePoolSize对比
        if (workerCountOf(c) < corePoolSize) {
	    //如果小于corePoolSize,尝试启动线程去执行任务
            if (addWorker(command, true))
                return;
	    c = ctl.get();
        }
	//判断是不是RUNNING状态,如果是的话尝试向blockingqueue存储任务
        if (isRunning(c) && workQueue.offer(command)) {
            //再次判断是否是运行状态
            int recheck = ctl.get();
	    //如果不是运行状态,尝试从队列移除任务并尝试关闭线程池(满足状态>RUNNING,如果状态为SHUTDOWN时workQueue为空才可尝试关闭线程池),执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)//如果池中线程数量为0,则创建线程去执行队列中的任务
                addWorker(null, false);
        }else if (!addWorker(command, false))//如果不是RUNNING或者任务放入队列失败,尝试启动新线程去执行任务尝试失败则决绝策略。
	    reject(command);
    }

addWorker方法,启动线程去执行任务

//第一个参数不用说就是task本身,可以是null。第二个参数意思是按照corePoolSize还是maxmumPoolSize来进行范围的界定
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 在SHUTDOWN状态下没有新任务提交但是检测到队列还有任务就会启动线程去执行
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
	   //下面是用CAS无锁方式进行workerCount的增加
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))//如果
                    return false;
		//CAS无锁方式增加workercount
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // 如果CAS失败则重复执行CAS
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
		    //检测状态并判断所处状态为RUNNING或者为SHUTDOWN且不是新任务且队列有需要被执行的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
			//新增worker
                        workers.add(w);
			//得到wokers的大小并更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
		//如果添加成功,启动线程去执行,此处设计比较精巧,我们一起看下Worker的实现
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Worker(ThreadPoolExcutor的内部类。仅展示了一部分逻辑代码)

  //实现了Runnable接口,还继承了AbstractQueuedSynchronizer这个抽象类

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** 要执行的任务,可以为null */
        Runnable firstTask;

        /**
         * 构造函数,
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
	    //由于该类实现了runnable接口,创建线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
	    //巧妙在此方法内,启动worker
            runWorker(this);
        }

    }

worker实现Runnable接口,run方法内的runWorker(this)才是真正的去执行我们提交的任务

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
//妙就妙在如果task为null的时候,会去阻塞队列中取任务(如果设置了allowCoreThreadTimeOut或者workercount>corepoolsize,          getTask()方法是有超时时间的,大小为keepAliveTime)
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

三、阻塞队列

blockingqueue,阻塞队列。用于存放任务。通过阅读上面的代码,我们知道当线程池状态为RUNNING且线程数量大于等于corePoolSize的时候,会尝试将任务放入阻塞队列中。并且阻塞队列的容量还是决定是否执行决绝策略的因素之一。

线程池中用到的阻塞队列都有哪些呢?

1,LinkedBlockingQueue:基于链表,如果创建时没有指定大小,则大小为Integer.MAX_VALUE,ThreadPoolExcutor提供的fixedThreadPool和singleThreadExcutor均将此阻塞队列当作workQueue。特点是当线程数量大于corePoolSize时,可以一直接收任务并放到队列中等待执行,但是任务数量过多会占用大量内存,导致内存溢出。以下是两者的实现代码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
2,ArrayBlockingQueue,打眼一瞧,基于数组,肯定要指定容量大小,有界的,目前基本没有使用的。

3,SynchronousQueue,这个就比较有一意思了,被cachedThreadPool当作workerQueue,这个看名字是队列,其实空有其名,它的容量为0 ,它不存储任何的任务,而是一旦有任务提交就启动新线程执行,这样的话就要求corePoolSize非常大,而cachedThreadPool正是如此。下面是cachedThreadPool的实现代码:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
可以看到cahcedThreadPool的corePoolSize是0 ,也就是说所有启动的线程在执行完任务后在60s后都会被杀死。

四、拒绝策略

看代码中几种线程池均为提供拒绝策略handler,那么我们看下jdk提供了哪几种策略

1,DiscardOldestPolicy,丢弃队列中即将执行的任务,并重新尝试执行该任务

2,DiscardPolicy,单纯拒绝,啥事不干。

3,AbortPolicy:丢弃任务并抛出RejectExcutionException。

4,CallerRunsPolicy:这个策略就是执行被抛弃的任务

五、线程池关闭

shutDown()和shutDownNow(),前面介绍线程池状态转换的时候提到过,这里不再赘述。

六、玩一玩ThreadPoolExcutor

 定义一个corePoolSize为5,maxmumPoolSize是10,keepAliveTime为1s,阻塞队列大写为5,自定义拒绝策略打印被拒绝的任务。

public class Demo1 {
    public static void main(String[] args) {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(5, 10, 1000l, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务"+((T1)r).getNum()+"被拒绝了");
            }
        });
        for(int i =0;i<20;i++){
            tp.execute(new T1(i));
            System.out.println("一共有"+tp.getQueue().size()+"个线程被放入缓存队列");
        }

    }
    public static class T1 implements Runnable{
        private int num;
        public T1(int num) {
            this.num =num;
        }

        public int getNum() {
            return num;
        }

        @Override
        public void run() {
            System.out.println("任务num"+num+"正在执行......");
            try {
                Thread.sleep(10000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
结果:

一共有0个线程被放入缓存队列
任务num0正在执行......
一共有0个线程被放入缓存队列
一共有0个线程被放入缓存队列
任务num1正在执行......
一共有0个线程被放入缓存队列
任务num2正在执行......
一共有0个线程被放入缓存队列
一共有1个线程被放入缓存队列
任务num3正在执行......
一共有2个线程被放入缓存队列
任务num4正在执行......
一共有3个线程被放入缓存队列
一共有4个线程被放入缓存队列
一共有5个线程被放入缓存队列
一共有5个线程被放入缓存队列
任务num10正在执行......
一共有5个线程被放入缓存队列
任务num11正在执行......
一共有5个线程被放入缓存队列
一共有5个线程被放入缓存队列
任务num12正在执行......
任务num13正在执行......
一共有5个线程被放入缓存队列
任务15被拒绝了
任务num14正在执行......
一共有5个线程被放入缓存队列
任务16被拒绝了
一共有5个线程被放入缓存队列
任务17被拒绝了
一共有5个线程被放入缓存队列
任务18被拒绝了
一共有5个线程被放入缓存队列
任务19被拒绝了
一共有5个线程被放入缓存队列
任务num5正在执行......
任务num7正在执行......
任务num8正在执行......
任务num6正在执行......
任务num9正在执行......

暂时就介绍到这吧,如果有错误,请大家批评和指正,在此谢过。有问题和疑问也希望大家提出来一起探讨。


参考书籍和文章

1,实战Java高并发程序设计.pdf

2,深入理解Java虚拟机:JVM高级特性与最佳实践(最新第二版)