<>第七章:取消和关闭

时间:2021-05-09 21:02:50

Java没有提供任何机制来安全地终止线程,虽然Thread.stop和suspend等方法提供了这样的机制,但是存在严重的缺陷,应该避免使用这些方法。但是Java提供了中断Interruption机制,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作方式是必要的,我们很少希望某个任务线程或者服务立即停止,因为这种立即停止会时某个共享的数据结构处于不一致的状态。相反,在编写任务和服务的时候可以使用一种协作方式:当需要停止的时候,它们会先清除当前正在执行的工作,然后再结束。

7.1  任务取消

如果外部代码能够在某个操作正常完成之前将其置于 完成 状态,那么这个操作就可以称为可取消的Cancellable

其中一种协作机制是设置一个取消标志Cancellation Requested标志,而任务定期查看该标志。

  1. @ThreadSafe
  2. public class PrimeGenerator implements Runnable {
  3. private static ExecutorService exec = Executors.newCachedThreadPool();
  4. @GuardedBy("this")
  5. private final List<BigInteger> primes = new ArrayList<BigInteger>();
  6. private volatile boolean cancelled;
  7. public void run() {
  8. BigInteger p = BigInteger.ONE;
  9. while (!cancelled) {
  10. p = p.nextProbablePrime();
  11. synchronized (this) {
  12. primes.add(p);
  13. }
  14. }
  15. }
  16. public void cancel() {
  17. cancelled = true;
  18. }
  19. public synchronized List<BigInteger> get() {
  20. return new ArrayList<BigInteger>(primes);
  21. }
  22. static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
  23. PrimeGenerator generator = new PrimeGenerator();
  24. exec.execute(generator);
  25. try {
  26. SECONDS.sleep(1);
  27. } finally {
  28. generator.cancel();
  29. }
  30. return generator.get();
  31. }
  32. }

在Java的API或语言规范中,并没有将中断与任何语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个boolean类型的中断状态。但中断线程时,这个线程的中断状态将被设置成true。

Thread中的三个方法:

public void interrupt()   中断一个线程

public boolean isInterrupted()  获取中断标志,判断是否中断

public static boolean interrupted()  清楚中断状态,并返回它之前的状态值

线程在阻塞状态下发生中断的时候会抛出InterruptedException,例如Thread.sleep(), Thread.wait(), Thread.join()等方法。

当线程在非阻塞状态下中断的时候,它的中断状态将被设置,然后根据检查中断状态来判断是否中断。

调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息,换句话说,仅仅修改了线程的isInterrupted标志字段。

通常,中断时实现取消的最合理方式。

  1. public class PrimeProducer extends Thread {
  2. private final BlockingQueue<BigInteger> queue;
  3. PrimeProducer(BlockingQueue<BigInteger> queue) {
  4. this.queue = queue;
  5. }
  6. public void run() {
  7. try {
  8. BigInteger p = BigInteger.ONE;
  9. while (!Thread.currentThread().isInterrupted())
  10. queue.put(p = p.nextProbablePrime());
  11. } catch (InterruptedException consumed) {
  12. /* Allow thread to exit */
  13. }
  14. }
  15. public void cancel() {
  16. interrupt();
  17. }
  18. }

7.1.3  响应中断

只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规任务和库代码中都不应该屏蔽中断请求。

两种方法响应中断:

* 传递异常InterruptedException
* 恢复中断状态,从而使调用栈中上层代码能够对其进行处理
不可取消的任务在退出前恢复中断标志
  1. public class NoncancelableTask {
  2. public Task getNextTask(BlockingQueue<Task> queue) {
  3. boolean interrupted = false;
  4. try {
  5. while (true) {
  6. try {
  7. return queue.take();
  8. } catch (InterruptedException e) {
  9. interrupted = true;
  10. // fall through and retry
  11. }
  12. }
  13. } finally {
  14. if (interrupted)
  15. Thread.currentThread().interrupt();
  16. }
  17. }
  18. interface Task {
  19. }
  20. }
7.1.5  定时任务,通过Future来实现取消:
除非你清楚线程的中断策略,否则不要中断线程,那么在神马情况下调用cancel可以将参数指定为true呢。
如果任务的线程是由标准的Executor创建的,那么可以设置mayInterruptIfRunning。
  1. public class TimedRun {
  2. private static final ExecutorService taskExec = Executors.newCachedThreadPool();
  3. public static void timedRun(Runnable r, long timeout, TimeUnit unit)
  4. throws InterruptedException {
  5. Future<?> task = taskExec.submit(r);
  6. try {
  7. task.get(timeout, unit);
  8. } catch (TimeoutException e) {
  9. // task will be cancelled below
  10. } catch (ExecutionException e) {
  11. // exception thrown in task; rethrow
  12. throw launderThrowable(e.getCause());
  13. } finally {
  14. // Harmless if task already completed
  15. task.cancel(true); // interrupt if running
  16. }
  17. }
  18. }
