jdk自带线程池实例详解

时间:2022-09-13 11:58:42

一、简介

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力,但频繁的创建线程的开销是很大的,那么如何来减少这部分的开销了,那么就要考虑使用线程池了。线程池就是一个线程的容器,每次只执行额定数量的线程,线程池就是用来管理这些额定数量的线程。

二、涉及线程池的类结构图

jdk自带线程池实例详解

其*我们使用的,主要是threadpoolexecutor类。

三、如何创建线程池

我们创建线程池一般有以下几种方法:

1、使用executors工厂类

executors主要提供了下面几种创建线程池的方法:

jdk自带线程池实例详解

下面来看下使用示例:

1)newfixedthreadpool(固定大小的线程池)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class fixedthreadpool {
  public static void main(string[] args) {
    executorservice pool = executors.newfixedthreadpool(5);// 创建一个固定大小为5的线程池
    for (int i = 0; i < 10; i++) {
      pool.submit(new mythread());
    }
    pool.shutdown();
  }
}
public class mythread extends thread {
  @override
  public void run() {
    system.out.println(thread.currentthread().getname() + "正在执行。。。");
  }
}

测试结果如下:

pool-1-thread-1正在执行。。。 
pool-1-thread-2正在执行。。。 
pool-1-thread-3正在执行。。。 
pool-1-thread-2正在执行。。。 
pool-1-thread-3正在执行。。。 
pool-1-thread-2正在执行。。。 
pool-1-thread-2正在执行。。。 
pool-1-thread-3正在执行。。。 
pool-1-thread-5正在执行。。。 
pool-1-thread-4正在执行。。。 

固定大小的线程池:每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程线。

2)newsinglethreadexecutor(单线程的线程池)

?
1
2
3
4
5
6
7
8
9
public class singlethreadpool {
  public static void main(string[] args) { 
    executorservice pool=executors.newsinglethreadexecutor();//创建一个单线程池 
    for(int i=0;i<100;i++){ 
      pool.submit(new mythread()); 
    
    pool.shutdown(); 
  }
}

测试结果如下:

pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 
pool-1-thread-1正在执行。。。 

单线程的线程池:这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

3)newscheduledthreadpool

?
1
2
3
4
5
6
7
8
9
10
11
12
public class scheduledthreadpool {
  public static void main(string[] args) { 
    scheduledexecutorservice pool=executors.newscheduledthreadpool(6); 
    for(int i=0;i<10000;i++){ 
      pool.submit(new mythread()); 
    
      
    pool.schedule(new mythread(), 1000, timeunit.milliseconds); 
    pool.schedule(new mythread(), 1000, timeunit.milliseconds); 
    pool.shutdown(); 
  
}

测试结果如下:

pool-1-thread-1正在执行。。。 
pool-1-thread-6正在执行。。。 
pool-1-thread-5正在执行。。。 
pool-1-thread-4正在执行。。。 
pool-1-thread-2正在执行。。。 
pool-1-thread-3正在执行。。。 
pool-1-thread-4正在执行。。。 
pool-1-thread-5正在执行。。。 
pool-1-thread-6正在执行。。。 
pool-1-thread-1正在执行。。。 
…………此处会延时1s………… 
pool-1-thread-4正在执行。。。 
pool-1-thread-1正在执行。。。 

测试结果的最后两个线程都是在延时1s之后,才开始执行的。此线程池支持定时以及周期性执行任务的需求

4)newcachedthreadpool(可缓存的线程池)

?
1
2
3
4
5
6
7
8
9
public class cachedthreadpool {
  public static void main(string[] args) { 
    executorservice pool=executors.newcachedthreadpool(); 
    for(int i=0;i<100;i++){ 
      pool.submit(new mythread()); 
    
    pool.shutdown(); 
  
}

测试结果如下:

pool-1-thread-5正在执行。。。 
pool-1-thread-7正在执行。。。 
pool-1-thread-5正在执行。。。 
pool-1-thread-16正在执行。。。 
pool-1-thread-17正在执行。。。 
pool-1-thread-16正在执行。。。 
pool-1-thread-5正在执行。。。 
pool-1-thread-7正在执行。。。 
pool-1-thread-16正在执行。。。 
pool-1-thread-18正在执行。。。 
pool-1-thread-10正在执行。。。 

可缓存的线程池:如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说jvm)能够创建的最大线程大小。

官方建议程序员使用较为方便的executors工厂方法executors.newcachedthreadpool()(*线程池,可以进行自动线程回收)、executors.newfixedthreadpool(int)(固定大小线程池)executors.newsinglethreadexecutor()(单个后台线程),这几种线程池均为大多数使用场景预定义了默认配置。

2、继承threadpoolexecutor类,并复写父类的构造方法。

在介绍这种方式之前,我们来分析下前面几个创建线程池的底层代码是怎样的?

?
1
2
3
4
5
6
7
8
9
10
11
12
13
public class executors {
  public static executorservice newfixedthreadpool(int nthreads) {
    return new threadpoolexecutor(nthreads, nthreads,
                   0l, timeunit.milliseconds,
                   new linkedblockingqueue<runnable>());
}
  public static executorservice newsinglethreadexecutor() {
    return new finalizabledelegatedexecutorservice
      (new threadpoolexecutor(1, 1,
                  0l, timeunit.milliseconds,
                  new linkedblockingqueue<runnable>()));
  }
}

从executors工厂类的底层代码可以看出,工厂类提供的创建线程池的方法,其实都是通过构造threadpoolexecutor来实现的。threadpoolexecutor构造方法代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public threadpoolexecutor(int corepoolsize,
               int maximumpoolsize,
               long keepalivetime,
               timeunit unit,
               blockingqueue<runnable> workqueue,
               threadfactory threadfactory,
               rejectedexecutionhandler handler) {
    if (corepoolsize < 0 ||
      maximumpoolsize <= 0 ||
      maximumpoolsize < corepoolsize ||
      keepalivetime < 0)
      throw new illegalargumentexception();
    if (workqueue == null || threadfactory == null || handler == null)
      throw new nullpointerexception();
    this.corepoolsize = corepoolsize;
    this.maximumpoolsize = maximumpoolsize;
    this.workqueue = workqueue;
    this.keepalivetime = unit.tonanos(keepalivetime);
    this.threadfactory = threadfactory;
    this.handler = handler;
  }

那么接下来,我们就来谈谈这个threadpoolexecutor构造方法。在这个构造方法中,主要有以下几个参数:

corepoolsize--池中所保存的线程数,包括空闲线程。

maximumpoolsize--池中允许的最大线程数。

keepalivetime--当线程数大于corepoolsize时,此为终止空闲线程等待新任务的最长时间。

unit--keepalivetime 参数的时间单位。

workqueue--执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 runnable任务。

threadfactory--执行程序创建新线程时使用的工厂。

handler--由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

接下来,咋们来说下这几个参数之间的关系。当线程池刚创建的时候,线程池里面是没有任何线程的(注意,并不是线程池一创建,里面就创建了一定数量的线程),当调用execute()方法添加一个任务时,线程池会做如下的判断:

1)如果当前正在运行的线程数量小于corepoolsize,那么立刻创建一个新的线程,执行这个任务。

2)如果当前正在运行的线程数量大于或等于corepoolsize,那么这个任务将会放入队列中。

