java多线程、线程池及Spring配置线程池详解

时间:2021-06-23 17:29:16


1、java中为什么要使用多线程
使用多线程,可以把一些大任务分解成多个小任务来执行,多个小任务之间互不影像,同时进行,这样,充分利用了cpu资源。
2、java中简单的实现多线程的方式

继承Thread类,重写run方法;
class MyTread extends Thread{

public void run() {
  System.out.println(Thread.currentThread().getName());
}

}
实现Runable接口,实现run方法;
class MyRunnable implements Runnable{

  public void run() {

    System.out.println(Thread.currentThread().getName());
  }
}
class ThreadTest {

  public static void main(String[] args) {

    MyTread thread = new Mythread();
    thread.start();//开启一个线程

    MyRunnable myRunnable = new MyRunnable();
    Thread runnable = new Thread(myRunnable);
    runnable.start();//开启一个线程

  }
}
3、java线程的状态
创建:当new了一个线程,并没有调用start之前,线程处于创建状态;
就绪:当调用了start之后,线程处于就绪状态,这是,线程调度程序还没有设置执行当前线程;
运行:线程调度程序执行到线程时,当前线程从就绪状态转成运行状态,开始执行run方法里边的代码;
阻塞:线程在运行的时候,被暂停执行(通常等待某项资源就绪后在执行,sleep、wait可以导致线程阻塞),这是该线程处于阻塞状态;
死亡:当一个线程执行完run方法里边的代码或调用了stop方法后,该线程结束运行
4、为什么要引入线程池
当我们需要的并发执行线程数量很多时,且每个线程执行很短的时间就结束了,这样,我们频繁的创建、销毁线程就大大降低了工作效率(创建和销毁线程需要时间、资源)。
java中的线程池可以达到这样的效果:一个线程执行完任务之后,继续去执行下一个任务,不被销毁,这样线程利用率提高了。
5、java中的线程池(ThreadPoolExecutor)
说起java中的线程池,就想到java.util.concurrent.ThreadPoolExecutor。ThreadPoolExecutor类是java线程池中的核心类。他的实现方式有四种:
public class ThreadPoolExecutor extends AbstractExecutorService {
  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
  }

   public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    threadFactory, defaultHandler);
  }

  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), handler);
  }

  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
通过ThreadPoolExecutor类的源码可以看出,ThreadPoolExecutor类继承AbstractExecutorService,提供四个构造方法,通过构造方法可以看出前面三个最终掉了最后一个
下面介绍下构造方法中的参数:
corePoolSize:线程池的大小。线程池创建之后不会立即去创建线程,而是等待线程的到来。当当前执行的线程数大于改值是,线程会加入到缓冲队列;
maximumPoolSize:线程池中创建的最大线程数;
keepAliveTime:空闲的线程多久时间后被销毁。默认情况下,改值在线程数大于corePoolSize时,对超出corePoolSize值得这些线程起作用。
unit:TimeUnit枚举类型的值,代表keepAliveTime时间单位,可以取下列值:
TimeUnit.DAYS; //天
  TimeUnit.HOURS; //小时
  TimeUnit.MINUTES; //分钟
  TimeUnit.SECONDS; //秒
  TimeUnit.MILLISECONDS; //毫秒
  TimeUnit.MICROSECONDS; //微妙
  TimeUnit.NANOSECONDS; //纳秒
workQueue:阻塞队列,用来存储等待执行的任务,决定了线程池的排队策略,有以下取值:
  ArrayBlockingQueue;
  LinkedBlockingQueue;
  SynchronousQueue;
  threadFactory:线程工厂,是用来创建线程的。默认new Executors.DefaultThreadFactory();
handler:线程拒绝策略。当创建的线程超出maximumPoolSize,且缓冲队列已满时,新任务会拒绝,有以下取值:
  ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
以下是具体的实现方式:
//默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常
class AbortPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    executor.toString());
  }
}
//如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
class DiscardPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

  }
}
//丢弃最老的,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
class DiscardOldestPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) {
      //移除队头元素
      executor.getQueue().poll();
    //再尝试入队
      executor.execute(r);
    }
  }
}
//主线程会自己去执行该任务,不会等待线程池中的线程去执行
class CallerRunsPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) {
      //直接执行run方法
      r.run();
    }
  }
}
以下是ThreadPoolExecutor具体的继承结构
public abstract class AbstractExecutorService implements ExecutorService {

}
这是一个抽象类,实现了ExecutorService接口,并实现了ExecutorService里边的方法,下面看下ExecutorService接口的具体实现
public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService继承Executor接口,下面是Executor接口的具体实现
public interface Executor {
  void execute(Runnable command);
}
Executor接口是顶层接口,只声明了一个execute方法,该方法是用来执行传递进来的任务的。
回过头来,咱么重新看ThreadPoolExecutor类,改类里边有以下两个重要的方法:
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
  }else if (!addWorker(command, false))
    reject(command);
}
public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}
execute()方法是Executor中声明的方法,在ThreadPoolExecutor有了具体的实现,这个方法是ThreadPoolExecutor的核心方法,
通过这个方法可以向线程池提交一个任务,交由线程池去执行
submit()方法是ExecutorService中声明的方法,在AbstractExecutorService中进行了实现,Executor中并没有对其进行重写。从实现中可以看出,submit方法最终也调用了execute
方法,也是执行一个人去,但submit方法可以返回执行结果,利用Future来获取任务执行结果。
6、Spring中的线程池
Spring中的线程池是由ThreadPoolTaskExecutor类来实现的。该类的实现原理最终也是调用了java中的ThreadPoolExecutor类中的一些方法。具体的实现读者可以自己去翻阅Spring
的源码,这里笔者就不罗列了。我们看下ThreadPoolTaskExecutor的初始化。
ThreadPoolTaskExecutor有两种常用的有两种初始化方式:xml配置,java代码初始化
xml配置:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="keepAliveSeconds" value="200" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="20" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
看过上面的内容,读者应该很清楚上面的一些参数代表的意思了吧。笔者在这里不一一去解释了。
public MyThreadPoolTaskExecutor {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;

 private void test(){
  taskExecutor.execute(new Runnable(){
    @Override
    public void run() {
     //执行的代码
    }});
  }
}
Java代码初始化:
private void test2(){
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(10);
  executor.setMaxPoolSize(15);
  executor.setKeepAliveSeconds(1);
  executor.setQueueCapacity(5);
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.initialize();
  executor.execute(new Runnable(){
    @Override
    public void run() {
      //执行的代码
    }
  });
}