Java程序经常会遇到进程挂掉的情况,一些状态没有正确的保存下来,这时候就需要在JVM关掉的时候执行一些清理现场的代码。JAVA中的ShutdownHook提供了比较好的方案。而在SOFAJRaft-example模块的CounterServer-main方法中就使用了shutdownHook实现优雅停机。
@Author:Akai-yuan
@更新时间:2023/1/25
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以注册一个JVM关闭的钩子这个钩子可以在以下几种场景中被调用:
- 程序正常退出
- 执行了System.exit()方法
- 终端使用Ctrl+C触发的中断
- 系统关闭
- OutOfMemory宕机
- 使用Kill pid命令干掉进程(使用 **kill -9 pid **是不会被调用的)
以下几种情况中是无法被调用的:
- 通过kill -9命令杀死进程——所以kill -9一定要慎用;
- 程序中执行了Runtime.getRuntime().halt()方法;
- 操作系统突然崩溃,或机器掉电(用电设备因断电、失电、或电的质量达不到要求而不能正常工作)。
2.addShutdownHook方法简述
Runtime.getRuntime().addShutdownHook(shutdownHook);
该方法指,在JVM中增加一个关闭的钩子,当JVM关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁、关闭连接等操作。
3.SOFAJRaft中钩子函数的实现
通过反射获取到grpcServer实例的shutdown方法和awaitTerminationLimit方法,并添加到钩子函数当中
public static void blockUntilShutdown() {
if (rpcServer == null) {
return;
}
//当RpcFactoryHelper中维护的工厂类型是GrpcRaftRpcFactory时进入if条件内部
if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals(RpcFactoryHelper.rpcFactory().getClass()
.getName())) {
try {
//反射获取grpcServer中维护的(io.grpc包下的)server实例
Method getServer = rpcServer.getClass().getMethod("getServer");
Object grpcServer = getServer.invoke(rpcServer);
//反射获取server实例的shutdown方法和awaitTerminationLimit方法
Method shutdown = grpcServer.getClass().getMethod("shutdown");
Method awaitTerminationLimit = grpcServer.getClass().getMethod("awaitTermination", long.class,
TimeUnit.class);
//添加一个shutdownHook线程执行方法
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
shutdown.invoke(grpcServer);
awaitTerminationLimit.invoke(grpcServer, 30, TimeUnit.SECONDS);
} catch (Exception e) {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
e.printStackTrace(System.err);
}
}
});
//执行awaitTermination方法
Method awaitTermination = grpcServer.getClass().getMethod("awaitTermination");
awaitTermination.invoke(grpcServer);
} catch (Exception e) {
LOG.error("Failed to block grpc server", e);
}
}
}
4.grpc中的shutdown方法
GrpcServer下的shutdown方法与本文的钩子函数无关,此处再对比分析一下GrpcServer的shutdown方法。
public void shutdown() {
//CAS
//当且仅当期待值为true时(与当前AtomicBoolean类型的started一致),设置为false关闭
if (!this.started.compareAndSet(true, false)) {
return;
}
ExecutorServiceHelper.shutdownAndAwaitTermination(this.defaultExecutor);
GrpcServerHelper.shutdownAndAwaitTermination(this.server);
}
ExecutorServiceHelper#shutdownAndAwaitTermination:
我们可以发现实际上就是在执行ExecutorService 中 的shutdown()、shutdownNow()、awaitTermination() 方法,那么我们来区别以下这几个方法
public static boolean shutdownAndAwaitTermination(final ExecutorService pool, final long timeoutMillis) {
if (pool == null) {
return true;
}
// 禁止提交新任务
pool.shutdown();
final TimeUnit unit = TimeUnit.MILLISECONDS;
final long phaseOne = timeoutMillis / 5;
try {
// 等待一段时间以终止现有任务
if (pool.awaitTermination(phaseOne, unit)) {
return true;
}
pool.shutdownNow();
// 等待一段时间,等待任务响应被取消
if (pool.awaitTermination(timeoutMillis - phaseOne, unit)) {
return true;
}
LOG.warn("Fail to shutdown pool: {}.", pool);
} catch (final InterruptedException e) {
// (Re-)cancel if current thread also interrupted
pool.shutdownNow();
// preserve interrupt status
Thread.currentThread().interrupt();
}
return false;
}
- shutdown():停止接收新任务,原来的任务继续执行
1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;
- shutdownNow():停止接收新任务,原来的任务停止执行
1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
说明:
它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。
所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
- awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞
当前线程阻塞,直到:
- 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
- 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
- 或者 线程被中断,抛出InterruptedException
然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)
GrpcServerHelper#shutdownAndAwaitTermination
与ExecutorServiceHelper类中的shutdownAndAwaitTermination方法类似的,该方法将优雅的关闭grpcServer.
public static boolean shutdownAndAwaitTermination(final Server server, final long timeoutMillis) {
if (server == null) {
return true;
}
// disable new tasks from being submitted
server.shutdown();
final TimeUnit unit = TimeUnit.MILLISECONDS;
final long phaseOne = timeoutMillis / 5;
try {
// wait a while for existing tasks to terminate
if (server.awaitTermination(phaseOne, unit)) {
return true;
}
server.shutdownNow();
// wait a while for tasks to respond to being cancelled
if (server.awaitTermination(timeoutMillis - phaseOne, unit)) {
return true;
}
LOG.warn("Fail to shutdown grpc server: {}.", server);
} catch (final InterruptedException e) {
// (Re-)cancel if current thread also interrupted
server.shutdownNow();
// 保持中断状态
Thread.currentThread().interrupt();
}
return false;
}