java并发编程实战:第七章----取消与关闭

时间:2021-05-05 21:20:24

Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用

中断:一种协作机制,能够使一个线程终止另一个线程的当前工作

立即停止会使共享的数据结构处于不一致的状态,需要停止时,发出中断请求,被要求中断的线程处理完他当前的任务后会自己判断是否停下来

一、任务取消

若外部代码能在某个操作正常完成之前将其置入“完成”状态,则还操作是可取消的。(用户请求取消、有时间限制的操作<并发查找结果,一个线程找到后可取消其他线程>、应用程序事件、错误、关闭)

取消策略:详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作

举例:设置volatile变量为取消标志,每次执行前检查

 1 private volatile boolean canceled;
2
3 @Override
4 public void run() {
5 BigInteger p = BigInteger.ONE;
6 while (!canceled){
7 p = p.nextProbablePrime();
8 synchronized (this) { //同步添加素数
9 primes.add(p);
10 }
11 }
12 }

注意:这是一个有问题的取消方式,若线程阻塞在add操作后,那么即使设置了取消状态,它也不会运行到检验阻塞状态的代码,因此会永远阻塞

1、中断

  线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。(在取消之外的其他操作使用中断都是不合适的)

  调用interrupt并不意味者立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。会在下一个取消点中断自己,如wait, sleep,join等

1 public class Thread {
2 public void interrupt() { ... }//中断目标线程,恢复中断状态
3 public boolean isInterrupted() { ... }//返回目标线程的中断状态
4 public static boolean interrupted() { ... }//清除当前线程的中断状态,并返回它之前的值(用于已经设置了中断状态,但还尚未相应中断)
5 ...
6 }

阻塞库方法,例如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现时提前返回。它们在响应中断时执行的操作包括 : 清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束

  • 显示的检测中断!Thread.currentThread().isInterrupted()后推出
  • 阻塞方法中抓到InterruptedException后退出

2、中断策略——规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作

  由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

3、响应中断

  • 传递异常(throws InterruptedException)

  • 恢复中断状态,从而事调用栈的上层代码能够对其进行处理。(Thread.currentThread().interrupt();)

4、通过Future实现取消

  boolean cancel(boolean mayInterruptIfRunning);

  • 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败,返回false
  • 调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行
  • 如果任务已经执行,mayInterruptIfRunning参数决定了是否向执行任务的线程发出interrupt操作

5、处理不可中断的阻塞——对于某些阻塞操作,只是设置了中断状态

  • Java.io包中的同步Socket I/O。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
  • Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
  • Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
  • 获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
 1 //改写interrupt方法发出中断请求
2 @Override
3 public void interrupt() {
4 try {
5 socket.close(); //中断前关闭socket
6 } catch (IOException e) {
7
8 } finally{
9 super.interrupt();
10 }
11 }

6、采用newTaskFor来封装非标准的取消

二、停止基于线程的服务

应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。

  • 服务退出 -> 线程需要结束  无法通过抢占式的方法来停止线程,因此它们需要自行结束
  • 除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等
  • 线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池
  • 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序不能拥有工作者线程,因此应用程序不能直接停止工作者线程。

服务应该生命周期方法关闭它自己以及他拥有的线程

  • 要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法
  • ExecutorService提供的shutdown(), shutdownNow()

1、示例:日志服务

 1 // LogWriter就是一个基于线程的服务,但不是一个完成的服务
2 public class LogWriter {
3 //日志缓存
4 private final BlockingQueue<String> queue;
5 private final LoggerThread logger;//日志写线程
6 private static final int CAPACITY = 1000;
7
8 public LogWriter(Writer writer) {
9 this.queue = new LinkedBlockingQueue<String>(CAPACITY);
10 this.logger = new LoggerThread(writer);
11 }
12
13 public void start() { logger.start(); }
14
15 //应用程序向日志缓存中放入要记录的日志
16 public void log(String msg) throws InterruptedException {
17 queue.put(msg);
18 }
19
20 //日志写入线程,这是一个多生产者,单消费者的设计
21 private class LoggerThread extends Thread {
22 private final PrintWriter writer;
23 public LoggerThread(Writer writer) {
24 this.writer = new PrintWriter(writer, true); // autoflush
25 }
26 public void run() {
27 try {
28 while (true)
29 writer.println(queue.take());
30 } catch(InterruptedException ignored) {
31 } finally {
32 writer.close();
33 }
34 }
35 }
36 }