7.1.6  处理不可中断的阻塞
对于这些线程,中断请求只能设置线程的中断状态,除此之外没有其他任何作用。
我们可以使用类似中断的手段来停止这些线程,但这要求知道线程阻塞的原因。
通过newTaskFor将非标准的取消操作封装在一个任务中:
  1. public abstract class SocketUsingTask<T> implements CancellableTask<T> {
  2. @GuardedBy("this")
  3. private Socket socket;
  4. protected synchronized void setSocket(Socket s) {
  5. socket = s;
  6. }
  7. public synchronized void cancel() {
  8. try {
  9. if (socket != null)
  10. socket.close();
  11. } catch (IOException ignored) {
  12. }
  13. }
  14. public RunnableFuture<T> newTask() {
  15. return new FutureTask<T>(this) {
  16. public boolean cancel(boolean mayInterruptIfRunning) {
  17. try {
  18. SocketUsingTask.this.cancel();
  19. } finally {
  20. return super.cancel(mayInterruptIfRunning);
  21. }
  22. }
  23. };
  24. }
  25. }
  26. interface CancellableTask<T> extends Callable<T> {
  27. void cancel();
  28. RunnableFuture<T> newTask();
  29. }

7.2  停止基于线程的服务

应用程序通常会创建拥有多个线程的服务,如果应用程序准备退出,那么这些服务所拥有的线程也需要正确的结束,由于java没有抢占式方法停止线程,因此需要它们自行结束。

正确的封装原则是:除非拥有某个线程,否则不要对该线程进行操控,例如中断线程或者修改优先级等。

线程有个相应的所有者,即创建该线程的类,因此线程池是其工作者线程的所有者,如果要中断线程,那么应该使用线程池去中断。

线程的所有权是不可传递的。服务应该提供生命周期方法Lifecycle Method来关闭它自己以及它所拥有的线程。这样当应用程序关闭该服务的时候,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdown和shutdownNow等方法,同样,在其他拥有线程的服务方法中也应该提供类似的关闭机制。

Tips:对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

7.2.1  示例:日志服务

我们通常会在应用程序中加入log信息,一般用的框架就是log4j。但是这种内联的日志功能会给一些高容量Highvolume应用程序带来一定的性能开销。另外一种替代方法是通过调用log方法将日志消息放入某个队列中,并由其他线程来处理。

  1. public class LogService {
  2. private final BlockingQueue<String> queue;
  3. private final LoggerThread loggerThread;
  4. private final PrintWriter writer;
  5. @GuardedBy("this")
  6. private boolean isShutdown;
  7. @GuardedBy("this")
  8. private int reservations;
  9. public LogService(Writer writer) {
  10. this.queue = new LinkedBlockingQueue<String>();
  11. this.loggerThread = new LoggerThread();
  12. this.writer = new PrintWriter(writer);
  13. }
  14. public void start() {
  15. loggerThread.start();
  16. }
  17. public void stop() {
  18. synchronized (this) {
  19. isShutdown = true;
  20. }
  21. loggerThread.interrupt();
  22. }
  23. public void log(String msg) throws InterruptedException {
  24. synchronized (this) {
  25. if (isShutdown)
  26. throw new IllegalStateException(/*...*/);
  27. ++reservations;
  28. }
  29. queue.put(msg);
  30. }
  31. private class LoggerThread extends Thread {
  32. public void run() {
  33. try {
  34. while (true) {
  35. try {
  36. synchronized (LogService.this) {
  37. if (isShutdown && reservations == 0)
  38. break;
  39. }
  40. String msg = queue.take();
  41. synchronized (LogService.this) {
  42. --reservations;
  43. }
  44. writer.println(msg);
  45. } catch (InterruptedException e) { /* retry */
  46. }
  47. }
  48. } finally {
  49. writer.close();
  50. }
  51. }
  52. }
  53. }

7.2.2  通过ExecutorService去关闭

简单的程序可以直接在main函数中启动和关闭全局的ExecutorService,而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务提供自己的生命周期方法。下面我们利用ExecutorService重构上面的日志服务:

  1. public class LogService {
  2. public void stop() throws InterruptedException {
  3. try {
  4. exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT);
  5. }
  6. }
  7. }

7.2.3  利用Poison Pill对象关闭Producer-Consumer服务