3)如果线程池的队列已经满了,但是正在运行的线程数量小于maximumpoolsize,那么还是会创建新的线程,执行这个任务。

4)如果队列已经满了,且当前正在运行的线程数量大于或等于maximumpoolsize,那么线程池会根据拒绝执行策略来处理当前的任务。

5)当一个任务执行完后,线程会从队列中取下一个任务来执行,如果队列中没有需要执行的任务,那么这个线程就会处于空闲状态,如果超过了keepalivetime存活时间,则这个线程会被线程池回收(注:回收线程是有条件的,如果当前运行的线程数量大于corepoolsize的话,这个线程就会被销毁,如果不大于corepoolsize,是不会销毁这个线程的,线程的数量必须保持在corepoolsize数量内).为什么不是线程一空闲就回收,而是需要等到超过keepalivetime才进行线程的回收了,原因很简单:因为线程的创建和销毁消耗很大,更不能频繁的进行创建和销毁,当超过keepalivetime后,发现确实用不到这个线程了,才会进行销毁。这其中unit表示keepalivetime的时间单位,unit的定义如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public enum timeunit {
  nanoseconds {
    // keepalivetime以纳秒为单位
  },
  microseconds {
    // keepalivetime以微秒为单位
  },
  milliseconds {
    // keepalivetime以毫秒为单位
  },
  seconds {
    // keepalivetime以秒为单位
  },
  minutes {
    // keepalivetime以分钟为单位
  },
  hours {
    // keepalivetime以小时为单位
  },
  days {
    // keepalivetime以天为单位
  };

下面从源码来分析一下,对于上面的几种情况,主要涉及到的源码有以下几块:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean addifundercorepoolsize(runnable firsttask) {
    thread t = null;
    final reentrantlock mainlock = this.mainlock;
    mainlock.lock();
    try {
      if (poolsize < corepoolsize && runstate == running)
        t = addthread(firsttask);
    } finally {
      mainlock.unlock();
    }
    if (t == null)
      return false;
    t.start();
    return true;
}

其实,这段代码很简单,主要描述的就是,如果当前的线程池小于corepoolsize的时候,是直接新建一个线程来处理任务。 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean addifundermaximumpoolsize(runnable firsttask) {
    thread t = null;
    final reentrantlock mainlock = this.mainlock;
    mainlock.lock();
    try {
      if (poolsize < maximumpoolsize && runstate == running)
        t = addthread(firsttask);
    } finally {
      mainlock.unlock();
    }
    if (t == null)
      return false;
    t.start();
    return true;
}

上面这段代码描述的是,如果当前线程池的数量小于maximumpoolsize的时候,也会创建一个线程,来执行任务 

四、线程池的队列

线程池的队列,总的来说有3种:

直接提交:工作队列的默认选项是 synchronousqueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求* maximumpoolsizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。

*队列:使用*队列(例如,不具有预定义容量的 linkedblockingqueue)将导致在所有 corepoolsize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corepoolsize。(因此,maximumpoolsize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用*队列;例如,在 web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。

有界队列:当使用有限的 maximumpoolsizes时,有界队列(如 arrayblockingqueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 cpu 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 i/o边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,cpu使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

下面就来说下线程池的队列,类结构图如下:

jdk自带线程池实例详解

1)synchronousqueue

该队列对应的就是上面所说的直接提交,首先synchronousqueue是*的,也就是说他存数任务的能力是没有限制的,但是由于该queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。

2)linkedblockingqueue

该队列对应的就是上面的*队列。

3)arrayblockingqueue

