JAVA随笔之线程编程(Executor框架)

时间:2022-02-13 14:50:55

1.关键字synchronized

  如果一个方法声明为synchronized,那么调用这个方法要求拥有对象的锁。如果这个锁被另一个线程所拥有,线程又调用synchronized方法块,那么线程会被放入对象锁的进入集合。进入集合表示等待锁  可用线程的集合。

  如果当调用一个synchronized方法时,锁可用,那么调用线程成为对象锁的所有者,可进入该方法。

  当线程退出方法时,释放锁。如果释放锁时锁的进入集合不为空,JVM任意选择该集合中的线程作为锁的所有者(“任意”,大多数虚拟机按照FIFO策略排列等待集合中的线程)。

2.Java中的线程同步

  同步                   可重入锁              信号量        条件变量         线程

Synchronized        ReentrantLock      Semaphore    Condition      Thread

     wait()                   lock()               acquire()        await()       sleep()/yield()

notify()/notifyAll()      unlock()            release()        signal()       interrupt()     

3.java并发编程--Executor框架

大多数并发应用程序是以执行任务(task)为基本单位进行管理的。通常情况下,我们会为每个任务单独创建一个线程来执行。这样会带来两个问题:一,大量的线程(>100)会消耗系统资源,使线程调度的开销变大,引起性能下降;二,对于生命周期短暂的任务,频繁地创建和消亡线程并不是明智的选择。因为创建和消亡线程的开销可能会大于使用多线程带来的性能好处。

    一种更加合理的使用多线程的方法是使用线程池(Thread Pool)。 java.util.concurrent 提供了一个灵活的线程池实现:Executor 框架。这个框架可以用于异步任务执行,而且支持很多不同类型的任务执行策略。它还为任务提交和任务执行之间的解耦提供了标准的方法,为使用 Runnable 描述任务提供了通用的方式。 Executor 的实现还提供了对生命周期的支持和 hook 函数,可以添加如统计收集、应用程序管理机制和监视器等扩展。

    在线程池中执行任务线程,可以重用已存在的线程,免除创建新的线程。这样可以在处理多个任务时减少线程创建、消亡的开销。同时,在任务到达时,工作线程通常已经存在,用于创建线程的等待时间不会延迟任务的执行,因此提高了响应性。通过适当的调整线程池的大小,在得到足够多的线程以保持处理器忙碌的同时,还可以防止过多的线程相互竞争资源,导致应用程序在线程管理上耗费过多的资源。

 

Executor 默认提供了一些有用的预设线程池,可以通过调用 Executors 的静态工厂方法来创建。

newFixedThreadPool:提供一个具有最大线程个数限制的线程池。

newCachedThreadPool:提供一个没有最大线程个数限制的线程池。

newSingleThreadExecutor:提供一个单线程的线程池。保证任务按照任务队列说规定的顺序(FIFO,LIFO,优先级)执行。

newScheduledThreadPool:提供一个具有最大线程个数限制线程池,并支持定时以及周期性的任务执行。

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

其实查看源码我们可以发现,这些方法都调用了下面这个构造类:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
  TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime:线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue:线程池所使用的缓冲队列

handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

也就是:处理任务的优先级为:

核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:

NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四个选择:

ThreadPoolExecutor.AbortPolicy()

抛出java.util.concurrent.RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

重试添加当前的任务,他会自动重复调用execute()方法

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃旧的任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

再来看看创建线程池的一些方法:

public static ExecutorService newCachedThreadPool() { 
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

4.java并发编程--Executor框架之使用

  1.创建Callable对象

  Callable代表一个有返回值得操作。

  实现Callable接口,重写call()方法,方法体为需要执行的任务或计算。

  Callable对象用来作为参数传递给Future对象。

  Callable和Runnable:两者类似都是为其实例可能被另一个线程执行的类设计的。但Runnable不会返回结果,并且无法抛出经过检查的异常,定义了一个不带参数的run()方法。Callable返回结果并且可能抛出异常,定义了一个不带任何参数的call()方法。

  2.创建FutureTask对象

  FutureTask类实现Runnable,Future接口,可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。

  FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。一旦运行就执行给定的 Callable/Runnable

  3.创建线程池

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

  public static ExecutorService newFixedThreadPool(int nThreads)

  创建固定数目线程的线程池。

  public static ExecutorService newCachedThreadPool()

  创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

  public static ExecutorService newSingleThreadExecutor()

  创建一个单线程化的Executor。

  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

  使用方法:Execute executor = Executors.newFixedThreadPool(10);       强烈建议程序员使用较为方便的 Executors 工厂方法

     或者 ThreadPoolExecutor executor = new ThreadPoolExecutor();

  FutureTask对象作为参数传递给Executor的submit方法,ExecutoreService提供了submit()方法,提交一个Callable任务或Runnable任务用于执行,并返回一个表示该任务的Future。

  FutureTask的shutdown()方法启动一次顺序关闭,执行以前提交的任务,但不接受新任务。

  代码示例见:DefaultAvFlightSearchManager类的submitAbacusFlightSearchTask()方法

并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。