源码分析-使用newFixedThreadPool线程池导致的内存飙升问题

时间:2022-10-06 14:55:17


前言

使用*队列的线程池会导致内存飙升吗?面试官经常会问这个问题,本文将基于源码,去分析newFixedThreadPool线程池导致的内存飙升问题,希望能加深大家的理解。

内存飙升问题复现

实例代码


1. ​​ExecutorService executor = Executors.newFixedThreadPool(10);​​
2. ​​ for (int i = 0; i < Integer.MAX_VALUE; i++) {​​
3. ​​ executor.execute(() -> {​​
4. ​​ try {​​
5. ​​ Thread.sleep(10000);​​
6. ​​ } catch (InterruptedException e) {​​
7. ​​ //do nothing​​
8. ​​ }​​
9. ​​ });​​
10. ​​ }​​

配置Jvm参数

IDE指定JVM参数:-Xmx8m -Xms8m :源码分析-使用newFixedThreadPool线程池导致的内存飙升问题

执行结果

run以上代码,会抛出OOM:源码分析-使用newFixedThreadPool线程池导致的内存飙升问题JVM OOM问题一般是创建太多对象,同时GC 垃圾来不及回收导致的,那么什么原因导致线程池的OOM呢?带着发现新大陆的心情,我们从源码角度分析这个问题,去找找实例代码中哪里创了太多对象。

线程池源码分析

以上的实例代码,就一个newFixedThreadPool和一个execute方法。首先,我们先来看一下newFixedThreadPool方法的源码

newFixedThreadPool源码


1. ​​public static ExecutorService newFixedThreadPool(int nThreads) {​​
2. ​​ return new ThreadPoolExecutor(nThreads, nThreads,​​
3. ​​ 0L, TimeUnit.MILLISECONDS,​​
4. ​​ new LinkedBlockingQueue<Runnable>());​​
5. ​​ }​​

该段源码以及结合线程池特点,我们可以知道newFixedThreadPool

  • 核心线程数coreSize和最大线程数maximumPoolSize大小一样,都是nThreads。
  • 空闲时间为0,即keepAliveTime为0
  • 阻塞队列为无参构造的LinkedBlockingQueue

线程池特点了解不是很清楚的朋友,可以看我这篇文章,面试必备:Java线程池解析

接下来,我们再来看看线程池执行方法execute的源码。

线程池执行方法execute的源码

execute的源码以及相关解释如下:


1. ​​ public void execute(Runnable command) {​​
2. ​​ if (command == null)​​
3. ​​ throw new NullPointerException();​​
4. ​​ int c = ctl.get();​​
5. ​​ if (workerCountOf(c) < corePoolSize) { //步骤一:判断当前正在工作的线程是否比核心线程数量小​​
6. ​​ if (addWorker(command, true)) // 以核心线程的身份,添加到工作集合​​
7. ​​ return;​​
8. ​​ c = ctl.get();​​
9. ​​ }​​
10. ​​ //步骤二:不满足步骤一,线程池还在RUNNING状态,阻塞队列也没满的情况下,把执行任务添加到阻塞队列workQueue。​​
11. ​​ if (isRunning(c) && workQueue.offer(command)) { ​​
12. ​​ int recheck = ctl.get();​​
13. ​​ //来个double check ,检查线程池是否突然被关闭​​
14. ​​ if (! isRunning(recheck) && remove(command)) ​​
15. ​​ reject(command);​​
16. ​​ else if (workerCountOf(recheck) == 0)​​
17. ​​ addWorker(null, false);​​
18. ​​ }​​
19. ​​ //步骤三:如果阻塞队列也满了,执行任务以非核心线程的身份,添加到工作集合​​
20. ​​ else if (!addWorker(command, false))​​
21. ​​ reject(command);​​
22. ​​ }​​

纵观以上代码,我们可以发现就addWorker 以及workQueue.offer(command)可能在创建对象。那我们先分析addWorker方法。

addWorker源码分析

addWorker源码以及相关解释如下