注意:可以中断阻塞的take()方法停止日志线程(消费者线程),但生产者没有专门的线程,没办法取消

 1 //日志服务,提供记录日志的服务,并有管理服务生命周期的相关方法
2 public class LogService {
3 private final BlockingQueue<String> queue;
4 private final LoggerThread loggerThread;// 日志写线程
5 private final PrintWriter writer;
6 private boolean isShutdown;// 服务关闭标示
7 // 队列中的日志消息存储数量。我们不是可以通过queue.size()来获取吗?
8 // 为什么还需要这个?请看后面
9 private int reservations;
10
11 public LogService(Writer writer) {
12 this.queue = new LinkedBlockingQueue<String>();
13 this.loggerThread = new LoggerThread();
14 this.writer = new PrintWriter(writer);
15
16 }
17
18 //启动日志服务
19 public void start() {
20 loggerThread.start();
21 }
22
23 //关闭日志服务
24 public void stop() {
25 synchronized (this) {
26 /*
27 * 为了线程可见性,这里一定要加上同步,当然volatile也可,
28 * 但下面方法还需要原子性,所以这里就直接使用了synchronized,
29 * 但不是将isShutdown定义为volatile
30 */
31 isShutdown = true;
32 }
33 //向日志线程发出中断请求
34 loggerThread.interrupt();
35 }
36
37 //供应用程序调用,用来向日志缓存存放要记录的日志信息
38 public void log(String msg) throws InterruptedException {
39 synchronized (this) {
40 /*
41 * 如果应用程序发出了服务关闭请求,则不存在接受日志,而是直接
42 * 抛出异常,让应用程序知道
43 */
44 if (isShutdown)
45 throw new IllegalStateException(/*日志服务已关闭*/);
46 /*
47 * 由于queue是线程安全的阻塞队列,所以不需要同步(同步也可
48 * 但并发效率会下降,所以将它放到了同步块外)。但是这里是的
49 * 操作序列是由两个操作组成的:即先判断isShutdown,再向缓存
50 * 中放入消息,如果将queue.put(msg)放在同步外,则在多线程环
51 * 境中,LoggerThread中的 queue.size() == 0 将会不准确,所
52 * 以又要想queue.put不同步,又要想queue.size()计算准确,所
53 * 以就使用了一个变量reservations专用来记录缓存中日志条数,
54 * 这样就即解决了同步queue效率低的问题,又解决了安全性问题,
55 * 这真是两全其美
56 */
57 //queue.put(msg);
58 ++reservations;//存储量加1
59 }
60 queue.put(msg);
61 }
62
63 private class LoggerThread extends Thread {
64 public void run() {
65 try {
66 while (true) {
67 try {
68 synchronized (LogService.this) {
69 // 由于 queue 未同步,所以这里不能使用queue.size
70 //if (isShutdown && queue.size() == 0)
71
72 // 如果已关闭,且缓存中的日志信息都已写入,则退出日志线程
73 if (isShutdown && reservations == 0)
74 break;
75 }
76 String msg = queue.take();
77 synchronized (LogService.this) {
78 --reservations;
79 }
80 writer.println(msg);
81 } catch (InterruptedException e) { /* 重试 */
82 }
83 }
84 } finally {
85 writer.close();
86 }
87 }
88 }
89 }

注意:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提提交消息的权利

2、关闭ExecutorService

  shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完

  shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法,无法保证能够停止正在处理的任务线程,但是会尽力尝试),并暂停处理正在等待的任务,并返回等待执行的任务列表。

  ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常

3、“毒丸”对象——当得到这个对象时,立即停止

  在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作