7.2.5  当关闭线程池的时候,保存尚未开始的和开始后取消的任务数据,以备后面重新处理,下面是一个网页爬虫程序,关闭爬虫服务的时候将记录所有尚未开始的和已经取消的所有页面URL:

  1. public abstract class WebCrawler {
  2. private volatile TrackingExecutor exec;
  3. @GuardedBy("this")
  4. private final Set<URL> urlsToCrawl = new HashSet<URL>();
  5. private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
  6. private static final long TIMEOUT = 500;
  7. private static final TimeUnit UNIT = MILLISECONDS;
  8. public WebCrawler(URL startUrl) {
  9. urlsToCrawl.add(startUrl);
  10. }
  11. public synchronized void start() {
  12. exec = new TrackingExecutor(Executors.newCachedThreadPool());
  13. for (URL url : urlsToCrawl) submitCrawlTask(url);
  14. urlsToCrawl.clear();
  15. }
  16. public synchronized void stop() throws InterruptedException {
  17. try {
  18. saveUncrawled(exec.shutdownNow());
  19. if (exec.awaitTermination(TIMEOUT, UNIT))
  20. saveUncrawled(exec.getCancelledTasks());
  21. } finally {
  22. exec = null;
  23. }
  24. }
  25. protected abstract List<URL> processPage(URL url);
  26. private void saveUncrawled(List<Runnable> uncrawled) {
  27. for (Runnable task : uncrawled)
  28. urlsToCrawl.add(((CrawlTask) task).getPage());
  29. }
  30. private void submitCrawlTask(URL u) {
  31. exec.execute(new CrawlTask(u));
  32. }
  33. private class CrawlTask implements Runnable {
  34. private final URL url;
  35. CrawlTask(URL url) {
  36. this.url = url;
  37. }
  38. private int count = 1;
  39. boolean alreadyCrawled() {
  40. return seen.putIfAbsent(url, true) != null;
  41. }
  42. void markUncrawled() {
  43. seen.remove(url);
  44. System.out.printf("marking %s uncrawled%n", url);
  45. }
  46. public void run() {
  47. for (URL link : processPage(url)) {
  48. if (Thread.currentThread().isInterrupted())
  49. return;
  50. submitCrawlTask(link);
  51. }
  52. }
  53. public URL getPage() {
  54. return url;
  55. }
  56. }
  57. }
  1. public class TrackingExecutor extends AbstractExecutorService {
  2. private final ExecutorService exec;
  3. private final Set<Runnable> tasksCancelledAtShutdown =
  4. Collections.synchronizedSet(new HashSet<Runnable>());
  5. public TrackingExecutor(ExecutorService exec) {
  6. this.exec = exec;
  7. }
  8. public void shutdown() {
  9. exec.shutdown();
  10. }
  11. public List<Runnable> shutdownNow() {
  12. return exec.shutdownNow();
  13. }
  14. public boolean isShutdown() {
  15. return exec.isShutdown();
  16. }
  17. public boolean isTerminated() {
  18. return exec.isTerminated();
  19. }
  20. public boolean awaitTermination(long timeout, TimeUnit unit)
  21. throws InterruptedException {
  22. return exec.awaitTermination(timeout, unit);
  23. }
  24. public List<Runnable> getCancelledTasks() {
  25. if (!exec.isTerminated())
  26. throw new IllegalStateException(/*...*/);
  27. return new ArrayList<Runnable>(tasksCancelledAtShutdown);
  28. }
  29. public void execute(final Runnable runnable) {
  30. exec.execute(new Runnable() {
  31. public void run() {
  32. try {
  33. runnable.run();
  34. } finally {
  35. if (isShutdown()
  36. && Thread.currentThread().isInterrupted())
  37. tasksCancelledAtShutdown.add(runnable);
  38. }
  39. }
  40. });
  41. }
  42. }

7.3  处理非正常的线程终止

通过给应用程序提供一个UncaughtExceptionHandler异常处理器来处理未捕获的异常:

  1. public class UEHLogger implements Thread.UncaughtExceptionHandler {
  2. public void uncaughtException(Thread t, Throwable e) {
  3. Logger logger = Logger.getAnonymousLogger();
  4. logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
  5. }
  6. }

只有通过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器。而通过submit提交的任务,无论是抛出未检查异常还是已检查异常,都将被认为是任务返回状态的一部分

7.4  JVM关闭的时候提供关闭钩子

在正常关闭JVM的时候,JVM首先调用所有已注册的关闭钩子Shutdown Hook。关闭钩子可以用来实现服务或者应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。

最佳实践是对所有服务都使用同一个关闭钩子,并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免竞态条件的发生或者死锁问题。

  1. Runtime.getRuntime().addShutdownHook(new Thread() {
  2. public void run() {
  3. try{LogService.this.stop();} catch(InterruptedException) {..}
  4. }
  5. })

总结:在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现的时候的复杂性。我们通过利用FutureTask和Executor框架,可以帮助我们构建可取消的任务和服务。

原文地址:http://yidao620c.iteye.com/blog/1856914