1. ​​private boolean addWorker(Runnable firstTask, boolean core) {​​
2. ​​ retry:​​
3. ​​ for (;;) {​​
4. ​​ int c = ctl.get();​​
5. ​​ //获取当前线程池的状态​​
6. ​​ int rs = runStateOf(c);​​
7.
8. ​​ //如果线程池状态是STOP,TIDYING,TERMINATED状态的话,则会返回false。​​
9. ​​ // 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false​​
10. ​​ if (rs >= SHUTDOWN &&​​
11. ​​ ! (rs == SHUTDOWN &&​​
12. ​​ firstTask == null &&​​
13. ​​ ! workQueue.isEmpty()))​​
14. ​​ return false;​​
15. ​​ //自旋​​
16. ​​ for (;;) {​​
17. ​​ //获取当前工作线程的数量​​
18. ​​ int wc = workerCountOf(c);​​
19. ​​ //判断线程数量是否符合要求,如果要创建的是核心工作线程,判断当前工作线程数量是否已经超过coreSize,​​
20. ​​ // 如果要创建的是非核心线程,判断当前工作线程数量是否超过maximumPoolSize,是的话就返回false​​
21. ​​ if (wc >= CAPACITY ||​​
22. ​​ wc >= (core ? corePoolSize : maximumPoolSize))​​
23. ​​ return false;​​
24. ​​ //如果线程数量符合要求,就通过CAS算法,将WorkerCount加1,成功就跳出retry自旋​​
25. ​​ if (compareAndIncrementWorkerCount(c))​​
26. ​​ break retry;​​
27. ​​ c = ctl.get(); // Re-read ctl​​
28. ​​ if (runStateOf(c) != rs)​​
29. ​​ continue retry;​​
30. ​​ retry inner loop​​
31. ​​ }​​
32. ​​ }​​
33. ​​ //线程启动标志​​
34. ​​ boolean workerStarted = false;​​
35. ​​ //线程添加进集合workers标志​​
36. ​​ boolean workerAdded = false;​​
37. ​​ Worker w = null;​​
38. ​​ try {​​
39. ​​ //由(Runnable 构造Worker对象​​
40. ​​ w = new Worker(firstTask);​​
41. ​​ final Thread t = w.thread;​​
42. ​​ if (t != null) {​​
43. ​​ //获取线程池的重入锁​​
44. ​​ final ReentrantLock mainLock = this.mainLock;​​
45. ​​ mainLock.lock();​​
46. ​​ try {​​
47. ​​ //获取线程池状态​​
48. ​​ int rs = runStateOf(ctl.get());​​
49. ​​ //如果状态满足,将Worker对象添加到workers集合​​
50. ​​ if (rs < SHUTDOWN ||​​
51. ​​ (rs == SHUTDOWN && firstTask == null)) {​​
52. ​​ if (t.isAlive()) ​​
53. ​​ throw new IllegalThreadStateException();​​
54. ​​ workers.add(w);​​
55. ​​ int s = workers.size();​​
56. ​​ if (s > largestPoolSize)​​
57. ​​ largestPoolSize = s;​​
58. ​​ workerAdded = true;​​
59. ​​ }​​
60. ​​ } finally {​​
61. ​​ mainLock.unlock();​​
62. ​​ }​​
63. ​​ //启动Worker中的线程开始执行任务​​
64. ​​ if (workerAdded) {​​
65. ​​ t.start();​​
66. ​​ workerStarted = true;​​
67. ​​ }​​
68. ​​ }​​
69. ​​ } finally {​​
70. ​​ //线程启动失败,执行addWorkerFailed方法​​
71. ​​ if (! workerStarted)​​
72. ​​ addWorkerFailed(w);​​
73. ​​ }​​
74. ​​ return workerStarted;​​
75. ​​ }​​

addWorker执行流程:

大概就是判断线程池状态是否OK,如果OK,在判断当前工作中的线程数量是否满足(小于coreSize/maximumPoolSize),如果不满足,不添加,如果满足,就将执行任务添加到工作集合workers,,并启动执行该线程。

再看一下workers的类型:


1. ​​    /**​​
2. ​​ * Set containing all worker threads in pool. Accessed only when​​
3. ​​ * holding mainLock.​​
4. ​​ */​​
5. ​​ private final HashSet<Worker> workers = new HashSet<Worker>();​​

workers是一个HashSet集合,它由coreSize/maximumPoolSize控制着,那么addWorker方法会导致OOM?结合实例代码demo,coreSize=maximumPoolSize=10,如果超过10,不会再添加到workers了,所以它不是导致newFixedThreadPool内存飙升的原因。那么,问题应该就在于workQueue.offer(command) 方法了。为了让整个流程清晰,我们画一下execute执行的流程图。

线程池执行方法execute的流程

根据以上execute以及addWork源码分析,我们把流程图画出来:源码分析-使用newFixedThreadPool线程池导致的内存飙升问题

  • 提交一个任务command,线程池里存活的核心线程数小于线程数corePoolSize时,调用addWorker方法,线程池会创建一个核心线程去处理提交的任务。
  • 如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
  • 当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。
  • 如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理 。