该队列对应的就是上面的有界队列。arrayblockingqueue有以下3中构造方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public arrayblockingqueue(int capacity) {
    this(capacity, false);
  }
  public arrayblockingqueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new illegalargumentexception();
    this.items = (e[]) new object[capacity];
    lock = new reentrantlock(fair);
    notempty = lock.newcondition();
    notfull = lock.newcondition();
}
  public arrayblockingqueue(int capacity, boolean fair,
               collection<? extends e> c) {
    this(capacity, fair);
    if (capacity < c.size())
      throw new illegalargumentexception();
    for (iterator<? extends e> it = c.iterator(); it.hasnext();)
      add(it.next());
  }

下面我们重点来说下这个fair,fair表示队列访问线程的竞争策略,当为true的时候,任务插入队列遵从fifo的规则,如果为false,则可以“插队”。举个例子,假如现在有很多任务在排队,这个时候正好一个线程执行完了任务,同时又新来了一个任务,如果为false的话,这个任务不用在队列中排队,可以直接插队,然后执行。如下图所示:

jdk自带线程池实例详解

五、线程池的拒绝执行策略

当线程的数量达到最大值时,这个时候,任务还在不断的来,这个时候,就只好拒绝接受任务了。

threadpoolexecutor 允许自定义当添加任务失败后的执行策略。你可以调用线程池的 setrejectedexecutionhandler() 方法,用自定义的rejectedexecutionhandler 对象替换现有的策略,threadpoolexecutor提供的默认的处理策略是直接丢弃,同时抛异常信息,threadpoolexecutor 提供 4 个现有的策略,分别是:
threadpoolexecutor.abortpolicy:表示拒绝任务并抛出异常,源码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class abortpolicy implements rejectedexecutionhandler {
    /**
     * creates an <tt>abortpolicy</tt>.
     */
    public abortpolicy() { }
    /**
     * always throws rejectedexecutionexception.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws rejectedexecutionexception always.
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      throw new rejectedexecutionexception(); //抛异常
    }
  }

 threadpoolexecutor.discardpolicy:表示拒绝任务但不做任何动作,源码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
public static class discardpolicy implements rejectedexecutionhandler {
    /**
     * creates a <tt>discardpolicy</tt>.
     */
    public discardpolicy() { }
    /**
     * does nothing, which has the effect of discarding task r.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
    } // 直接拒绝,但不做任何操作
  }

threadpoolexecutor.callerrunspolicy:表示拒绝任务,并在调用者的线程中直接执行该任务,源码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class callerrunspolicy implements rejectedexecutionhandler {
    /**
     * creates a <tt>callerrunspolicy</tt>.
     */
    public callerrunspolicy() { }
    /**
     * executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      if (!e.isshutdown()) {
        r.run(); // 直接执行任务
      }
    }
  }

 threadpoolexecutor.discardoldestpolicy:表示先丢弃任务队列中的第一个任务,然后把这个任务加进队列。源码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
public static class discardoldestpolicy implements rejectedexecutionhandler {
    /**
     * creates a <tt>discardoldestpolicy</tt> for the given executor.
     */
    public discardoldestpolicy() { }
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      if (!e.isshutdown()) {
        e.getqueue().poll(); // 丢弃队列中的第一个任务
        e.execute(r); // 执行新任务
      }
    }
}

当任务源源不断到来的时候,会从queue中poll一个任务出来,然后执行新的任务 

总结

以上所述是小编给大家介绍的jdk自带线程池详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!

原文链接:http://blog.csdn.net/liuchuanhong1/article/details/52042182