4、只执行一次的服务

  如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。

 1 boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
2 throws InterruptedException {
3 ExecutorService exec = Executors.newCachedThreadPool();
4 //这里不能使用 volatile hasNewMail,因为还需要在匿名内中修改
5 final AtomicBoolean hasNewMail = new AtomicBoolean(false);
6 try {
7 for (final String host : hosts)//循环检索每台主机
8 exec.execute(new Runnable() {//执行任务
9 public void run() {
10 if (checkMail(host))
11 hasNewMail.set(true);
12 }
13 });
14 } finally {
15 exec.shutdown();//因为ExecutorService只在这个方法中服务,所以完成后即可关闭
16 exec.awaitTermination(timeout, unit);//等待任务的完成,如果超时还未完成也会返回
17 }
18 return hasNewMail.get();
19 }

5、shutdown的局限性

我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查

 1 public class TrackingExecutor extends AbstractExecutorService {
2 private final ExecutorService exec;
3 private final Set<Runnable> tasksCancelledAtShutdown =
4 Collections.synchronizedSet(new HashSet<Runnable>());
5
6 public TrackingExecutor(ExecutorService exec) {
7 this.exec = exec;
8 }
9
10 public List<Runnable> getCancelledTasks() {//返回被取消的任务
11 if (!exec.isTerminated())//如果shutdownNow未调用或调用未完成时
12 throw new IllegalStateException(/*...*/);
13 return new ArrayList<Runnable>(tasksCancelledAtShutdown);
14 }
15
16 public void execute(final Runnable runnable) {
17 exec.execute(new Runnable() {
18 public void run() {
19 try {
20 runnable.run();
21 /*参考:http://blog.csdn.net/coslay/article/details/48038795
22 * 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
23 * 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
24 * 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
25 * 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
26 * 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
27 * 将runnable.run()与下面的if放在一个同步块、而且还要将
28 * shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
29 * 这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
30 * 只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
31 * 性能上的问题,因为什么所有的任务都会串形执行,这样还要
32 * ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
33 * 这是“不可避免的竞争条件”
34 */
35 } finally {
36 //如果调用了shutdownNow且运行的任务被中断
37 if (isShutdown()
38 && Thread.currentThread().isInterrupted())
39 tasksCancelledAtShutdown.add(runnable);//记录被取消的任务
40 }
41 }
42 });
43 }
44 // 将ExecutorService 中的其他方法委托到exec
45 } 

三、处理非正常的线程终止

  在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中

  任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常

 1 //如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结
2 public void run() {//工作者线程的实现
3 Throwable thrown = null;
4 try {
5 while (!isInterrupted())
6 runTask(getTaskFromWorkQueue());
7 } catch (Throwable e) {//为了安全,捕获的所有异常
8 thrown = e;//保留异常信息
9 } finally {
10 threadExited(this, thrown);// 重新将异常抛给框架后终结工作线程
11 }
12 }

未捕获异常的线程

在Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况

在运行时间较长的应用程序中,通常会为所有的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

public class UEHLogger implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}

四、JVM关闭

JVM既可通过正常手段来关闭,也可强行关闭。

  • 正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时
  • 强行关闭:Runtime.halt,这种强行关闭方式将无法保证是否将运行关闭钩子

1、关闭钩子

  • 关闭钩子是指通过Runnable.addShutdownHook注册的但尚未开始的线程
  • JVM并不能保证关闭钩子的调用顺序
  • 当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器(finalize),然后再停止
  • JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子
  • 关闭钩子应该是线程安全的
  • 关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间
public void start()//通过注册关闭钩子,停止日志服务
{
Runnable.getRuntime().addShutdownHook(new Thread(){
public void run()
{
try{LogService.this.stop();}
catch(InterruptedException ignored){}
}
});
}

2、守护线程——一个线程来执行一些辅助工作,但有不希望这个线程阻碍JVM的关闭

  线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程

  普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出

3、终结器(清理文件句柄或套接字句柄等)——避免使用

  垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而确保一些持久化的资源被释放。

  通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源

例外:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的