看完execute的执行流程,我猜测,内存飙升问题就是workQueue塞满了。接下来,进行阻塞队列源码分析,揭开内存飙升问题的神秘面纱。

阻塞队列源码分析

源码分析-使用newFixedThreadPool线程池导致的内存飙升问题回到newFixedThreadPool构造函数,发现阻塞队列就是LinkedBlockingQueue,而且是个无参的LinkedBlockingQueue队列。OK,那我们直接分析LinkedBlockingQueue源码。

LinkedBlockingQueue类图

源码分析-使用newFixedThreadPool线程池导致的内存飙升问题由类图可以看到:

  • LinkedBlockingQueue 是使用单向链表实现的,其有两个 Node,分别用来存放首、尾节点, 并且还有一个初始值为 0 的原子变量 count,用来记录 队列元素个数。
  • 另外还有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原 子性,其中 takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须 等待, putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必 须等待。
  • 另外, notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进 队和出队时被阻塞的线程,其实这是生产者一消费者模型。

LinkedBlockingQueue无参构造函数


1. ​​    public LinkedBlockingQueue() {​​
2. ​​ this(Integer.MAX_VALUE);​​
3. ​​ }​​
4. ​​ public LinkedBlockingQueue(int capacity) {​​
5. ​​ if (capacity <= 0) throw new IllegalArgumentException();​​
6. ​​ this.capacity = capacity;​​
7. ​​ last = head = new Node<E>(null);​​
8. ​​ }​​

LinkedBlockingQueue无参构造函数,默认构造Integer.MAX_VALUE(那么大)的链表,看到这里,你回想一下execute流程,是不是阻塞队列一直不会满了,这队列来者不拒,把所有阻塞任务收于麾下。。。是不是内存飙升问题水落石出啦。

LinkedBlockingQueue的offer函数

源码分析-使用newFixedThreadPool线程池导致的内存飙升问题

线程池中,插入队列用了offer方法,我们来看一下阻塞队列LinkedBlockingQueue的offer骚操作吧

    1. ​​public boolean offer(E e) {​​
    2. ​​ //为空元素则抛出空指针异常​​
    3. ​​ if (e == null) throw new NullPointerException();​​
    4. ​​ final AtomicInteger count = this.count;​​
    5. ​​ //如采当前队列满则丢弃将要放入的元素, 然后返回false​​
    6. ​​ if (count.get() == capacity)​​
    7. ​​ return false;​​
    8. ​​ int c = -1;​​
    9. ​​ //构造新节点,获取putLock独占锁​​
    10. ​​ Node<E> node = new Node<E>(e);​​
    11. ​​ final ReentrantLock putLock = this.putLock;​​
    12. ​​ putLock.lock();​​
    13. ​​ try {​​
    14. ​​ //如采队列不满则进队列,并递增元素计数​​
    15. ​​ if (count.get() < capacity) {​​
    16. ​​ enqueue(node);​​
    17. ​​ c = count.getAndIncrement();​​
    18. ​​ //新元素入队后队列还有空闲空间,则​​
    19. ​​ 唤醒 notFull 的条件队列中一条阻塞线程​​
    20. ​​ if (c + 1 < capacity)​​
    21. ​​ notFull.signal();​​
    22. ​​ }​​
    23. ​​ } finally {​​
    24. ​​ //释放锁​​
    25. ​​ putLock.unlock();​​
    26. ​​ }​​
    27. ​​ if (c == 0)​​
    28. ​​ signalNotEmpty();​​
    29. ​​ return c >= 0;​​
    30. ​​ }​​

    offer操作向队列尾部插入一个元素,如果队列中有空闲则插入成功后返回 true,如果队列己满 则丢弃当前元素然后返回 false。如果 e 元素为 null 则抛出 Nul!PointerException 异常。另外, 该方法是非阻塞的。

    内存飙升问题结果揭晓

    newFixedThreadPool线程池的核心线程数是固定的,它使用了近乎于*的LinkedBlockingQueue阻塞队列。当核心线程用完后,任务会入队到阻塞队列,如果任务执行的时间比较长,没有释放,会导致越来越多的任务堆积到阻塞队列,最后导致机器的内存使用不停的飙升,造成JVM OOM。

    参考与感谢

    • 《Java并发编程之美》
    • 面试必备:Java线程池解析

    个人公众号

    源码分析-使用newFixedThreadPool线程池导致的内存飙升问题

    • 如果你是个爱学习的好孩子,可以关注我公众号,一起学习讨论。
    • 如果你觉得本文有哪些不正确的地方,可以评论,也可以关注我公众号,大家一起学习进步哈。